[jira] [Created] (FLINK-12992) All host(s) tried for query failed (tried com.datastax.driver.core.exceptions.TransportException: Error writing...)
yanxiaobin created FLINK-12992:
--
Summary: All host(s) tried for query failed (tried com.datastax.driver.core.exceptions.TransportException: Error writing...)
Key: FLINK-12992
URL: https://issues.apache.org/jira/browse/FLINK-12992
Project: Flink
Issue Type: Bug
Components: Connectors / Cassandra
Affects Versions: 1.8.0, 1.7.2
Environment: org.apache.flink flink-connector-cassandra_2.11 1.8.0
Reporter: yanxiaobin

We are using a Flink streaming application with the Cassandra connector, which provides sinks that write data into an [Apache Cassandra|https://cassandra.apache.org/] database.

At first we saw the following exception: All host(s) tried for query failed (tried com.datastax.driver.core.exceptions.TransportException: Error writing...). This exception causes the streaming job to fail, even though we carefully checked that the Cassandra service and the network were both working normally.

We then went through the source code of the DataStax Java Driver that the connector depends on, and found that the real exception behind the problem is the following:

com.datastax.shaded.netty.handler.codec.EncoderException: java.lang.IllegalAccessError: com/datastax/driver/core/Frame
    at com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:107)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:643)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:700)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:636)
    at com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:112)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:643)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:700)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:636)
    at com.datastax.shaded.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:284)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:643)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:700)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:636)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:622)
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:939)
    at com.datastax.shaded.netty.channel.AbstractChannel.write(AbstractChannel.java:234)
    at com.datastax.driver.core.Connection$Flusher.run(Connection.java:872)
    at com.datastax.shaded.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
    at com.datastax.shaded.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at com.datastax.shaded.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalAccessError: com/datastax/driver/core/Frame
    at com.datastax.shaded.netty.util.internal.__matchers__.com.datastax.driver.core.FrameMatcher.match(NoOpTypeParameterMatcher.java)
    at com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.acceptOutboundMessage(MessageToMessageEncoder.java:77)
    at com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:84)

Based on this exception, we found related information here: [https://datastax-oss.atlassian.net/browse/JAVA-1337?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel]

The latest version of flink-connector-cassandra still depends on the old DataStax Java driver version 3.0.0. Perhaps we should upgrade the connector's dependency to Java driver 3.3.0+ to avoid this problem.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
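The reporter only found the real error by reading the driver source. That error is an IllegalAccessError wrapped in an EncoderException. When the underlying error is attached as a cause, a small JDK-only helper can surface it directly in logs.

The sketch below is a minimal illustration under that assumption. The class name RootCauses is hypothetical. The exception chain in main only imitates the shape of the trace above and does not involve the DataStax driver.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/** Hypothetical JDK-only helper that unwraps nested exceptions to find the real error. */
final class RootCauses {
    private RootCauses() {}

    /** Returns [t, t.getCause(), ...], stopping at the first null cause or at a cycle. */
    static List<Throwable> causalChain(Throwable t) {
        Objects.requireNonNull(t, "t");
        List<Throwable> chain = new ArrayList<>();
        // Compare by identity so a chain that loops back on itself still terminates.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            chain.add(cur);
        }
        return chain;
    }

    /** The innermost throwable of the chain (the last element of causalChain). */
    static Throwable rootCause(Throwable t) {
        List<Throwable> chain = causalChain(t);
        return chain.get(chain.size() - 1);
    }

    public static void main(String[] args) {
        // Only imitates the shape of the reported chain; no Cassandra driver is involved.
        Throwable failure = new RuntimeException("Error writing",
                new RuntimeException("EncoderException",
                        new IllegalAccessError("com/datastax/driver/core/Frame")));
        Throwable root = rootCause(failure);
        // prints: java.lang.IllegalAccessError: com/datastax/driver/core/Frame
        System.out.println(root.getClass().getName() + ": " + root.getMessage());
    }
}
```

The identity set stops the walk if a cause chain ever loops back on itself, so the helper cannot spin forever on a malformed chain.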
[jira] [Commented] (FLINK-16791) Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar
[ https://issues.apache.org/jira/browse/FLINK-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17068293#comment-17068293 ]

yanxiaobin commented on FLINK-16791:

Hi, thanks everyone. Just as [~kkl0u] said, it works when using the plugins mechanism with restricted classloaders, as described here: [https://ci.apache.org/projects/flink/flink-docs-master/ops/plugins.html].

> Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar
> ---
>
> Key: FLINK-16791
> URL: https://issues.apache.org/jira/browse/FLINK-16791
> Project: Flink
> Issue Type: Bug
> Components: Client / Job Submission
> Affects Versions: 1.10.0
> Reporter: yanxiaobin
> Priority: Major
>
> Since AWS S3 is needed, we copied the flink-s3-fs-hadoop-1.10.0.jar file to
> the {{lib}} directory of the Flink distribution. But when I submit a single Flink
> job in YARN mode, I get the following error:
>
> Caused by: java.lang.ClassCastException: org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto cannot be cast to com.google.protobuf.Message
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
>     at com.sun.proxy.$Proxy11.getClusterNodes(Unknown Source)
>     at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:303)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
>     at com.sun.proxy.$Proxy12.getClusterNodes(Unknown Source)
>     at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:564)
>     at org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43)
>     at org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:278)
>     at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444)
>     at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:390)
>     ... 24 more

-- This message was sent by Atlassian Jira (v8.3.4#803005)
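With the plugins mechanism, the filesystem jar goes into its own subfolder under plugins/ rather than into lib/. That way it is loaded by a separate plugin classloader.

The sketch below expresses that file layout with java.nio.file, purely as an illustration. The helper name installAsPlugin and the folder name s3-fs-hadoop are assumptions. Removing the copy from lib/ reflects the setup described in this report, not any Flink tooling.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper: puts a filesystem jar into plugins/<pluginDir>/ instead of lib/. */
final class PluginLayout {
    private PluginLayout() {}

    static Path installAsPlugin(Path flinkHome, Path jar, String pluginDir) throws IOException {
        // Each plugin lives in its own subfolder of plugins/.
        Path targetDir = Files.createDirectories(flinkHome.resolve("plugins").resolve(pluginDir));
        Path target = targetDir.resolve(jar.getFileName());
        Files.copy(jar, target, StandardCopyOption.REPLACE_EXISTING);
        // Remove any copy in lib/: a jar there is on the main classpath,
        // which is the setup that produced the ClassCastException in this report.
        Files.deleteIfExists(flinkHome.resolve("lib").resolve(jar.getFileName()));
        return target;
    }

    public static void main(String[] args) throws IOException {
        // Demo on a throwaway directory standing in for a Flink distribution.
        Path home = Files.createTempDirectory("flink-home");
        Path lib = Files.createDirectories(home.resolve("lib"));
        Path jar = Files.write(lib.resolve("flink-s3-fs-hadoop-1.10.0.jar"), new byte[] {1, 2, 3});
        System.out.println(installAsPlugin(home, jar, "s3-fs-hadoop"));
    }
}
```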
[jira] [Commented] (FLINK-16791) Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar
[ https://issues.apache.org/jira/browse/FLINK-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17071439#comment-17071439 ]

yanxiaobin commented on FLINK-16791:

OK, thanks.
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496006#comment-16496006 ]

yanxiaobin commented on FLINK-8500:
---

So far, this solves the current problem, but in the long run there will still be some limitations in supporting future Kafka versions. By the way, can we fix this problem in the 1.5.x series?

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios,
> this is useful!
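A minimal sketch of the requested extension follows. The deserialization callback receives the Kafka record timestamp as an extra argument. The implementation carries it into the emitted event, so later map or flatMap functions can read it.

TimestampedKeyedDeserializationSchema, KafkaEvent and EventSchema are hypothetical names, not Flink APIs. The other parameters mirror KeyedDeserializationSchema#deserialize(messageKey, message, topic, partition, offset). The real interface's other members (isEndOfStream, getProducedType) are left out.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical: a keyed schema whose deserialize call also receives the Kafka record timestamp. */
interface TimestampedKeyedDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition,
                  long offset, long timestamp) throws IOException;
}

/** Hypothetical event type that keeps the Kafka timestamp next to the decoded payload. */
final class KafkaEvent {
    final String value;
    final long kafkaTimestamp;

    KafkaEvent(String value, long kafkaTimestamp) {
        this.value = value;
        this.kafkaTimestamp = kafkaTimestamp;
    }
}

/** Decodes the message as UTF-8 and copies the timestamp into the event for later operators. */
final class EventSchema implements TimestampedKeyedDeserializationSchema<KafkaEvent> {
    @Override
    public KafkaEvent deserialize(byte[] messageKey, byte[] message, String topic, int partition,
                                  long offset, long timestamp) {
        return new KafkaEvent(new String(message, StandardCharsets.UTF_8), timestamp);
    }

    public static void main(String[] args) {
        KafkaEvent e = new EventSchema().deserialize(null, "hello".getBytes(StandardCharsets.UTF_8),
                "events", 0, 42L, 1519790322000L);
        System.out.println(e.value + " @ " + e.kafkaTimestamp);
    }
}
```

On the consumer side, the value passed as timestamp would come from Kafka's ConsumerRecord#timestamp(), which the 0.10+ client provides.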
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496136#comment-16496136 ]

yanxiaobin commented on FLINK-8500:
---

[~tzulitai] Thanks, great.
[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496136#comment-16496136 ]

yanxiaobin edited comment on FLINK-8500 at 5/31/18 5:42 AM:

[~tzulitai] thanks. That is great.

was (Author: backlight):
[~tzulitai] thanks, greate.
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16451808#comment-16451808 ]

yanxiaobin commented on FLINK-8500:
---

Hi [~aljoscha], what do you think? For now I work around the problem by modifying the source code, but that makes future upgrades very inconvenient. I hope it can be resolved in a new release as soon as possible so that I do not have to maintain a separate copy of the code myself.
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488779#comment-16488779 ]

yanxiaobin commented on FLINK-8500:
---

I would prefer to expose Kafka's ConsumerRecord directly (in the DeserializationSchema). That keeps the interface flexible; otherwise there will always be various limitations.
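The design preferred in this comment passes the whole record to the schema, so new record fields never force a new method signature. A minimal sketch is below.

RecordView is a hypothetical stand-in for Kafka's ConsumerRecord<byte[], byte[]>, reduced to the fields used here. RecordDeserializationSchema and TopicAndTimestamp are hypothetical names, not Flink or Kafka APIs.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for Kafka's ConsumerRecord<byte[], byte[]>, reduced to a few fields. */
final class RecordView {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final byte[] key;
    private final byte[] value;

    RecordView(String topic, int partition, long offset, long timestamp, byte[] key, byte[] value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }

    String topic() { return topic; }
    int partition() { return partition; }
    long offset() { return offset; }
    long timestamp() { return timestamp; }
    byte[] key() { return key; }
    byte[] value() { return value; }
}

/** Hypothetical schema that receives the whole record instead of a fixed parameter list. */
interface RecordDeserializationSchema<T> {
    T deserialize(RecordView record) throws Exception;
}

/** Example implementation: picks the fields it needs without any change to the interface. */
final class TopicAndTimestamp implements RecordDeserializationSchema<String> {
    @Override
    public String deserialize(RecordView record) {
        return record.topic() + "@" + record.timestamp() + ": "
                + new String(record.value(), StandardCharsets.UTF_8);
    }
}
```

Compared with adding a timestamp parameter, this shape makes it easier to expose more record metadata later. That only means adding an accessor to the record type, and existing schema implementations keep compiling.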
[jira] [Updated] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yanxiaobin updated FLINK-8794:
--
Description:
When using BucketingSink, it happens that one of the files is always in the [.in-progress] state, and this state never changes after that. S3 is used as the underlying storage.

2018-02-28 11:58:42 147341619 _part-28-0.in-progress
2018-02-28 12:06:27 147315059 part-0-0
2018-02-28 12:06:27 147462359 part-1-0
2018-02-28 12:06:27 147316006 part-10-0
2018-02-28 12:06:28 147349854 part-100-0
2018-02-28 12:06:27 147421625 part-101-0
2018-02-28 12:06:27 147443830 part-102-0
2018-02-28 12:06:27 147372801 part-103-0
2018-02-28 12:06:27 147343670 part-104-0
..

was:
When using BucketingSink, it happens that one of the files is always in the [.in-progress] state, and this state never changes after that. S3 is used as the underlying storage.

> When using BucketingSink, it happens that one of the files is always in the
> [.in-progress] state
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
> Issue Type: Bug
> Components: filesystem-connector
> Affects Versions: 1.4.0, 1.4.1
> Reporter: yanxiaobin
> Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the
> [.in-progress] state, and this state never changes after that. S3 is used as
> the underlying storage.
>
> 2018-02-28 11:58:42 147341619 _part-28-0.in-progress
> 2018-02-28 12:06:27 147315059 part-0-0
> 2018-02-28 12:06:27 147462359 part-1-0
> 2018-02-28 12:06:27 147316006 part-10-0
> 2018-02-28 12:06:28 147349854 part-100-0
> 2018-02-28 12:06:27 147421625 part-101-0
> 2018-02-28 12:06:27 147443830 part-102-0
> 2018-02-28 12:06:27 147372801 part-103-0
> 2018-02-28 12:06:27 147343670 part-104-0
> ..
[jira] [Updated] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yanxiaobin updated FLINK-8794:
--
Description:
When using BucketingSink, it happens that one of the files is always in the [.in-progress] state, and this state never changes after that. S3 is used as the underlying storage.

2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color}
2018-02-28 12:06:27 147315059 part-0-0
2018-02-28 12:06:27 147462359 part-1-0
2018-02-28 12:06:27 147316006 part-10-0
2018-02-28 12:06:28 147349854 part-100-0
2018-02-28 12:06:27 147421625 part-101-0
2018-02-28 12:06:27 147443830 part-102-0
2018-02-28 12:06:27 147372801 part-103-0
2018-02-28 12:06:27 147343670 part-104-0
..

was:
When using BucketingSink, it happens that one of the files is always in the [.in-progress] state, and this state never changes after that. S3 is used as the underlying storage.

2018-02-28 11:58:42 147341619 _part-28-0.in-progress
2018-02-28 12:06:27 147315059 part-0-0
2018-02-28 12:06:27 147462359 part-1-0
2018-02-28 12:06:27 147316006 part-10-0
2018-02-28 12:06:28 147349854 part-100-0
2018-02-28 12:06:27 147421625 part-101-0
2018-02-28 12:06:27 147443830 part-102-0
2018-02-28 12:06:27 147372801 part-103-0
2018-02-28 12:06:27 147343670 part-104-0
..

> When using BucketingSink, it happens that one of the files is always in the
> [.in-progress] state
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
> Issue Type: Bug
> Components: filesystem-connector
> Affects Versions: 1.4.0, 1.4.1
> Reporter: yanxiaobin
> Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the
> [.in-progress] state, and this state never changes after that. S3 is used as
> the underlying storage.
>
> 2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27 147315059 part-0-0
> 2018-02-28 12:06:27 147462359 part-1-0
> 2018-02-28 12:06:27 147316006 part-10-0
> 2018-02-28 12:06:28 147349854 part-100-0
> 2018-02-28 12:06:27 147421625 part-101-0
> 2018-02-28 12:06:27 147443830 part-102-0
> 2018-02-28 12:06:27 147372801 part-103-0
> 2018-02-28 12:06:27 147343670 part-104-0
> ..
[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383209#comment-16383209 ]

yanxiaobin commented on FLINK-8794:
---

Hi [~aljoscha], thank you for your reply! I have the following points:

1. The situation I described above occurs even when the job has no failure.
2. It also happens when the job has a failure (because one of the TaskManager nodes went down) and then recovers. Tolerating node failures is necessary in distributed computing, so this case is a real problem.
3. On recovery, the previous in-progress and pending files are not cleaned up, which causes the downstream processor to read excess dirty data.
4. I think we should first write data to local files on the computing nodes, and upload each file to the distributed file system (for example, S3 or HDFS) only after it has been completely written.

We are currently blocked by this problem, and because of it we cannot use this job.
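The suggestion to finish files before they reach the distributed file system amounts to a write-then-publish pattern. So does the related wish to keep temporary data apart from the final output.

A minimal sketch on a local filesystem follows, using hypothetical names. Each part file is written in a staging directory that is a sibling of the output directory. It is moved into the output directory only when complete.

The sketch assumes an atomic rename (StandardCopyOption.ATOMIC_MOVE, which fails rather than silently copying when unsupported). S3 offers no atomic rename, so this illustrates the idea and is not a fix for BucketingSink on S3.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;

/** Hypothetical writer: stages part files outside the output directory, then publishes them. */
final class StagedPartWriter {
    private final Path stagingDir; // temporary data; downstream readers never scan it
    private final Path outputDir;  // only complete files ever appear here

    StagedPartWriter(Path stagingDir, Path outputDir) throws IOException {
        this.stagingDir = Files.createDirectories(stagingDir);
        this.outputDir = Files.createDirectories(outputDir);
    }

    /** Writes all lines to a staging file, then moves the finished file into the output directory. */
    Path writePart(String partName, List<String> lines) throws IOException {
        Path staged = stagingDir.resolve(partName);
        Files.write(staged, lines, StandardCharsets.UTF_8);
        // ATOMIC_MOVE throws AtomicMoveNotSupportedException instead of falling back to a copy.
        return Files.move(staged, outputDir.resolve(partName), StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        // Staging and output are siblings, so even a recursive reader of output/ never sees staging/.
        Path root = Files.createTempDirectory("bucket");
        StagedPartWriter writer = new StagedPartWriter(root.resolve("staging"), root.resolve("output"));
        System.out.println(writer.writePart("part-0-0", Arrays.asList("a", "b")));
    }
}
```

Keeping the staging directory outside the output directory, rather than as a subdirectory, is what protects readers that scan the output recursively.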
[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383356#comment-16383356 ]

yanxiaobin commented on FLINK-8794:
---

Hi [~pnowojski]! Thank you for your suggestion! The downstream processor could ignore files with the "*pending" or "*in-progress" suffixes and the "_" prefix, but I don't think that is a good way to deal with it. We could change this behaviour, or add an option for BucketingSink to use temporary "in-progress" and "pending" directories instead of prefixes, but those temporary directories would still be subdirectories of the base directory. A downstream processor that reads the base directory recursively would still read the redundant dirty data. I think the temporary data produced while the job runs should be isolated from the final output data. Thanks!

Also, [~kkl0u], could you elaborate on why rescaling forced us to keep lingering files?
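On the reader side, the name-based workaround discussed in this comment could look like the sketch below. It assumes the naming visible in the listing in this issue: temporary files start with "_" and end with ".in-progress" or ".pending". FinishedPartFilter is a hypothetical name.

As the comment notes, this only protects readers that actually apply the filter. That is why isolating temporary data is the sturdier option.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical reader-side filter that keeps only finished BucketingSink part files. */
final class FinishedPartFilter {
    private FinishedPartFilter() {}

    /** False for names with the "_" prefix or an ".in-progress" / ".pending" suffix. */
    static boolean isFinished(String fileName) {
        return !fileName.startsWith("_")
                && !fileName.endsWith(".in-progress")
                && !fileName.endsWith(".pending");
    }

    static List<String> finishedOnly(List<String> fileNames) {
        List<String> result = new ArrayList<>();
        for (String name : fileNames) {
            if (isFinished(name)) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Names taken from the S3 listing in this issue.
        List<String> listing = Arrays.asList("_part-28-0.in-progress", "part-0-0", "part-1-0");
        // prints: [part-0-0, part-1-0]
        System.out.println(finishedOnly(listing));
    }
}
```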
[jira] [Comment Edited] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383356#comment-16383356 ]

yanxiaobin edited comment on FLINK-8794 at 3/2/18 8:40 AM:
---

Hi [~pnowojski], thank you for your suggestion! The downstream processor could ignore files with the "*pending" or "*in-progress" suffixes and the "_" prefix, but I don't think that is a good way to deal with it. We could change this behaviour, or add an option for BucketingSink to use temporary "in-progress" and "pending" directories instead of prefixes, but those temporary directories would still be subdirectories of the base directory. A downstream processor that reads the base directory recursively would still read the redundant dirty data. I think the temporary data produced while the job runs should be isolated from the final output data. Thanks!

Also, [~kkl0u], could you elaborate on why rescaling forced us to keep lingering files?

was (Author: backlight):
Hi [~pnowojski]! Thank you for your suggestion! The downstream processor could ignore files with the "*pending" or "*in-progress" suffixes and the "_" prefix, but I don't think that is a good way to deal with it. We could change this behaviour, or add an option for BucketingSink to use temporary "in-progress" and "pending" directories instead of prefixes, but those temporary directories would still be subdirectories of the base directory. A downstream processor that reads the base directory recursively would still read the redundant dirty data. I think the temporary data produced while the job runs should be isolated from the final output data. Thanks!

Also, [~kkl0u], could you elaborate on why rescaling forced us to keep lingering files?
[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384954#comment-16384954 ]

yanxiaobin commented on FLINK-8794:
---

Yes. If we allow a different directory, that should already be enough. So how do we solve this problem? And when using BucketingSink, it happens that one of the files is always in the [.in-progress] state, and this state never changes after that.
[jira] [Comment Edited] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384954#comment-16384954 ]

yanxiaobin edited comment on FLINK-8794 at 3/4/18 4:13 AM:
---

Yes. If we allow a different directory, that should already be enough. So how do we solve this problem? And when using BucketingSink, it happens that one of the files is always in the [.in-progress] state, and this state never changes after that.

was (Author: backlight):
Yes. If we allow a different directory, that should already be enough. So how do we solve this problem? Adn when using BucketingSink, it happens that one of the files is always in the [.in-progress] state, and this state never changes after that.
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390914#comment-16390914 ]

yanxiaobin commented on FLINK-8500:
---

Sorry for the late reply. I see; we really should be careful here. I would like to find out how to get the Kafka timestamp in methods such as flatMap and map of a DataStream.
[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390914#comment-16390914 ] yanxiaobin edited comment on FLINK-8500 at 3/8/18 8:47 AM: --- Sorry for the late reply. I see; we really should be careful here. I want to find out how to get the Kafka timestamp inside DataStream methods such as flatMap and map when using the Scala programming language. was (Author: backlight): Sorry to reply so late. I see. We really should be careful here. I want to find out how I should get Kafka timestamp in like flatmap and map methods of datastream. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
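One way to make the Kafka timestamp visible inside map and flatMap is to have the deserializer emit a wrapper that carries the timestamp as an ordinary field. This assumes the deserializer can see the timestamp in the first place, which is exactly what this issue asks for. The sketch is plain Java, and the Timestamped class is hypothetical, not part of Flink. In Scala the same idea would usually be written as a case class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical wrapper pairing a value with the Kafka record timestamp, so that
// downstream map/flatMap logic can read the timestamp like any other field.
final class Timestamped<T> {
    final T value;
    final long kafkaTimestamp; // epoch millis as stored in the Kafka record (assumption)

    Timestamped(T value, long kafkaTimestamp) {
        this.value = value;
        this.kafkaTimestamp = kafkaTimestamp;
    }

    // A "map" that transforms the value but keeps the timestamp attached.
    <R> Timestamped<R> map(Function<? super T, ? extends R> f) {
        return new Timestamped<>(f.apply(value), kafkaTimestamp);
    }

    // A "flatMap" that may emit zero or more values, each keeping the same timestamp.
    <R> List<Timestamped<R>> flatMap(Function<? super T, ? extends List<? extends R>> f) {
        List<Timestamped<R>> out = new ArrayList<>();
        for (R r : f.apply(value)) {
            out.add(new Timestamped<>(r, kafkaTimestamp));
        }
        return out;
    }
}
```

The design choice here is that the timestamp travels with the data. No operator needs special access to record metadata; each one only has to preserve the field.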
[jira] [Updated] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yanxiaobin updated FLINK-8794: -- Issue Type: Improvement (was: Bug) > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state > > > Key: FLINK-8794 > URL: https://issues.apache.org/jira/browse/FLINK-8794 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0, 1.4.1 >Reporter: yanxiaobin >Priority: Major > > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state. And this state has never changed after that. The > underlying use of S3 as storage. > > {code:java} > // code placeholder > {code} > 2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color} > 2018-02-28 12:06:27 147315059 part-0-0 > 2018-02-28 12:06:27 147462359 part-1-0 > 2018-02-28 12:06:27 147316006 part-10-0 > 2018-02-28 12:06:28 147349854 part-100-0 > 2018-02-28 12:06:27 147421625 part-101-0 > 2018-02-28 12:06:27 147443830 part-102-0 > 2018-02-28 12:06:27 147372801 part-103-0 > 2018-02-28 12:06:27 147343670 part-104-0 > .. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393956#comment-16393956 ] yanxiaobin commented on FLINK-8794: --- About point 1: what I described above happens even when the job has no failure. I found in the log that the file system's rename method executed without any exception, but the file name did not change, so I think it is an S3 problem. This should not be a problem with Flink. > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state > > > Key: FLINK-8794 > URL: https://issues.apache.org/jira/browse/FLINK-8794 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0, 1.4.1 >Reporter: yanxiaobin >Priority: Major > > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state. And this state has never changed after that. The > underlying use of S3 as storage. > > {code:java} > // code placeholder > {code} > 2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color} > 2018-02-28 12:06:27 147315059 part-0-0 > 2018-02-28 12:06:27 147462359 part-1-0 > 2018-02-28 12:06:27 147316006 part-10-0 > 2018-02-28 12:06:28 147349854 part-100-0 > 2018-02-28 12:06:27 147421625 part-101-0 > 2018-02-28 12:06:27 147443830 part-102-0 > 2018-02-28 12:06:27 147372801 part-103-0 > 2018-02-28 12:06:27 147343670 part-104-0 > .. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393956#comment-16393956 ] yanxiaobin edited comment on FLINK-8794 at 3/10/18 1:53 AM: About point 1: what I described above happens even when the job has no failure. I think I've found the problem. I found in the log that the file system's rename method executed without any exception, but the file name did not change, so I think it is an S3 problem. This should not be a problem with Flink. was (Author: backlight): About : 1.What I described above is that there will be such a situation when there is no failure in this job. I found through log that filesystem's rename method has been executed without any exception, but the filename hasn't changed, so I think it should be S3's problem. This should not be a problem with Flink. > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state > > > Key: FLINK-8794 > URL: https://issues.apache.org/jira/browse/FLINK-8794 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0, 1.4.1 >Reporter: yanxiaobin >Priority: Major > > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state. And this state has never changed after that. The > underlying use of S3 as storage. > > {code:java} > // code placeholder > {code} > 2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color} > 2018-02-28 12:06:27 147315059 part-0-0 > 2018-02-28 12:06:27 147462359 part-1-0 > 2018-02-28 12:06:27 147316006 part-10-0 > 2018-02-28 12:06:28 147349854 part-100-0 > 2018-02-28 12:06:27 147421625 part-101-0 > 2018-02-28 12:06:27 147443830 part-102-0 > 2018-02-28 12:06:27 147372801 part-103-0 > 2018-02-28 12:06:27 147343670 part-104-0 > .. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
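The symptom described here (the rename reports success, but the old file name remains) can at least be detected by checking the result after the rename. The sketch below uses java.nio.file on a local directory only to show that check-and-poll pattern. renameAndVerify and its retry parameters are hypothetical. A local file system will not reproduce S3's eventual consistency, and polling detects the problem rather than fixing it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: rename, then confirm the new name is visible and the old one is gone.
final class VerifiedRename {
    private VerifiedRename() {}

    /**
     * Moves source to target, then checks up to maxChecks times (sleeping
     * sleepMillis between checks) that target exists and source no longer does.
     * Returns false if the rename never became visible within those checks.
     */
    static boolean renameAndVerify(Path source, Path target, int maxChecks, long sleepMillis)
            throws IOException, InterruptedException {
        Files.move(source, target); // fails if target already exists
        for (int i = 0; i < maxChecks; i++) {
            if (Files.exists(target) && !Files.exists(source)) {
                return true;
            }
            Thread.sleep(sleepMillis);
        }
        return false;
    }
}
```

On an eventually consistent store, even the existence checks can return stale answers. A false result should therefore be treated as "not yet confirmed", not as proof that the rename failed.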
[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16395286#comment-16395286 ] yanxiaobin commented on FLINK-8794: --- I will open a case with AWS for the root cause of the problem. > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state > > > Key: FLINK-8794 > URL: https://issues.apache.org/jira/browse/FLINK-8794 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0, 1.4.1 >Reporter: yanxiaobin >Priority: Major > > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state. And this state has never changed after that. The > underlying use of S3 as storage. > > {code:java} > // code placeholder > {code} > 2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color} > 2018-02-28 12:06:27 147315059 part-0-0 > 2018-02-28 12:06:27 147462359 part-1-0 > 2018-02-28 12:06:27 147316006 part-10-0 > 2018-02-28 12:06:28 147349854 part-100-0 > 2018-02-28 12:06:27 147421625 part-101-0 > 2018-02-28 12:06:27 147443830 part-102-0 > 2018-02-28 12:06:27 147372801 part-103-0 > 2018-02-28 12:06:27 147343670 part-104-0 > .. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8939) Provide better support for saving streaming data to s3
[ https://issues.apache.org/jira/browse/FLINK-8939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398211#comment-16398211 ] yanxiaobin commented on FLINK-8939: --- I am also using the BucketingSink API to save streaming data to S3. Have you ever run into this problem? See: https://issues.apache.org/jira/browse/FLINK-8794 > Provide better support for saving streaming data to s3 > -- > > Key: FLINK-8939 > URL: https://issues.apache.org/jira/browse/FLINK-8939 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: chris snow >Priority: Major > Attachments: 18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png > > > Flink seems to struggle saving data to s3 due to the lack of a truncate > method, and in my test this resulted in lots of files with a .valid-length > suffix > I’m using a bucketing sink: > {code:java} > return new BucketingSink>(path) > .setWriter(writer) > .setBucketer(new DateTimeBucketer Object>>(formatString));{code} > > !18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png! > See also, the discussion in the comments here: > https://issues.apache.org/jira/browse/FLINK-8543?filter=-2 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
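According to the quoted description, when the file system has no truncate method the sink leaves files with a .valid-length suffix, and readers must ignore bytes past the recorded length. Below is a minimal reader-side sketch. It assumes the caller has already obtained the valid length. This thread does not describe how the .valid-length file is encoded, so parsing it is deliberately left out, and ValidLengthReader is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

// Hypothetical reader that honours a previously recorded valid length.
final class ValidLengthReader {
    private ValidLengthReader() {}

    // Returns only the first validLength bytes of partFile. Anything after that
    // offset is assumed to be data written after the last successful checkpoint.
    static byte[] readValidPrefix(Path partFile, long validLength) throws IOException {
        byte[] all = Files.readAllBytes(partFile);
        if (validLength < 0 || validLength > all.length) {
            throw new IllegalArgumentException(
                "valid length " + validLength + " out of range for " + all.length + " bytes");
        }
        return Arrays.copyOf(all, (int) validLength);
    }
}
```

Reading the whole file into memory keeps the sketch short. For large part files, a bounded stream that stops at validLength would be the more practical choice.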
[jira] [Comment Edited] (FLINK-8939) Provide better support for saving streaming data to s3
[ https://issues.apache.org/jira/browse/FLINK-8939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398211#comment-16398211 ] yanxiaobin edited comment on FLINK-8939 at 3/14/18 8:07 AM: I am also using the BucketingSink API to save streaming data to S3. Have you ever run into this problem? See: [FLINK-8794|https://issues.apache.org/jira/browse/FLINK-8794] was (Author: backlight): I am also using BucketingSink API for saving streaming data to s3. Have you ever met this problem?see : https://issues.apache.org/jira/browse/FLINK-8794 > Provide better support for saving streaming data to s3 > -- > > Key: FLINK-8939 > URL: https://issues.apache.org/jira/browse/FLINK-8939 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: chris snow >Priority: Major > Attachments: 18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png > > > Flink seems to struggle saving data to s3 due to the lack of a truncate > method, and in my test this resulted in lots of files with a .valid-length > suffix > I’m using a bucketing sink: > {code:java} > return new BucketingSink>(path) > .setWriter(writer) > .setBucketer(new DateTimeBucketer Object>>(formatString));{code} > > !18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png! > See also, the discussion in the comments here: > https://issues.apache.org/jira/browse/FLINK-8543?filter=-2 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16404584#comment-16404584 ] yanxiaobin commented on FLINK-8794: --- S3 follows the *eventually consistent* model, so for the time being there is no good S3-side solution to this problem. > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state > > > Key: FLINK-8794 > URL: https://issues.apache.org/jira/browse/FLINK-8794 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0, 1.4.1 >Reporter: yanxiaobin >Priority: Major > > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state. And this state has never changed after that. The > underlying use of S3 as storage. > > {code:java} > // code placeholder > {code} > 2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color} > 2018-02-28 12:06:27 147315059 part-0-0 > 2018-02-28 12:06:27 147462359 part-1-0 > 2018-02-28 12:06:27 147316006 part-10-0 > 2018-02-28 12:06:28 147349854 part-100-0 > 2018-02-28 12:06:27 147421625 part-101-0 > 2018-02-28 12:06:27 147443830 part-102-0 > 2018-02-28 12:06:27 147372801 part-103-0 > 2018-02-28 12:06:27 147343670 part-104-0 > .. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8939) Provide better support for saving streaming data to s3
[ https://issues.apache.org/jira/browse/FLINK-8939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16413319#comment-16413319 ] yanxiaobin commented on FLINK-8939: --- Thanks! Because S3 follows the *eventually consistent* principle, when using S3 without consistent view enabled, the file is sometimes not renamed from the .in-progress suffix to the .pending suffix. > Provide better support for saving streaming data to s3 > -- > > Key: FLINK-8939 > URL: https://issues.apache.org/jira/browse/FLINK-8939 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Streaming Connectors >Reporter: chris snow >Priority: Major > Attachments: 18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png > > > Flink seems to struggle saving data to s3 due to the lack of a truncate > method, and in my test this resulted in lots of files with a .valid-length > suffix > I’m using a bucketing sink: > {code:java} > return new BucketingSink>(path) > .setWriter(writer) > .setBucketer(new DateTimeBucketer Object>>(formatString));{code} > > !18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png! > See also, the discussion in the comments here: > https://issues.apache.org/jira/browse/FLINK-8543?filter=-2 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
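The file names in this thread suggest a three-step lifecycle: _part-28-0.in-progress, then a .pending name, then the final part-28-0. The sketch below models only those names, plus a check that flags files stuck before the final state. Two things are assumptions made for illustration and are not taken from BucketingSink's source: that the pending file keeps the "_" prefix, and the helper names themselves.

```java
import java.util.ArrayList;
import java.util.List;

// Lifecycle states inferred from the file names quoted in this thread.
enum PartFileState { IN_PROGRESS, PENDING, FINISHED }

// Hypothetical name helpers: "_part-28-0.in-progress" -> "_part-28-0.pending" -> "part-28-0".
final class PartFileNames {
    private PartFileNames() {}

    static final String PREFIX = "_";                 // seen in "_part-28-0.in-progress"
    static final String IN_PROGRESS_SUFFIX = ".in-progress";
    static final String PENDING_SUFFIX = ".pending";  // assumed to share the "_" prefix

    static String name(String base, PartFileState state) {
        switch (state) {
            case IN_PROGRESS: return PREFIX + base + IN_PROGRESS_SUFFIX;
            case PENDING:     return PREFIX + base + PENDING_SUFFIX;
            case FINISHED:    return base;
            default: throw new IllegalStateException("unknown state " + state);
        }
    }

    static PartFileState classify(String fileName) {
        if (fileName.startsWith(PREFIX) && fileName.endsWith(IN_PROGRESS_SUFFIX)) return PartFileState.IN_PROGRESS;
        if (fileName.startsWith(PREFIX) && fileName.endsWith(PENDING_SUFFIX)) return PartFileState.PENDING;
        return PartFileState.FINISHED;
    }

    // Files in a listing that never reached the final name. A file that stays here
    // long after its siblings finished is the symptom reported in FLINK-8794.
    static List<String> unfinished(List<String> listing) {
        List<String> out = new ArrayList<>();
        for (String n : listing) {
            if (classify(n) != PartFileState.FINISHED) out.add(n);
        }
        return out;
    }
}
```

Running unfinished() over a bucket listing like the one in the quoted description would single out _part-28-0.in-progress. That gives a simple way to monitor for the stuck-rename case.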
[jira] [Commented] (FLINK-8939) Provide better support for saving streaming data to s3
[ https://issues.apache.org/jira/browse/FLINK-8939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415112#comment-16415112 ] yanxiaobin commented on FLINK-8939: --- I'm using AWS S3. This problem has been plaguing me all the time. > Provide better support for saving streaming data to s3 > -- > > Key: FLINK-8939 > URL: https://issues.apache.org/jira/browse/FLINK-8939 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Streaming Connectors >Reporter: chris snow >Priority: Major > Attachments: 18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png > > > Flink seems to struggle saving data to s3 due to the lack of a truncate > method, and in my test this resulted in lots of files with a .valid-length > suffix > I’m using a bucketing sink: > {code:java} > return new BucketingSink>(path) > .setWriter(writer) > .setBucketer(new DateTimeBucketer Object>>(formatString));{code} > > !18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png! > See also, the discussion in the comments here: > https://issues.apache.org/jira/browse/FLINK-8543?filter=-2 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415131#comment-16415131 ] yanxiaobin commented on FLINK-8794: --- The underlying implementation is com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem. I am using Hadoop 2.7.3. I haven't found a good solution for the time being. > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state > > > Key: FLINK-8794 > URL: https://issues.apache.org/jira/browse/FLINK-8794 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0, 1.4.1 >Reporter: yanxiaobin >Priority: Major > > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state. And this state has never changed after that. The > underlying use of S3 as storage. > > {code:java} > // code placeholder > {code} > 2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color} > 2018-02-28 12:06:27 147315059 part-0-0 > 2018-02-28 12:06:27 147462359 part-1-0 > 2018-02-28 12:06:27 147316006 part-10-0 > 2018-02-28 12:06:28 147349854 part-100-0 > 2018-02-28 12:06:27 147421625 part-101-0 > 2018-02-28 12:06:27 147443830 part-102-0 > 2018-02-28 12:06:27 147372801 part-103-0 > 2018-02-28 12:06:27 147343670 part-104-0 > .. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416734#comment-16416734 ] yanxiaobin commented on FLINK-8794: --- Thanks, that is indeed the case. Enabling consistent view can cause other problems. > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state > > > Key: FLINK-8794 > URL: https://issues.apache.org/jira/browse/FLINK-8794 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0, 1.4.1 >Reporter: yanxiaobin >Priority: Major > > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state. And this state has never changed after that. The > underlying use of S3 as storage. > > {code:java} > // code placeholder > {code} > 2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color} > 2018-02-28 12:06:27 147315059 part-0-0 > 2018-02-28 12:06:27 147462359 part-1-0 > 2018-02-28 12:06:27 147316006 part-10-0 > 2018-02-28 12:06:28 147349854 part-100-0 > 2018-02-28 12:06:27 147421625 part-101-0 > 2018-02-28 12:06:27 147443830 part-102-0 > 2018-02-28 12:06:27 147372801 part-103-0 > 2018-02-28 12:06:27 147343670 part-104-0 > .. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417174#comment-16417174 ] yanxiaobin commented on FLINK-8794: --- Other problems have nothing to do with Flink. > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state > > > Key: FLINK-8794 > URL: https://issues.apache.org/jira/browse/FLINK-8794 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0, 1.4.1 >Reporter: yanxiaobin >Priority: Major > > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state. And this state has never changed after that. The > underlying use of S3 as storage. > > {code:java} > // code placeholder > {code} > 2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color} > 2018-02-28 12:06:27 147315059 part-0-0 > 2018-02-28 12:06:27 147462359 part-1-0 > 2018-02-28 12:06:27 147316006 part-10-0 > 2018-02-28 12:06:28 147349854 part-100-0 > 2018-02-28 12:06:27 147421625 part-101-0 > 2018-02-28 12:06:27 147443830 part-102-0 > 2018-02-28 12:06:27 147372801 part-103-0 > 2018-02-28 12:06:27 147343670 part-104-0 > .. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8321) Generate _SUCCESS (map-reduce style) when folder has been written!
yanxiaobin created FLINK-8321: - Summary: Generate _SUCCESS (map-reduce style) when folder has been written! Key: FLINK-8321 URL: https://issues.apache.org/jira/browse/FLINK-8321 Project: Flink Issue Type: Bug Components: filesystem-connector Affects Versions: 1.4.0 Environment: How to write the data that is processed in each time window to each single HDFS directory correspondingly when using DataStream API in streaming processing application, and generate _SUCCESS (map-reduce style) when folder has been written! Reporter: yanxiaobin -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8321) Generate _SUCCESS (map-reduce style) when folder has been written!
[ https://issues.apache.org/jira/browse/FLINK-8321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yanxiaobin updated FLINK-8321: -- Description: There is no success file generation, and the downstream processing can't determine when the directory is finished > Generate _SUCCESS (map-reduce style) when folder has been written! > -- > > Key: FLINK-8321 > URL: https://issues.apache.org/jira/browse/FLINK-8321 > Project: Flink > Issue Type: Bug > Components: filesystem-connector >Affects Versions: 1.4.0 > Environment: How to write the data that is processed in > each time window to each single HDFS directory correspondingly when using > DataStream API in streaming processing application, and generate _SUCCESS > (map-reduce style) when folder has been written! >Reporter: yanxiaobin > > There is no success file generation, and the downstream processing can't > determine when the directory is finished -- This message was sent by Atlassian JIRA (v6.4.14#64029)
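The sketch below shows one way the requested behaviour could work. Once a bucket directory is finished, an empty _SUCCESS marker is written so downstream jobs can poll for it. This is plain java.nio.file code, not a Flink API, and markIfComplete is hypothetical. The sketch also assumes that the decision that a time window's bucket is really closed (for example, the watermark passing the window end) is made elsewhere. Here "complete" only means that no part file is still in progress or pending.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical map-reduce style completion marker for a bucket directory.
final class SuccessMarker {
    private SuccessMarker() {}

    static final String SUCCESS_FILE = "_SUCCESS";

    // Call only after the bucket's time range is known to be closed. Writes an
    // empty _SUCCESS file if no part file is still in progress or pending, and
    // returns whether the directory is now marked complete.
    static boolean markIfComplete(Path dir) throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            boolean unfinished = files
                .map(p -> p.getFileName().toString())
                .anyMatch(n -> n.endsWith(".in-progress") || n.endsWith(".pending"));
            if (unfinished) {
                return false;
            }
        }
        Path marker = dir.resolve(SUCCESS_FILE);
        if (!Files.exists(marker)) {
            Files.createFile(marker);
        }
        return true;
    }

    // Downstream side: consume the directory only once the marker exists.
    static boolean isComplete(Path dir) {
        return Files.exists(dir.resolve(SUCCESS_FILE));
    }
}
```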
[jira] [Created] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
yanxiaobin created FLINK-8500: - Summary: Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) Key: FLINK-8500 URL: https://issues.apache.org/jira/browse/FLINK-8500 Project: Flink Issue Type: Improvement Components: Kafka Connector Affects Versions: 1.4.0 Reporter: yanxiaobin The method deserialize of KeyedDeserializationSchema needs a parameter 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, this is useful! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16338958#comment-16338958 ] yanxiaobin commented on FLINK-8500: --- I mean that I want to use the Kafka message timestamp as the event time. Thanks! > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
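Later in this thread, the Kafka timestamp is described as arriving in the extractor callback as the previous-element timestamp ("elementPrevTimestamp"). The stdlib sketch below uses a local stand-in interface with that (element, previousElementTimestamp) shape. It is not the Flink interface itself. It contrasts an extractor that returns the Kafka timestamp as event time with one that, like extractAscendingTimestamp(element), looks only at the element. All type names are hypothetical.

```java
// Local stand-in with an (element, previousElementTimestamp) callback shape.
// This is NOT Flink's interface; it only mirrors the shape discussed in the thread.
interface TimestampExtractor<T> {
    long extractTimestamp(T element, long previousElementTimestamp);
}

// Uses the timestamp already attached to the record (per this thread, the Kafka
// message timestamp) as the event time, without parsing the element.
final class KafkaTimestampExtractor<T> implements TimestampExtractor<T> {
    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        return previousElementTimestamp;
    }
}

// Mirrors the behaviour described for extractAscendingTimestamp(element):
// the previous (Kafka) timestamp is ignored and only the element is consulted.
abstract class ElementOnlyExtractor<T> implements TimestampExtractor<T> {
    protected abstract long extractFromElement(T element);

    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        return extractFromElement(element); // previousElementTimestamp is unused
    }
}
```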
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16343115#comment-16343115 ] yanxiaobin commented on FLINK-5479: --- I am also running into this problem at the moment. > Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions > -- > > Key: FLINK-5479 > URL: https://issues.apache.org/jira/browse/FLINK-5479 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0 > > > Reported in ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html > Similar to what's happening to idle sources blocking watermark progression in > downstream operators (see FLINK-5017), the per-partition watermark mechanism > in {{FlinkKafkaConsumer}} is also being blocked of progressing watermarks > when a partition is idle. The watermark of idle partitions is always > {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions > of a consumer subtask will never proceed. > It's normally not a common case to have Kafka partitions not producing any > data, but it'll probably be good to handle this as well. I think we should > have a localized solution similar to FLINK-5017 for the per-partition > watermarks in {{AbstractFetcher}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
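The quoted description explains the blockage. An idle partition keeps its watermark at Long.MIN_VALUE, so the minimum across partitions never advances. One possible remedy is to exclude from the minimum any partition that has been silent for longer than a timeout. This is sketched below under stated assumptions and is not Flink's AbstractFetcher logic. Each partition's watermark is simplified to the largest timestamp it has seen (no out-of-orderness allowance), and the timeout value is arbitrary.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical idle-aware watermark tracker for the partitions of one consumer subtask.
final class PartitionWatermarks {
    private static final class State {
        long watermark = Long.MIN_VALUE; // stays MIN_VALUE until data arrives
        long lastActivityMillis;
    }

    private final Map<Integer, State> partitions = new HashMap<>();
    private final long idleTimeoutMillis;

    PartitionWatermarks(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Called when a partition is assigned, before it has produced any record.
    void register(int partition, long nowMillis) {
        State s = new State();
        s.lastActivityMillis = nowMillis;
        partitions.put(partition, s);
    }

    // Simplification: the partition watermark is the largest timestamp seen so far.
    void onRecord(int partition, long eventTimestamp, long nowMillis) {
        State s = partitions.computeIfAbsent(partition, p -> new State());
        s.watermark = Math.max(s.watermark, eventTimestamp);
        s.lastActivityMillis = nowMillis;
    }

    // Minimum over partitions active within the timeout, so idle partitions no
    // longer pin the result at Long.MIN_VALUE.
    long combinedWatermark(long nowMillis) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (State s : partitions.values()) {
            if (nowMillis - s.lastActivityMillis <= idleTimeoutMillis) {
                min = Math.min(min, s.watermark);
                anyActive = true;
            }
        }
        // If every partition is idle there is no basis for a watermark.
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```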
[jira] [Updated] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yanxiaobin updated FLINK-8500: -- Attachment: image-2018-01-30-14-58-58-167.png > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Attachments: image-2018-01-30-14-58-58-167.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16344591#comment-16344591 ] yanxiaobin commented on FLINK-8500: --- As shown below (Kafka010Fetcher): !image-2018-01-30-14-58-58-167.png! > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Attachments: image-2018-01-30-14-58-58-167.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yanxiaobin updated FLINK-8500: -- Attachment: image-2018-01-31-10-48-59-633.png > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346180#comment-16346180 ] yanxiaobin commented on FLINK-8500: --- Hi [~aljoscha], thank you for your reply! Please look at the next picture: !image-2018-01-31-10-48-59-633.png! The final event time is obtained from “{color:#80}final long {color}newTimestamp = extractAscendingTimestamp(element);”, and the element is deserialized by "KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp" is not used! Thanks! > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346180#comment-16346180 ] yanxiaobin edited comment on FLINK-8500 at 1/31/18 3:01 AM: Hi [~aljoscha], thank you for your reply! Please look at the next picture: !image-2018-01-31-10-48-59-633.png! The final event time is obtained from “{color:#80}final long {color}newTimestamp = extractAscendingTimestamp(element);”, and the element is deserialized by "KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp", which is the Kafka timestamp, is not used! Thanks! was (Author: backlight): hi,[~aljoscha] , thank you for your reply!Please look at the next picture! !image-2018-01-31-10-48-59-633.png! The final eventtime is obtained from “{color:#80}final long {color}newTimestamp = extractAscendingTimestamp(element);“ , and the element was deserialized from "KeyedDeserializationSchema" . Also the parameter "elementPrevTimestamp" is not used! Thanks! > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346180#comment-16346180 ] yanxiaobin edited comment on FLINK-8500 at 1/31/18 3:09 AM: Hi [~aljoscha], thank you for your reply! Please look at the next picture: !image-2018-01-31-10-48-59-633.png! The final event time is obtained from “{color:#80}final long {color}newTimestamp = extractAscendingTimestamp(element);”, and the element is deserialized by "KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp", which is the Kafka timestamp, is not used! So I think the deserialize method of KeyedDeserializationSchema should take an additional parameter, 'kafka message timestamp' (from ConsumerRecord), which is useful in some business scenarios. Thanks! was (Author: backlight): hi,[~aljoscha] , thank you for your reply!Please look at the next picture! !image-2018-01-31-10-48-59-633.png! The final eventtime is obtained from “{color:#80}final long {color}newTimestamp = extractAscendingTimestamp(element);“ , and the element was deserialized from "KeyedDeserializationSchema" . Also the parameter "elementPrevTimestamp" that is Kafka timestamp is not used! Thanks! > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
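Below is a sketch of the proposed signature change, written against a hypothetical local interface rather than Flink's own. To our understanding, KeyedDeserializationSchema.deserialize in 1.4 takes messageKey, message, topic, partition and offset; the sketch keeps those parameters and appends the Kafka timestamp. TimestampedKeyedDeserializer, StringWithTimestamp and StringWithTimestampDeserializer are illustrative names only.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical extension of the deserialize signature proposed in FLINK-8500:
// the Kafka record timestamp is passed next to key, value, topic, partition and offset.
interface TimestampedKeyedDeserializer<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition,
                  long offset, long kafkaTimestamp);
}

// Example output type that keeps the Kafka timestamp with the value.
final class StringWithTimestamp {
    final String value;
    final long kafkaTimestamp;

    StringWithTimestamp(String value, long kafkaTimestamp) {
        this.value = value;
        this.kafkaTimestamp = kafkaTimestamp;
    }
}

// Decodes the message as UTF-8 and embeds the timestamp in the produced record.
final class StringWithTimestampDeserializer implements TimestampedKeyedDeserializer<StringWithTimestamp> {
    @Override
    public StringWithTimestamp deserialize(byte[] messageKey, byte[] message, String topic,
                                           int partition, long offset, long kafkaTimestamp) {
        String value = message == null ? null : new String(message, StandardCharsets.UTF_8);
        return new StringWithTimestamp(value, kafkaTimestamp);
    }
}
```

Because the timestamp ends up inside the produced element, later operators such as map or flatMap can read it directly.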
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346550#comment-16346550 ] yanxiaobin commented on FLINK-8500: --- Hi [~aljoscha], what do you think? > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347951#comment-16347951 ] yanxiaobin commented on FLINK-8500: --- Great! I think it's reasonable. But I think it's best to optimize it so that the user interface is as unified as possible! Thanks, [~aljoscha]. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347951#comment-16347951 ] yanxiaobin edited comment on FLINK-8500 at 2/1/18 3:28 AM: --- Great! I think it's reasonable that the deserialize method of KeyedDeserializationSchema should take a 'kafka message timestamp' parameter (from ConsumerRecord); in some business scenarios, this is useful. And I think it's best to optimize it so that the user interface is as unified as possible! Thanks, [~aljoscha]. was (Author: backlight): Great! I think it's reasonable. But I think it's best to optimize it so that the user interface is as unified as possible! Thanks, [~aljoscha]. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16343115#comment-16343115 ] yanxiaobin edited comment on FLINK-5479 at 2/1/18 10:30 AM: I have also run into this problem. It can cause serious delays! Is there a better solution to the problem? was (Author: backlight): I have also run into this problem. > Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions > -- > > Key: FLINK-5479 > URL: https://issues.apache.org/jira/browse/FLINK-5479 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0 > > > Reported in ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html > Similar to what's happening to idle sources blocking watermark progression in > downstream operators (see FLINK-5017), the per-partition watermark mechanism > in {{FlinkKafkaConsumer}} is also being blocked of progressing watermarks > when a partition is idle. The watermark of idle partitions is always > {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions > of a consumer subtask will never proceed. > It's normally not a common case to have Kafka partitions not producing any > data, but it'll probably be good to handle this as well. I think we should > have a localized solution similar to FLINK-5017 for the per-partition > watermarks in {{AbstractFetcher}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
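The blocking effect described in the quoted issue can be reproduced in a few lines: a subtask's watermark is the minimum over its partitions, so a partition that never emits stays at `Long.MIN_VALUE` and pins the minimum. The tracker below is a hypothetical sketch, not Flink's `AbstractFetcher`; it shows one possible localized fix in the spirit of FLINK-5017, assuming a partition counts as idle once it has been silent for longer than an assumed `idleTimeoutMillis`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-partition watermark tracker with idleness detection. */
final class IdleAwareWatermarkTracker {
    private final long idleTimeoutMillis;
    private final Map<Integer, Long> watermarks = new HashMap<>();
    private final Map<Integer, Long> lastActivity = new HashMap<>();

    IdleAwareWatermarkTracker(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Registers a partition that has not produced data yet (watermark Long.MIN_VALUE). */
    void addPartition(int partition, long nowMillis) {
        watermarks.put(partition, Long.MIN_VALUE);
        lastActivity.put(partition, nowMillis);
    }

    /** Records a watermark for a partition and marks the partition as active. */
    void onWatermark(int partition, long watermark, long nowMillis) {
        watermarks.merge(partition, watermark, Long::max);
        lastActivity.put(partition, nowMillis);
    }

    /**
     * Minimum watermark over partitions that are not idle.
     * Returns Long.MIN_VALUE if every partition is idle.
     */
    long currentWatermark(long nowMillis) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<Integer, Long> e : watermarks.entrySet()) {
            boolean idle = nowMillis - lastActivity.get(e.getKey()) > idleTimeoutMillis;
            if (!idle) {
                anyActive = true;
                min = Math.min(min, e.getValue());
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

Design note: once an idle partition receives data again it rejoins the minimum, so the combined value can move backwards; a real implementation would have to decide how to handle that, for example by never emitting a watermark lower than the last one emitted.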
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16376428#comment-16376428 ] yanxiaobin commented on FLINK-8500: --- Hi [~aljoscha]! I can solve that problem that way, but I still think the deserialize method of KeyedDeserializationSchema needs a 'kafka message timestamp' parameter (from ConsumerRecord). This timestamp may also be needed in some business logic processing! What do you think? > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Blocker > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
yanxiaobin created FLINK-8794: - Summary: When using BucketingSink, it happens that one of the files is always in the [.in-progress] state Key: FLINK-8794 URL: https://issues.apache.org/jira/browse/FLINK-8794 Project: Flink Issue Type: Bug Components: filesystem-connector Affects Versions: 1.4.0, 1.4.1 Reporter: yanxiaobin When using BucketingSink, it happens that one of the files is always in the [.in-progress] state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yanxiaobin updated FLINK-8794: -- Description: When using BucketingSink, it happens that one of the files is always in the [.in-progress] state. This state never changes afterwards. S3 is used as the underlying storage. (was: When using BucketingSink, it happens that one of the files is always in the [.in-progress] state.) > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state > > > Key: FLINK-8794 > URL: https://issues.apache.org/jira/browse/FLINK-8794 > Project: Flink > Issue Type: Bug > Components: filesystem-connector >Affects Versions: 1.4.0, 1.4.1 >Reporter: yanxiaobin >Priority: Major > > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state. And this state has never changed after that. The > underlying use of S3 as storage. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12992) All host(s) tried for query failed (tried com.datastax.driver.core.exceptions.TransportException: Error writing...)
[ https://issues.apache.org/jira/browse/FLINK-12992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876018#comment-16876018 ] yanxiaobin commented on FLINK-12992: At present, I have upgraded the netty version of the datastax Java driver to the netty version that flink depends on, and the problem is solved. > All host(s) tried for query failed (tried > com.datastax.driver.core.exceptions.TransportException: Error writing...) > --- > > Key: FLINK-12992 > URL: https://issues.apache.org/jira/browse/FLINK-12992 > Project: Flink > Issue Type: Bug > Components: Connectors / Cassandra >Affects Versions: 1.7.2, 1.8.0 > Environment: > org.apache.flink > flink-connector-cassandra_2.11 > 1.8.0 > >Reporter: yanxiaobin >Priority: Critical > > We are using flink streming application with cassandra connector providing > sinks that writes data into a [Apache > Cassandra|https://cassandra.apache.org/] database. > At first we found the following exceptions:All host(s) tried for query failed > (tried com.datastax.driver.core.exceptions.TransportException: Error > writing...). This exception will cause the streaming job to fail > > And we have carefully checked that Cassandra service and network are all > normal. Finally, we refer to the source code of DataStax Java Driver that the > connector depends on. 
We found that the real exception caused the problem is > as follows: > com.datastax.shaded.netty.handler.codec.EncoderException: > java.lang.IllegalAccessError: com/datastax/driver/core/Frame at > com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:107) > at > com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:643) > at > com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:700) > at > com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:636) > at > com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:112) > at > com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:643) > at > com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:700) > at > com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:636) > at > com.datastax.shaded.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:284) > at > com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:643) > at > com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:700) > at > com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:636) > at > com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:622) > at > com.datastax.shaded.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:939) > at > com.datastax.shaded.netty.channel.AbstractChannel.write(AbstractChannel.java:234) > at com.datastax.driver.core.Connection$Flusher.run(Connection.java:872) at > 
com.datastax.shaded.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358) > at > com.datastax.shaded.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) > at > com.datastax.shaded.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112) > at java.lang.Thread.run(Thread.java:748) Caused by: > java.lang.IllegalAccessError: com/datastax/driver/core/Frame at > com.datastax.shaded.netty.util.internal.__matchers__.com.datastax.driver.core.FrameMatcher.match(NoOpTypeParameterMatcher.java) > at > com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.acceptOutboundMessage(MessageToMessageEncoder.java:77) > at > com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:84) > > Based on this exception, we found relevant information > [https://datastax-oss.atlassian.net/browse/JAVA-1337?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel] > > > Because I found that the latest version of flink-cassandra-connector uses the > datastax Java driver old version 3.0.0.Perhaps we should upgrade the version > on which the connector depends to Java driver 3.3.0+ to avoid this problem. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
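When a linkage error such as the `IllegalAccessError` above points at a dependency clash, a quick first check is which JAR each involved class was actually loaded from. `ClassOrigin` below is a hypothetical diagnostic helper built only on standard reflection (`Class.forName` and `ProtectionDomain.getCodeSource()`); it is a troubleshooting sketch, not part of Flink or the DataStax driver.

```java
import java.security.CodeSource;

/** Hypothetical diagnostic: reports where a class was loaded from. */
final class ClassOrigin {
    private ClassOrigin() {}

    /**
     * Returns the code-source location (usually a JAR URL) of the named class,
     * "bootstrap" for JDK classes without a code source, "not found" if the
     * class is absent, or a linkage-error description if it cannot be linked.
     */
    static String locate(String className) {
        try {
            // initialize=false: load the class without running static initializers
            Class<?> c = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "bootstrap";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "not found";
        } catch (LinkageError e) {
            return "linkage error: " + e;
        }
    }
}
```

For example, logging `ClassOrigin.locate("com.datastax.driver.core.Frame")` and `ClassOrigin.locate("com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder")` from inside the job would show whether the driver and its shaded Netty come from the JARs you expect. This assumes the helper is packaged in the job JAR, so that it resolves classes through the same class loader as the user code.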
[jira] [Created] (FLINK-16791) Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar
yanxiaobin created FLINK-16791: -- Summary: Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar Key: FLINK-16791 URL: https://issues.apache.org/jira/browse/FLINK-16791 Project: Flink Issue Type: Bug Components: Client / Job Submission Affects Versions: 1.10.0 Reporter: yanxiaobin Since AWS S3 is needed, I copied the flink-s3-fs-hadoop-1.10.0.jar file into the {{lib}} directory of the Flink distribution. But when I submit a single Flink job in YARN mode, the following exception occurs:
Caused by: java.lang.ClassCastException: org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto cannot be cast to com.google.protobuf.Message
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
	at com.sun.proxy.$Proxy11.getClusterNodes(Unknown Source)
	at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:303)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
	at com.sun.proxy.$Proxy12.getClusterNodes(Unknown Source)
	at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:564)
	at org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43)
	at org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:278)
	at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444)
	at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:390)
	... 24 more
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16791) Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar
[ https://issues.apache.org/jira/browse/FLINK-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yanxiaobin updated FLINK-16791: --- Description: Since aws s3 is needed, so copy the flink-s3-fs-hadoop-1.10.0.jar JAR file to the {{lib}} directory of Flink distribution. But when I submit a single flink job on yarn mode, I found the following problems: Caused by: java.lang.ClassCastException: org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto cannot be cast to com.google.protobuf.Message at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116) at com.sun.proxy.$Proxy11.getClusterNodes(Unknown Source) at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:303) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422) at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165) at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157) at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359) at com.sun.proxy.$Proxy12.getClusterNodes(Unknown Source) at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:564) at org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43) at 
org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:278) at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444) at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:390) ... 24 more > Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar > --- > > Key: FLINK-16791 > URL: https://issues.apache.org/jira/browse/FLINK-16791 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission >Affects Versions: 1.10.0 >Reporter: yanxiaobin >Pri
[jira] [Commented] (FLINK-16791) Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar
[ https://issues.apache.org/jira/browse/FLINK-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17067380#comment-17067380 ] yanxiaobin commented on FLINK-16791: I checked that the application does not contain any Hadoop dependencies, so there are no related Hadoop dependency version conflicts. Because flink-s3-fs-hadoop-1.10.0.jar depends on hadoop-common and its related dependencies, the unshaded com.google.protobuf.Message is loaded. When I removed flink-s3-fs-hadoop-1.10.0.jar from the lib directory, the problem disappeared. > Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar > --- > > Key: FLINK-16791 > URL: https://issues.apache.org/jira/browse/FLINK-16791 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission >Affects Versions: 1.10.0 >Reporter: yanxiaobin >Priority: Major > > Since aws s3 is needed, so copy the flink-s3-fs-hadoop-1.10.0.jar JAR file to > the {{lib}} directory of Flink distribution. But when I submit a single flink > job on yarn mode, I found the following problems: > > Caused by: java.lang.ClassCastException: > org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto > cannot be cast to com.google.protobuf.Message > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116) > at com.sun.proxy.$Proxy11.getClusterNodes(Unknown Source) > at > org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:303) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at >
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359) > at com.sun.proxy.$Proxy12.getClusterNodes(Unknown Source) > at > org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:564) > at > org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43) > at > org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:278) > at > org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444) > at > org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:390) > ... 24 more -- This message was sent by Atlassian Jira (v8.3.4#803005)
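The explanation above is that an unshaded `com.google.protobuf.Message` reaches the classpath through a JAR in `lib`. One way to test that kind of hypothesis is to list the class names inside each JAR and report any name provided by more than one of them. `DuplicateClassFinder` is a hypothetical helper built on `java.util.jar.JarFile`; which JARs to feed it (for example the files in Flink's `lib` directory plus the job JAR) is an assumption left to the reader. Properly relocated (shaded) copies live under a different package name and are therefore not reported.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

/** Hypothetical helper: finds class names that are shipped by more than one JAR. */
final class DuplicateClassFinder {
    private DuplicateClassFinder() {}

    /** Lists the .class entries of a JAR as binary class names (a.b.C). */
    static Set<String> classesIn(String jarPath) throws IOException {
        Set<String> names = new TreeSet<>();
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String n = entries.nextElement().getName();
                // skip META-INF (e.g. multi-release copies) and non-class resources
                if (n.endsWith(".class") && !n.startsWith("META-INF/")) {
                    names.add(n.substring(0, n.length() - ".class".length()).replace('/', '.'));
                }
            }
        }
        return names;
    }

    /** Maps each class name found in two or more JARs to the JARs that provide it. */
    static Map<String, List<String>> duplicates(Map<String, Set<String>> classesByJar) {
        Map<String, List<String>> providers = new TreeMap<>();
        for (Map.Entry<String, Set<String>> jar : classesByJar.entrySet()) {
            for (String cls : jar.getValue()) {
                providers.computeIfAbsent(cls, k -> new ArrayList<>()).add(jar.getKey());
            }
        }
        providers.values().removeIf(jars -> jars.size() < 2);
        return providers;
    }
}
```

For example, fill the map with `classesIn(path)` for each JAR of interest and print `duplicates(map)`; an entry for `com.google.protobuf.Message` listing two JARs would support the explanation in the comment.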
[jira] [Created] (FLINK-16792) flink-s3-fs-hadoop cannot access s3
yanxiaobin created FLINK-16792: -- Summary: flink-s3-fs-hadoop cannot access s3 Key: FLINK-16792 URL: https://issues.apache.org/jira/browse/FLINK-16792 Project: Flink Issue Type: Bug Components: FileSystems Affects Versions: 1.10.0 Reporter: yanxiaobin When flink-s3-fs-hadoop-1.10.0.jar is placed in the lib directory and Flink accesses AWS S3, the following exception occurs:
Caused by: java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
	... 3 more
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
	at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:769)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
	at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:141)
	at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
	at org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:309)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
-- This message was sent by Atlassian Jira (v8.3.4#803005)
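The `NoSuchMethodError` text encodes the missing `SemaphoredDelegatingExecutor` constructor as a JVM method descriptor: `(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V` means parameters `(ListeningExecutorService, int, boolean)` with a `void` return. In other words, `S3AFileSystem.create` expected a constructor taking Guava's `com.google.common.util.concurrent.ListeningExecutorService`, and the `SemaphoredDelegatingExecutor` class found at runtime has no constructor with exactly that signature. The hypothetical `DescriptorDecoder` below, written from the descriptor grammar in the JVM specification, turns such descriptors into readable Java types; it throws on malformed input.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that turns a JVM method descriptor into readable Java types. */
final class DescriptorDecoder {
    private DescriptorDecoder() {}

    /** Decodes e.g. "(Ljava/lang/String;IZ)V" into "(java.lang.String, int, boolean) -> void". */
    static String decode(String descriptor) {
        if (!descriptor.startsWith("(")) {
            throw new IllegalArgumentException("not a method descriptor: " + descriptor);
        }
        List<String> params = new ArrayList<>();
        int[] pos = {1}; // current read position, shared with readType
        while (descriptor.charAt(pos[0]) != ')') {
            params.add(readType(descriptor, pos));
        }
        pos[0]++; // skip ')'
        String returnType = readType(descriptor, pos);
        return "(" + String.join(", ", params) + ") -> " + returnType;
    }

    /** Reads one type starting at pos[0] and advances pos[0] past it. */
    private static String readType(String d, int[] pos) {
        char c = d.charAt(pos[0]++);
        switch (c) {
            case 'B': return "byte";
            case 'C': return "char";
            case 'D': return "double";
            case 'F': return "float";
            case 'I': return "int";
            case 'J': return "long";
            case 'S': return "short";
            case 'Z': return "boolean";
            case 'V': return "void";
            case '[': return readType(d, pos) + "[]";
            case 'L': {
                // object type: L<binary/name>;
                int end = d.indexOf(';', pos[0]);
                String name = d.substring(pos[0], end).replace('/', '.');
                pos[0] = end + 1;
                return name;
            }
            default:
                throw new IllegalArgumentException("bad type char '" + c + "' in " + d);
        }
    }
}
```

Usage: `DescriptorDecoder.decode("(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V")` yields `(com.google.common.util.concurrent.ListeningExecutorService, int, boolean) -> void`.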
[jira] [Commented] (FLINK-16792) flink-s3-fs-hadoop cannot access s3
[ https://issues.apache.org/jira/browse/FLINK-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17067389#comment-17067389 ] yanxiaobin commented on FLINK-16792: The issue is related to https://issues.apache.org/jira/browse/HADOOP-16080 > flink-s3-fs-hadoop cannot access s3 > --- > > Key: FLINK-16792 > URL: https://issues.apache.org/jira/browse/FLINK-16792 > Project: Flink > Issue Type: Bug > Components: FileSystems >Affects Versions: 1.10.0 >Reporter: yanxiaobin >Priority: Major > > When flink-s3-fs-hadoop-1.10.0.jar is placed in the lib directory and flink > accesses aws s3, the following exception occurs: > > Caused by: java.util.concurrent.ExecutionException: > java.lang.NoSuchMethodError: > org.apache.hadoop.util.SemaphoredDelegatingExecutor.(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V > at java.util.concurrent.FutureTask.report(FutureTask.java:122) at > java.util.concurrent.FutureTask.get(FutureTask.java:192) at > org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143) > ... 
3 more Caused by: java.lang.NoSuchMethodError: > org.apache.hadoop.util.SemaphoredDelegatingExecutor.(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V > at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:769) at > org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) at > org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) at > org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) at > org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:141) > at > org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126) > at > org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:309) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108) > at > org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) at > org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16792) flink-s3-fs-hadoop cannot access s3
[ https://issues.apache.org/jira/browse/FLINK-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yanxiaobin updated FLINK-16792:
---
Description:
When flink-s3-fs-hadoop-1.10.0.jar is placed in the lib directory of the Flink distribution and Flink accesses AWS S3, the following exception occurs:

Caused by: java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
    ... 3 more
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:769)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
    at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:141)
    at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
    at org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:309)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)

was:
When flink-s3-fs-hadoop-1.10.0.jar is placed in the lib directory and Flink accesses AWS S3, the following exception occurs:

Caused by: java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
    ... 3 more
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:769)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
    at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:141)
    at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
    at org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:309)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179)
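In the NoSuchMethodError above, the missing SemaphoredDelegatingExecutor constructor is identified by a JVM method descriptor, (Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V. In that notation "L...;" is an object type, I is int, Z is boolean, and the trailing V is a void return. So S3AFileSystem.create expected a constructor taking an unshaded Guava ListeningExecutorService, an int and a boolean, and the SemaphoredDelegatingExecutor class actually loaded had no such constructor. This typically suggests that incompatible versions of a class are mixed on the classpath. The Java sketch below decodes such descriptors into readable signatures. DescriptorDecoder is a hypothetical helper, not part of Flink or Hadoop, and it only handles well-formed method descriptors.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: turns a JVM method descriptor into a readable signature. */
final class DescriptorDecoder {

    /** Decodes e.g. "(Ljava/lang/String;I)V" into "(java.lang.String, int) -> void". */
    static String decode(String descriptor) {
        int close = descriptor.indexOf(')');
        if (!descriptor.startsWith("(") || close < 0) {
            throw new IllegalArgumentException("not a method descriptor: " + descriptor);
        }
        List<String> params = new ArrayList<>();
        int[] pos = {1}; // single-element array used as a mutable cursor
        while (pos[0] < close) {
            params.add(readType(descriptor, pos));
        }
        pos[0] = close + 1; // the return type follows the closing parenthesis
        String returnType = readType(descriptor, pos);
        return "(" + String.join(", ", params) + ") -> " + returnType;
    }

    /** Reads one type starting at pos[0] and advances the cursor past it. */
    private static String readType(String d, int[] pos) {
        char c = d.charAt(pos[0]++);
        switch (c) {
            case 'B': return "byte";
            case 'C': return "char";
            case 'D': return "double";
            case 'F': return "float";
            case 'I': return "int";
            case 'J': return "long";
            case 'S': return "short";
            case 'Z': return "boolean";
            case 'V': return "void";
            case '[': return readType(d, pos) + "[]"; // array: the element type follows
            case 'L': { // object type: Lpackage/path/ClassName;
                int end = d.indexOf(';', pos[0]);
                if (end < 0) {
                    throw new IllegalArgumentException("unterminated class name in " + d);
                }
                String name = d.substring(pos[0], end).replace('/', '.');
                pos[0] = end + 1;
                return name;
            }
            default:
                throw new IllegalArgumentException("unexpected '" + c + "' in " + d);
        }
    }

    public static void main(String[] args) {
        // Descriptor copied from the NoSuchMethodError above.
        // Prints: (com.google.common.util.concurrent.ListeningExecutorService, int, boolean) -> void
        System.out.println(decode("(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V"));
    }
}
```

Comparing the decoded signature with the constructors of the SemaphoredDelegatingExecutor class that is actually on the classpath shows which side (caller or callee) comes from the unexpected version.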
[jira] [Commented] (FLINK-16791) Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar
[ https://issues.apache.org/jira/browse/FLINK-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17067576#comment-17067576 ]

yanxiaobin commented on FLINK-16791:

Hi Yang Wang, thanks. I have already put flink-shaded-hadoop into the lib directory and used {{export HADOOP_CLASSPATH=`hadoop classpath`}}, following the documentation.

> Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar
> ---
>
> Key: FLINK-16791
> URL: https://issues.apache.org/jira/browse/FLINK-16791
> Project: Flink
> Issue Type: Bug
> Components: Client / Job Submission
> Affects Versions: 1.10.0
> Reporter: yanxiaobin
> Priority: Major
>
> Since AWS S3 is needed, I copied the flink-s3-fs-hadoop-1.10.0.jar file to
> the {{lib}} directory of the Flink distribution. But when I submit a single Flink
> job in YARN mode, I encounter the following problem:
>
> Caused by: java.lang.ClassCastException: org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto cannot be cast to com.google.protobuf.Message
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
>     at com.sun.proxy.$Proxy11.getClusterNodes(Unknown Source)
>     at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:303)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
>     at com.sun.proxy.$Proxy12.getClusterNodes(Unknown Source)
>     at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:564)
>     at org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43)
>     at org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:278)
>     at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444)
>     at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:390)
>     ... 24 more

--
This message was sent by Atlassian Jira (v8.3.4#803005)
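A ClassCastException between types that should be compatible, such as a generated YARN protobuf message that cannot be cast to com.google.protobuf.Message, often means two copies of the class are involved. The JVM identifies a class by its name together with its defining class loader, so two copies of com.google.protobuf.Message defined by different loaders are unrelated types even if their bytecode is identical. As a diagnostic sketch, assuming you can run a small class with the same classpath as the Flink client, the hypothetical helper WhoLoaded below reports which class loader defined a class and which jar or directory it was loaded from.

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical diagnostic: reports which class loader defined a class and where it was loaded from. */
final class WhoLoaded {

    static String describe(String className) {
        try {
            // initialize = false: load the class without running its static initializers.
            Class<?> c = Class.forName(className, false, WhoLoaded.class.getClassLoader());
            ClassLoader loader = c.getClassLoader();
            // Classes from the JDK itself are defined by the bootstrap loader, reported as null.
            String loaderName = (loader == null) ? "bootstrap" : loader.toString();
            CodeSource source = c.getProtectionDomain().getCodeSource();
            URL location = (source == null) ? null : source.getLocation();
            return className + " loaded by " + loaderName
                    + " from " + (location == null ? "<unknown>" : location.toString());
        } catch (ClassNotFoundException e) {
            return className + " not found";
        } catch (LinkageError e) {
            // The class file exists but could not be loaded, e.g. a superclass is missing.
            return className + " failed to load: " + e;
        }
    }

    public static void main(String[] args) {
        String[] names = args.length > 0 ? args : new String[] {
            "com.google.protobuf.Message",
            "org.apache.hadoop.yarn.proto.YarnServiceProtos"
        };
        for (String name : names) {
            System.out.println(describe(name));
        }
    }
}
```

Run without arguments, it inspects com.google.protobuf.Message and the YarnServiceProtos outer class; pass other fully qualified class names to check them instead.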
[jira] [Commented] (FLINK-16791) Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar
[ https://issues.apache.org/jira/browse/FLINK-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17067618#comment-17067618 ]

yanxiaobin commented on FLINK-16791:

It doesn't work yet. The unshaded com.google.protobuf.Message is still loaded.
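To see how many copies of the unshaded com.google.protobuf.Message are visible, you can ask a class loader for every resource named com/google/protobuf/Message.class; ClassLoader.getResources returns all matches along the delegation chain, not just the first. A minimal sketch with the hypothetical class name FindCopies, assuming it runs with the same classpath as the failing client:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

/** Hypothetical diagnostic: lists every classpath location that contains a given class file. */
final class FindCopies {

    static List<URL> copiesOf(String className, ClassLoader loader) {
        // Class name -> resource path, e.g. com.google.protobuf.Message -> com/google/protobuf/Message.class
        String resource = className.replace('.', '/') + ".class";
        try {
            // getResources returns every match the loader and its parents can see.
            return Collections.list(loader.getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0] : "com.google.protobuf.Message";
        List<URL> copies = copiesOf(name, FindCopies.class.getClassLoader());
        System.out.println(copies.size() + " copies of " + name + " found:");
        for (URL url : copies) {
            System.out.println("  " + url);
        }
    }
}
```

More than one URL means competing copies exist; which one a given caller ends up linked against depends on the class loader hierarchy and the classpath order.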
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16749861#comment-16749861 ]

yanxiaobin commented on FLINK-8500:
---
I have changed the title.

> Get the timestamp of the Kafka message from kafka consumer
> --
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Sub-task
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a parameter for
> the Kafka message timestamp (from ConsumerRecord). This is useful in some
> business scenarios.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
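In the 1.4 connector, KeyedDeserializationSchema.deserialize receives the message key, the message value, the topic, the partition and the offset, but not the record timestamp. The sketch below illustrates the requested shape with a hypothetical interface, TimestampAwareSchema, whose deserialize method takes an extra timestamp parameter. It is not Flink's API: TimestampAwareSchema, TimestampedEvent and TimestampedEventSchema are illustrative names, and the values in main are made up rather than read from a real ConsumerRecord.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical schema: like a keyed deserialization schema, but also receives the record timestamp. */
interface TimestampAwareSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp);
}

/** Illustrative output type pairing the payload with the Kafka timestamp. */
final class TimestampedEvent {
    final String payload;
    final long kafkaTimestamp; // epoch milliseconds, as carried by the Kafka record

    TimestampedEvent(String payload, long kafkaTimestamp) {
        this.payload = payload;
        this.kafkaTimestamp = kafkaTimestamp;
    }

    @Override
    public String toString() {
        return "TimestampedEvent{payload=" + payload + ", kafkaTimestamp=" + kafkaTimestamp + "}";
    }
}

/** Decodes the message value as UTF-8 and keeps the record timestamp alongside it; the key is ignored. */
final class TimestampedEventSchema implements TimestampAwareSchema<TimestampedEvent> {
    @Override
    public TimestampedEvent deserialize(byte[] messageKey, byte[] message, String topic,
                                        int partition, long offset, long timestamp) {
        return new TimestampedEvent(new String(message, StandardCharsets.UTF_8), timestamp);
    }

    public static void main(String[] args) {
        TimestampedEventSchema schema = new TimestampedEventSchema();
        // Made-up values; a real fetcher would take them from the consumed record.
        TimestampedEvent event = schema.deserialize(null, "hello".getBytes(StandardCharsets.UTF_8),
                "events", 0, 42L, 1517295538000L);
        System.out.println(event);
    }
}
```

Passing the timestamp as a plain long keeps the schema independent of the Kafka client classes; an alternative design is to hand the whole record to the schema so that any future metadata is available without another signature change.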
[jira] [Updated] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yanxiaobin updated FLINK-8500:
--
Summary: Get the timestamp of the Kafka message from kafka consumer (was: Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher))