[jira] [Created] (FLINK-12992) All host(s) tried for query failed (tried com.datastax.driver.core.exceptions.TransportException: Error writing...)

2019-06-25 Thread yanxiaobin (JIRA)
yanxiaobin created FLINK-12992:
--

 Summary: All host(s) tried for query failed (tried 
com.datastax.driver.core.exceptions.TransportException: Error writing...)
 Key: FLINK-12992
 URL: https://issues.apache.org/jira/browse/FLINK-12992
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Cassandra
Affects Versions: 1.8.0, 1.7.2
 Environment: 
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-cassandra_2.11</artifactId>
   <version>1.8.0</version>
 </dependency>

Reporter: yanxiaobin


We are using a Flink streaming application with the Cassandra connector, which 
provides sinks that write data into an 
[Apache Cassandra|https://cassandra.apache.org/] database.
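The sink is wired up roughly as follows (a minimal sketch, not our production 
code; the host, keyspace, table, and query are placeholders):

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

public class CassandraSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // stand-in for our real source; the production job reads a continuous stream
        DataStream<Tuple2<String, Long>> stream = env.fromElements(Tuple2.of("word", 1L));

        CassandraSink.addSink(stream)
                .setQuery("INSERT INTO example.wordcount (word, count) VALUES (?, ?);")
                .setHost("127.0.0.1")
                .build();

        env.execute("cassandra-sink-job");
    }
}
{code}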

At first we saw the following exception: All host(s) tried for query failed 
(tried com.datastax.driver.core.exceptions.TransportException: Error 
writing...). This exception causes the streaming job to fail.

 

We carefully checked that the Cassandra service and the network were both 
normal. Finally, we consulted the source code of the DataStax Java Driver that 
the connector depends on and found that the real exception causing the problem 
is the following:

{code}
com.datastax.shaded.netty.handler.codec.EncoderException: java.lang.IllegalAccessError: com/datastax/driver/core/Frame
    at com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:107)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:643)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:700)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:636)
    at com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:112)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:643)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:700)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:636)
    at com.datastax.shaded.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:284)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:643)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:700)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:636)
    at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:622)
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:939)
    at com.datastax.shaded.netty.channel.AbstractChannel.write(AbstractChannel.java:234)
    at com.datastax.driver.core.Connection$Flusher.run(Connection.java:872)
    at com.datastax.shaded.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
    at com.datastax.shaded.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at com.datastax.shaded.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalAccessError: com/datastax/driver/core/Frame
    at com.datastax.shaded.netty.util.internal.__matchers__.com.datastax.driver.core.FrameMatcher.match(NoOpTypeParameterMatcher.java)
    at com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.acceptOutboundMessage(MessageToMessageEncoder.java:77)
    at com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:84)
{code}

 

Based on this exception, we found relevant information here: 
[https://datastax-oss.atlassian.net/browse/JAVA-1337?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel]
  

 

We found that the latest version of flink-connector-cassandra still depends on 
the old DataStax Java driver version 3.0.0. Perhaps we should upgrade the 
driver version the connector depends on to 3.3.0+ to avoid this problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-16791) Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar

2020-03-26 Thread yanxiaobin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17068293#comment-17068293
 ] 

yanxiaobin commented on FLINK-16791:


Hi, thanks everyone. Just as [~kkl0u] said, it works to use the plugins 
mechanism with restricted classloaders, as described here: 
[https://ci.apache.org/projects/flink/flink-docs-master/ops/plugins.html].
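Concretely, that means removing the jar from {{lib}} and giving it its own 
subdirectory under {{plugins}}, so the distribution looks roughly like this 
(the subdirectory name is our choice; any name works):

{code}
flink-1.10.0/
├── lib/                <- flink-s3-fs-hadoop-1.10.0.jar removed from here
└── plugins/
    └── s3-fs-hadoop/
        └── flink-s3-fs-hadoop-1.10.0.jar
{code}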

>  Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar
> ---
>
> Key: FLINK-16791
> URL: https://issues.apache.org/jira/browse/FLINK-16791
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.10.0
>Reporter: yanxiaobin
>Priority: Major
>
> Since AWS S3 is needed, I copied the flink-s3-fs-hadoop-1.10.0.jar file to 
> the {{lib}} directory of the Flink distribution. But when I submit a single 
> Flink job in YARN mode, I get the following problem:
>  
> Caused by: java.lang.ClassCastException: org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto cannot be cast to com.google.protobuf.Message
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
> at com.sun.proxy.$Proxy11.getClusterNodes(Unknown Source)
> at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:303)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
> at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
> at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
> at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
> at com.sun.proxy.$Proxy12.getClusterNodes(Unknown Source)
> at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:564)
> at org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43)
> at org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:278)
> at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444)
> at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:390)
> ... 24 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16791) Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar

2020-03-30 Thread yanxiaobin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17071439#comment-17071439
 ] 

yanxiaobin commented on FLINK-16791:


ok, thanks.

>  Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar
> ---
>
> Key: FLINK-16791
> URL: https://issues.apache.org/jira/browse/FLINK-16791
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.10.0
>Reporter: yanxiaobin
>Priority: Major
>
> Since AWS S3 is needed, I copied the flink-s3-fs-hadoop-1.10.0.jar file to 
> the {{lib}} directory of the Flink distribution. But when I submit a single 
> Flink job in YARN mode, I get the following problem:
>  
> Caused by: java.lang.ClassCastException: org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto cannot be cast to com.google.protobuf.Message
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
> at com.sun.proxy.$Proxy11.getClusterNodes(Unknown Source)
> at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:303)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
> at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
> at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
> at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
> at com.sun.proxy.$Proxy12.getClusterNodes(Unknown Source)
> at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:564)
> at org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43)
> at org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:278)
> at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444)
> at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:390)
> ... 24 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-30 Thread yanxiaobin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496006#comment-16496006
 ] 

yanxiaobin commented on FLINK-8500:
---

So far, this solves the current problems, but in the long run there will still 
be some limitations in supporting future Kafka versions. By the way, can we 
fix this problem in the 1.5.x series?

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-30 Thread yanxiaobin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496136#comment-16496136
 ] 

yanxiaobin commented on FLINK-8500:
---

[~tzulitai] thanks, great. 

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-30 Thread yanxiaobin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496136#comment-16496136
 ] 

yanxiaobin edited comment on FLINK-8500 at 5/31/18 5:42 AM:


[~tzulitai]   thanks. That is great. 


was (Author: backlight):
[~tzulitai]   thanks, greate. 

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-04-25 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16451808#comment-16451808
 ] 

yanxiaobin commented on FLINK-8500:
---

Hi [~aljoscha], what is your idea? I currently solve the problem by adjusting 
the source code, but that is very inconvenient for future upgrades. I hope it 
can be resolved in a new release as soon as possible so that I do not have to 
maintain a separate set of code myself. 

 

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-24 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488779#comment-16488779
 ] 

yanxiaobin commented on FLINK-8500:
---

I prefer to expose Kafka's ConsumerRecord directly (in the 
DeserializationSchema), thus ensuring the flexibility of the interface; 
otherwise there will always be various limitations.
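Something along these lines (a hypothetical sketch of such an interface, not 
an existing Flink API; the name and shape are made up for illustration):

{code:java}
import java.io.Serializable;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical: hand the whole ConsumerRecord to the user, so the timestamp,
// topic, partition, and offset stay accessible without further interface changes
public interface ConsumerRecordDeserializationSchema<T> extends Serializable {
    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
}
{code}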

 

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-02-27 Thread yanxiaobin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanxiaobin updated FLINK-8794:
--
Description: 
When using BucketingSink, it happens that one of the files is always in the 
[.in-progress] state, and this state never changes after that. The underlying 
storage is S3.

 
{code:java}
// code placeholder
{code}
2018-02-28 11:58:42  147341619 _part-28-0.in-progress
2018-02-28 12:06:27  147315059 part-0-0
2018-02-28 12:06:27  147462359 part-1-0
2018-02-28 12:06:27  147316006 part-10-0
2018-02-28 12:06:28  147349854 part-100-0
2018-02-28 12:06:27  147421625 part-101-0
2018-02-28 12:06:27  147443830 part-102-0
2018-02-28 12:06:27  147372801 part-103-0
2018-02-28 12:06:27  147343670 part-104-0
..

  was:When using BucketingSink, it happens that one of the files is always in 
the [.in-progress] state. And this state has never changed after that.  The 
underlying use of S3 as storage.


> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 _part-28-0.in-progress
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-02-27 Thread yanxiaobin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanxiaobin updated FLINK-8794:
--
Description: 
When using BucketingSink, it happens that one of the files is always in the 
[.in-progress] state, and this state never changes after that. The underlying 
storage is S3.

 
{code:java}
// code placeholder
{code}
2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
2018-02-28 12:06:27  147315059 part-0-0
2018-02-28 12:06:27  147462359 part-1-0
2018-02-28 12:06:27  147316006 part-10-0
2018-02-28 12:06:28  147349854 part-100-0
2018-02-28 12:06:27  147421625 part-101-0
2018-02-28 12:06:27  147443830 part-102-0
2018-02-28 12:06:27  147372801 part-103-0
2018-02-28 12:06:27  147343670 part-104-0
..

  was:
When using BucketingSink, it happens that one of the files is always in the 
[.in-progress] state. And this state has never changed after that.  The 
underlying use of S3 as storage.

 
{code:java}
// code placeholder
{code}
2018-02-28 11:58:42  147341619 _part-28-0.in-progress

2018-02-28 12:06:27  147315059 part-0-0

2018-02-28 12:06:27  147462359 part-1-0

2018-02-28 12:06:27  147316006 part-10-0

2018-02-28 12:06:28  147349854 part-100-0

2018-02-28 12:06:27  147421625 part-101-0

2018-02-28 12:06:27  147443830 part-102-0

2018-02-28 12:06:27  147372801 part-103-0

2018-02-28 12:06:27  147343670 part-104-0

..


> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-01 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383209#comment-16383209
 ] 

yanxiaobin commented on FLINK-8794:
---

Hi [~aljoscha], thank you for your reply!
 
There are the following points:

1. What I described above is that this situation occurs even when there is no 
failure in the job.

2. It also happens when a job fails (because one of the TaskManager nodes went 
down) and recovers. Fault tolerance of a node is necessary in distributed 
computing, and that is exactly the case where this becomes a problem.

3. On recovery, the previous in-progress and pending files are not cleared, 
which causes the downstream processor to read excess dirty data.

4. I think we should first write the data to the compute nodes' local files 
and upload them to the distributed file system (for example S3 or HDFS) only 
after the local file is completely written.

We are blocked by this problem at the moment, and because of it we can't use 
this job.

 

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-02 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383356#comment-16383356
 ] 

yanxiaobin commented on FLINK-8794:
---

Hi [~pnowojski]! Thank you for your suggestion! The downstream processor can 
ignore the files with "*pending" or "*in-progress" suffixes and the "_" prefix 
(a filter along those lines is sketched below), but I don't think that is a 
good way to deal with it. We could change this behaviour / add an option for 
BucketingSink to use temporary "in-progress" and "pending" directories instead 
of prefixes, but those temporary directories would still be subdirectories of 
the base directory, and a downstream processor that reads the base directory 
recursively would still end up reading redundant dirty data. I think the 
temporary data produced while the program runs should be isolated from the 
final output data. Thanks!
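For reference, such a downstream-side filter might look like this (a sketch, 
assuming a Hadoop-based reader and BucketingSink's default "_" prefix and 
".in-progress"/".pending" suffixes):

{code:java}
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

// Sketch: skip BucketingSink's temporary files when listing the base directory
public class FinishedFilesOnly implements PathFilter {
    @Override
    public boolean accept(Path path) {
        String name = path.getName();
        return !name.startsWith("_")
                && !name.endsWith(".in-progress")
                && !name.endsWith(".pending");
    }
}
{code}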

Also, [~kkl0u], could you elaborate on why rescaling forces us to keep 
lingering files? 

 

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-02 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383356#comment-16383356
 ] 

yanxiaobin edited comment on FLINK-8794 at 3/2/18 8:40 AM:
---

Hi [~pnowojski], thank you for your suggestion! The downstream processor can 
ignore the files with "*pending" or "*in-progress" suffixes and the "_" prefix, 
but I don't think that is a good way to deal with it. We could change this 
behaviour / add an option for BucketingSink to use temporary "in-progress" and 
"pending" directories instead of prefixes, but those temporary directories 
would still be subdirectories of the base directory, and a downstream processor 
that reads the base directory recursively would still end up reading redundant 
dirty data. I think the temporary data produced while the program runs should 
be isolated from the final output data. Thanks!


Also, [~kkl0u], could you elaborate on why rescaling forces us to keep 
lingering files? 

 


was (Author: backlight):
hi, [~pnowojski] ! Thank you for your suggestion! The downstream processor can 
ignore the files with "*pending" or "*in-progress" sufixes and "_" prefix, but 
I don't think it's a good way to deal with it. We can change this behaviour/add 
an option for BucketingSink to use temporary "in-progress" and "pending" 
directories instead of prefixes, but the temporary "in-progress" and "pending" 
directories is still also a subdirectory of the base directory, and the 
downstream processor may still read the base directory recursively, It also 
results in reading redundant dirty data. I think the temporary data produced 
during the program should be isolated from the final output data. Thanks!

Also [~kkl0u] could you elaborate why rescaling forced us to keep lingering 
files? 

 

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-03 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384954#comment-16384954
 ] 

yanxiaobin commented on FLINK-8794:
---

Yes, if we allow for a different directory, that should already be enough. So 
how do we solve this problem? And when using BucketingSink, it happens that one 
of the files is always in the [.in-progress] state, and this state never 
changes after that.

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-03 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384954#comment-16384954
 ] 

yanxiaobin edited comment on FLINK-8794 at 3/4/18 4:13 AM:
---

Yes, if we allow for a different directory, that should already be enough. So 
how do we solve this problem? And when using BucketingSink, it happens that one 
of the files is always in the [.in-progress] state, and this state never 
changes after that.


was (Author: backlight):
yes. If we allow for different directory that should be already enough.So how 
do we solve this problem? Adn when using BucketingSink, it happens that one of 
the files is always in the [.in-progress] state. And this state has never 
changed after that.

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-03-08 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390914#comment-16390914
 ] 

yanxiaobin commented on FLINK-8500:
---

Sorry to reply so late. I see; we really should be careful here. I want to 
find out how I can get the Kafka timestamp in, for example, the flatMap and 
map methods of a DataStream.
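(For illustration, something like the following sketch can read the attached 
timestamp, assuming the Kafka 0.10 fetcher has set the record's Kafka 
timestamp as the element's event timestamp; plain map/flatMap functions have 
no access to it, but a ProcessFunction does via its context.)

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// stream: some DataStream<String> coming from the Kafka consumer
DataStream<String> withTimestamps = stream.process(new ProcessFunction<String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        Long kafkaTimestamp = ctx.timestamp(); // null if no timestamp was attached
        out.collect(kafkaTimestamp + ": " + value);
    }
});
{code}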

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-03-08 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390914#comment-16390914
 ] 

yanxiaobin edited comment on FLINK-8500 at 3/8/18 8:47 AM:
---

Sorry to reply so late. I see; we really should be careful here. I want to 
find out how I can get the Kafka timestamp in, for example, the flatMap and 
map methods of a DataStream, using the Scala programming language.


was (Author: backlight):
Sorry to reply so late. I see. We really should be careful here. I want to find 
out how I should get Kafka timestamp in like flatmap and map methods of 
datastream.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-09 Thread yanxiaobin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanxiaobin updated FLINK-8794:
--
Issue Type: Improvement  (was: Bug)

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-09 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393956#comment-16393956
 ] 

yanxiaobin commented on FLINK-8794:
---

About point 1 (what I described above is that this situation occurs even when 
there is no failure in the job):

I found through the logs that the filesystem's rename method executed without 
any exception, but the filename didn't change, so I think this is S3's 
problem. It should not be a problem in Flink.

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-09 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393956#comment-16393956
 ] 

yanxiaobin edited comment on FLINK-8794 at 3/10/18 1:53 AM:


About point 1 (what I described above is that this situation occurs even when 
there is no failure in the job):
 
I think I've found the problem.

I found through the logs that the filesystem's rename method executed without 
any exception, but the filename didn't change, so I think this is S3's 
problem. It should not be a problem in Flink.


was (Author: backlight):
About : 1.What I described above is that there will be such a situation when 
there is no failure in this job.

I found through log that filesystem's rename method has been executed without 
any exception, but the filename hasn't changed, so I think it should be S3's 
problem. This should not be a problem with Flink.

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-12 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16395286#comment-16395286
 ] 

yanxiaobin commented on FLINK-8794:
---

I will open a case with AWS to find the root cause of the problem.

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8939) Provide better support for saving streaming data to s3

2018-03-14 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398211#comment-16398211
 ] 

yanxiaobin commented on FLINK-8939:
---

I am also using the BucketingSink API for saving streaming data to S3. Have 
you ever met this problem? See: https://issues.apache.org/jira/browse/FLINK-8794

> Provide better support for saving streaming data to s3
> --
>
> Key: FLINK-8939
> URL: https://issues.apache.org/jira/browse/FLINK-8939
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: chris snow
>Priority: Major
> Attachments: 18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png
>
>
> Flink seems to struggle saving data to s3 due to the lack of a truncate 
> method, and in my test this resulted in lots of files with a .valid-length 
> suffix
> I’m using a bucketing sink:
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>     .setWriter(writer)
>     .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> !18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png!
> See also, the discussion in the comments here: 
> https://issues.apache.org/jira/browse/FLINK-8543?filter=-2



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8939) Provide better support for saving streaming data to s3

2018-03-14 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398211#comment-16398211
 ] 

yanxiaobin edited comment on FLINK-8939 at 3/14/18 8:07 AM:


I am also using the BucketingSink API for saving streaming data to S3. Have 
you ever met this problem? See: 
[FLINK-8794|https://issues.apache.org/jira/browse/FLINK-8794]


was (Author: backlight):
I am also using BucketingSink API for saving streaming data to s3.  Have you 
ever met this problem?see : https://issues.apache.org/jira/browse/FLINK-8794

> Provide better support for saving streaming data to s3
> --
>
> Key: FLINK-8939
> URL: https://issues.apache.org/jira/browse/FLINK-8939
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: chris snow
>Priority: Major
> Attachments: 18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png
>
>
> Flink seems to struggle saving data to s3 due to the lack of a truncate 
> method, and in my test this resulted in lots of files with a .valid-length 
> suffix
> I’m using a bucketing sink:
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>     .setWriter(writer)
>     .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> !18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png!
> See also, the discussion in the comments here: 
> https://issues.apache.org/jira/browse/FLINK-8543?filter=-2



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-19 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16404584#comment-16404584
 ] 

yanxiaobin commented on FLINK-8794:
---

S3 follows the *eventually consistent* principle, so for the time being there 
is no good solution to this problem on S3.

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8939) Provide better support for saving streaming data to s3

2018-03-25 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16413319#comment-16413319
 ] 

yanxiaobin commented on FLINK-8939:
---

Thanks! Because S3 follows the *eventually consistent* principle, when using 
S3 (without consistent view enabled), sometimes the file is not renamed from 
the .in-progress suffix to the .pending suffix.

> Provide better support for saving streaming data to s3
> --
>
> Key: FLINK-8939
> URL: https://issues.apache.org/jira/browse/FLINK-8939
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Streaming Connectors
>Reporter: chris snow
>Priority: Major
> Attachments: 18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png
>
>
> Flink seems to struggle saving data to s3 due to the lack of a truncate 
> method, and in my test this resulted in lots of files with a .valid-length 
> suffix
> I’m using a bucketing sink:
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>     .setWriter(writer)
>     .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> !18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png!
> See also, the discussion in the comments here: 
> https://issues.apache.org/jira/browse/FLINK-8543?filter=-2



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8939) Provide better support for saving streaming data to s3

2018-03-26 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415112#comment-16415112
 ] 

yanxiaobin commented on FLINK-8939:
---

I'm using AWS S3. This problem has been plaguing me all the time.

> Provide better support for saving streaming data to s3
> --
>
> Key: FLINK-8939
> URL: https://issues.apache.org/jira/browse/FLINK-8939
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Streaming Connectors
>Reporter: chris snow
>Priority: Major
> Attachments: 18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png
>
>
> Flink seems to struggle saving data to s3 due to the lack of a truncate 
> method, and in my test this resulted in lots of files with a .valid-length 
> suffix
> I’m using a bucketing sink:
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>     .setWriter(writer)
>     .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> !18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png!
> See also, the discussion in the comments here: 
> https://issues.apache.org/jira/browse/FLINK-8543?filter=-2



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-27 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415131#comment-16415131
 ] 

yanxiaobin commented on FLINK-8794:
---

The underlying implementation is 
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem. I am using Hadoop 2.7.3. 
I haven't thought of a good solution for the time being.

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-27 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416734#comment-16416734
 ] 

yanxiaobin commented on FLINK-8794:
---

Thanks! Indeed. Enabling consistent view can cause other problems.

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-28 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417174#comment-16417174
 ] 

yanxiaobin commented on FLINK-8794:
---

The other problems have nothing to do with Flink.

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8321) Generate _SUCCESS (map-reduce style) when folder has been written!

2017-12-26 Thread yanxiaobin (JIRA)
yanxiaobin created FLINK-8321:
-

 Summary: Generate _SUCCESS (map-reduce style) when folder has been 
written!
 Key: FLINK-8321
 URL: https://issues.apache.org/jira/browse/FLINK-8321
 Project: Flink
  Issue Type: Bug
  Components: filesystem-connector
Affects Versions: 1.4.0
 Environment: How to write the data processed in each time window to its own 
HDFS directory when using the DataStream API in a streaming application, and 
generate _SUCCESS (map-reduce style) when the folder has been written!
Reporter: yanxiaobin






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8321) Generate _SUCCESS (map-reduce style) when folder has been written!

2017-12-26 Thread yanxiaobin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanxiaobin updated FLINK-8321:
--
Description: There is no success-file generation, so the downstream 
processing can't determine when a directory is finished.
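For illustration, the marker itself is cheap to produce; what is missing is a 
hook that knows when a directory is finished. A sketch (the path is a 
placeholder) of writing the map-reduce-style marker with the Hadoop FileSystem 
API:

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: create an empty _SUCCESS marker, like Hadoop's FileOutputCommitter,
// once all data for a window's directory has been written
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
fs.create(new Path("hdfs:///output/2017-12-26--10/_SUCCESS"), true).close();
{code}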

> Generate _SUCCESS (map-reduce style) when folder has been written!
> --
>
> Key: FLINK-8321
> URL: https://issues.apache.org/jira/browse/FLINK-8321
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.4.0
> Environment: How to write the data that is processed in
> each time window to each single HDFS directory correspondingly when using 
> DataStream API in streaming processing application, and generate _SUCCESS 
> (map-reduce style) when folder has been written!
>Reporter: yanxiaobin
>
> There is no success file generation, and the downstream processing can't 
> determine when the directory is finished



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-23 Thread yanxiaobin (JIRA)
yanxiaobin created FLINK-8500:
-

 Summary: Get the timestamp of the Kafka message from kafka 
consumer(Kafka010Fetcher)
 Key: FLINK-8500
 URL: https://issues.apache.org/jira/browse/FLINK-8500
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Affects Versions: 1.4.0
Reporter: yanxiaobin


The method deserialize of KeyedDeserializationSchema needs a parameter 'kafka 
message timestamp' (from ConsumerRecord). In some business scenarios, this is 
useful!
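Roughly, the change being asked for looks like this (a sketch based on the 
Flink 1.4 interface; the extra parameter is the proposal, not an existing API):

{code:java}
// Current signature in KeyedDeserializationSchema (Flink 1.4)
T deserialize(byte[] messageKey, byte[] message, String topic, int partition,
        long offset) throws IOException;

// Proposed: additionally pass the ConsumerRecord's timestamp through
T deserialize(byte[] messageKey, byte[] message, String topic, int partition,
        long offset, long timestamp) throws IOException;
{code}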

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-25 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16338958#comment-16338958
 ] 

yanxiaobin commented on FLINK-8500:
---

I mean that I want to use the Kafka message timestamp as the event time! Thanks!

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
>
> The deserialize method of KeyedDeserializationSchema needs a 'kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, this 
> is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions

2018-01-29 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16343115#comment-16343115
 ] 

yanxiaobin commented on FLINK-5479:
---

I've also met this problem at the moment.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to what's happening to idle sources blocking watermark progression in 
> downstream operators (see FLINK-5017), the per-partition watermark mechanism 
> in {{FlinkKafkaConsumer}} is also blocked from progressing watermarks 
> when a partition is idle. The watermark of idle partitions is always 
> {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions 
> of a consumer subtask will never proceed.
> It's normally not a common case to have Kafka partitions not producing any 
> data, but it'll probably be good to handle this as well. I think we should 
> have a localized solution similar to FLINK-5017 for the per-partition 
> watermarks in {{AbstractFetcher}}.
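A minimal illustration of the stalled-min behavior described in the quoted 
report (plain Java; the watermark values are made up):

// One idle partition pins the operator-wide watermark at Long.MIN_VALUE,
// because the overall watermark is the minimum across all partitions.
long[] perPartitionWatermarks = {Long.MIN_VALUE /* idle */, 1_000L, 2_000L};
long overall = Long.MAX_VALUE;
for (long w : perPartitionWatermarks) {
    overall = Math.min(overall, w);
}
// overall == Long.MIN_VALUE, so downstream event-time progress stalls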



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-29 Thread yanxiaobin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanxiaobin updated FLINK-8500:
--
Attachment: image-2018-01-30-14-58-58-167.png

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, this 
> is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-29 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16344591#comment-16344591
 ] 

yanxiaobin commented on FLINK-8500:
---

As shown below(Kafka010Fetcher): !image-2018-01-30-14-58-58-167.png!

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, this 
> is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-30 Thread yanxiaobin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanxiaobin updated FLINK-8500:
--
Attachment: image-2018-01-31-10-48-59-633.png

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, this 
> is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-30 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346180#comment-16346180
 ] 

yanxiaobin commented on FLINK-8500:
---

Hi [~aljoscha], thank you for your reply! Please look at the next picture!

!image-2018-01-31-10-48-59-633.png!

The final event time is obtained from "final long newTimestamp = 
extractAscendingTimestamp(element);", and the element was deserialized by the 
"KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp" is 
not used! Thanks!

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, this 
> is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-30 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346180#comment-16346180
 ] 

yanxiaobin edited comment on FLINK-8500 at 1/31/18 3:01 AM:


Hi [~aljoscha], thank you for your reply! Please look at the next picture!

!image-2018-01-31-10-48-59-633.png!

The final event time is obtained from "final long newTimestamp = 
extractAscendingTimestamp(element);", and the element was deserialized by the 
"KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp", 
which is the Kafka timestamp, is not used! Thanks!


was (Author: backlight):
Hi [~aljoscha], thank you for your reply! Please look at the next picture!

!image-2018-01-31-10-48-59-633.png!

The final event time is obtained from "final long newTimestamp = 
extractAscendingTimestamp(element);", and the element was deserialized by the 
"KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp" is 
not used! Thanks!

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, this 
> is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-30 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346180#comment-16346180
 ] 

yanxiaobin edited comment on FLINK-8500 at 1/31/18 3:09 AM:


Hi [~aljoscha], thank you for your reply! Please look at the next picture!

!image-2018-01-31-10-48-59-633.png!

The final event time is obtained from "final long newTimestamp = 
extractAscendingTimestamp(element);", and the element was deserialized by the 
"KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp", 
which is the Kafka timestamp, is not used! So I think the deserialize method 
of KeyedDeserializationSchema should add a 'kafka message timestamp' parameter 
(from ConsumerRecord). And in some business scenarios, this is useful! Thanks!


was (Author: backlight):
Hi [~aljoscha], thank you for your reply! Please look at the next picture!

!image-2018-01-31-10-48-59-633.png!

The final event time is obtained from "final long newTimestamp = 
extractAscendingTimestamp(element);", and the element was deserialized by the 
"KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp", 
which is the Kafka timestamp, is not used! Thanks!

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, this 
> is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-31 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346550#comment-16346550
 ] 

yanxiaobin commented on FLINK-8500:
---

Hi [~aljoscha], what do you think?

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, this 
> is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-31 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347951#comment-16347951
 ] 

yanxiaobin commented on FLINK-8500:
---

Great! I think it's reasonable. But I think it's best to optimize it so that 
the user interface is as unified as possible! Thanks, [~aljoscha].

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, this 
> is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-31 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347951#comment-16347951
 ] 

yanxiaobin edited comment on FLINK-8500 at 2/1/18 3:28 AM:
---

Great! I think it's reasonable that the deserialize method of 
KeyedDeserializationSchema needs a 'kafka message timestamp' parameter (from 
ConsumerRecord); in some business scenarios, this is useful. And I think it's 
best to optimize it so that the user interface is as unified as possible! 
Thanks, [~aljoscha].


was (Author: backlight):
Great! I think it's reasonable. But I think it's best to optimize it so that 
the user interface is as unified as possible! Thanks, [~aljoscha].

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, this 
> is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions

2018-02-01 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16343115#comment-16343115
 ] 

yanxiaobin edited comment on FLINK-5479 at 2/1/18 10:30 AM:


I've also met this problem at the moment. It can cause serious delays! Is 
there a better solution to the problem?


was (Author: backlight):
I've also met this problem at the moment.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to what's happening to idle sources blocking watermark progression in 
> downstream operators (see FLINK-5017), the per-partition watermark mechanism 
> in {{FlinkKafkaConsumer}} is also blocked from progressing watermarks 
> when a partition is idle. The watermark of idle partitions is always 
> {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions 
> of a consumer subtask will never proceed.
> It's normally not a common case to have Kafka partitions not producing any 
> data, but it'll probably be good to handle this as well. I think we should 
> have a localized solution similar to FLINK-5017 for the per-partition 
> watermarks in {{AbstractFetcher}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-02-25 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16376428#comment-16376428
 ] 

yanxiaobin commented on FLINK-8500:
---

Hi [~aljoscha]! I can solve the problem like that, but I still think the 
deserialize method of KeyedDeserializationSchema needs a 'kafka message 
timestamp' parameter (from ConsumerRecord); this timestamp may also be used in 
some business logic processing! What do you think?

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Blocker
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-02-27 Thread yanxiaobin (JIRA)
yanxiaobin created FLINK-8794:
-

 Summary: When using BucketingSink, it happens that one of the 
files is always in the [.in-progress] state
 Key: FLINK-8794
 URL: https://issues.apache.org/jira/browse/FLINK-8794
 Project: Flink
  Issue Type: Bug
  Components: filesystem-connector
Affects Versions: 1.4.0, 1.4.1
Reporter: yanxiaobin


When using BucketingSink, it can happen that one of the files stays in the 
[.in-progress] state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-02-27 Thread yanxiaobin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanxiaobin updated FLINK-8794:
--
Description: When using BucketingSink, it can happen that one of the files 
stays in the [.in-progress] state, and the state never changes afterwards. The 
underlying storage is S3.  (was: When using BucketingSink, it can happen that 
one of the files stays in the [.in-progress] state.)

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it can happen that one of the files stays in the 
> [.in-progress] state, and the state never changes afterwards. The underlying 
> storage is S3.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12992) All host(s) tried for query failed (tried com.datastax.driver.core.exceptions.TransportException: Error writing...)

2019-07-01 Thread yanxiaobin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876018#comment-16876018
 ] 

yanxiaobin commented on FLINK-12992:


At present, I have upgraded the Netty version used by the DataStax Java 
driver to the Netty version that Flink depends on, and the problem is solved.
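A small runtime check (a sketch, not part of the original comment) that can 
help confirm which unshaded Netty artifacts are actually on the classpath 
after such an upgrade, using Netty's own Version utility:

import io.netty.util.Version;
import java.util.Map;

public class NettyVersionCheck {
    public static void main(String[] args) {
        // Prints each Netty artifact found on the classpath and its version.
        for (Map.Entry<String, Version> e : Version.identify().entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().artifactVersion());
        }
    }
}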

> All host(s) tried for query failed (tried 
> com.datastax.driver.core.exceptions.TransportException: Error writing...)
> ---
>
> Key: FLINK-12992
> URL: https://issues.apache.org/jira/browse/FLINK-12992
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Cassandra
>Affects Versions: 1.7.2, 1.8.0
> Environment: 
>  org.apache.flink
>  flink-connector-cassandra_2.11
>  1.8.0
> 
>Reporter: yanxiaobin
>Priority: Critical
>
> We are using a Flink streaming application with the Cassandra connector, 
> which provides sinks that write data into an [Apache 
> Cassandra|https://cassandra.apache.org/] database.
> At first we found the following exception: All host(s) tried for query 
> failed (tried com.datastax.driver.core.exceptions.TransportException: Error 
> writing...). This exception causes the streaming job to fail.
>  
> We carefully checked that the Cassandra service and the network are all 
> normal. Finally, we referred to the source code of the DataStax Java Driver 
> that the connector depends on and found that the real exception causing the 
> problem is as follows:
> com.datastax.shaded.netty.handler.codec.EncoderException: 
> java.lang.IllegalAccessError: com/datastax/driver/core/Frame at 
> com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:107)
>  at 
> com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:643)
>  at 
> com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:700)
>  at 
> com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:636)
>  at 
> com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:112)
>  at 
> com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:643)
>  at 
> com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:700)
>  at 
> com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:636)
>  at 
> com.datastax.shaded.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:284)
>  at 
> com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:643)
>  at 
> com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:700)
>  at 
> com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:636)
>  at 
> com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:622)
>  at 
> com.datastax.shaded.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:939)
>  at 
> com.datastax.shaded.netty.channel.AbstractChannel.write(AbstractChannel.java:234)
>  at com.datastax.driver.core.Connection$Flusher.run(Connection.java:872) at 
> com.datastax.shaded.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
>  at 
> com.datastax.shaded.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
> at 
> com.datastax.shaded.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
>  at java.lang.Thread.run(Thread.java:748) Caused by: 
> java.lang.IllegalAccessError: com/datastax/driver/core/Frame at 
> com.datastax.shaded.netty.util.internal.__matchers__.com.datastax.driver.core.FrameMatcher.match(NoOpTypeParameterMatcher.java)
>  at 
> com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.acceptOutboundMessage(MessageToMessageEncoder.java:77)
>  at 
> com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:84)
>  
> Based on this exception, we found relevant information 
> [https://datastax-oss.atlassian.net/browse/JAVA-1337?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel]
>   
>  
> I found that the latest version of flink-connector-cassandra uses the old 
> DataStax Java driver version 3.0.0. Perhaps we should upgrade the driver 
> version the connector depends on to 3.3.0+ to avoid this problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-16791) Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar

2020-03-25 Thread yanxiaobin (Jira)
yanxiaobin created FLINK-16791:
--

 Summary:  Could not deploy Yarn job cluster when using 
flink-s3-fs-hadoop-1.10.0.jar
 Key: FLINK-16791
 URL: https://issues.apache.org/jira/browse/FLINK-16791
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.10.0
Reporter: yanxiaobin


Since AWS S3 is needed, I copied the flink-s3-fs-hadoop-1.10.0.jar file to 
the {{lib}} directory of the Flink distribution. But when I submitted a single 
Flink job in YARN mode, I ran into the following problem:

Caused by: java.lang.ClassCastException: 
org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto 
cannot be cast to com.google.protobuf.Message

 at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)

 at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)

 at com.sun.proxy.$Proxy11.getClusterNodes(Unknown Source)

 at 
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:303)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:498)

 at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)

 at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)

 at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)

 at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)

 at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)

 at com.sun.proxy.$Proxy12.getClusterNodes(Unknown Source)

 at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:564)

 at 
org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43)

 at 
org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:278)

 at 
org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444)

 at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:390)

 ... 24 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16791) Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar

2020-03-25 Thread yanxiaobin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanxiaobin updated FLINK-16791:
---
Description: 
Since AWS S3 is needed, I copied the flink-s3-fs-hadoop-1.10.0.jar file to 
the {{lib}} directory of the Flink distribution. But when I submitted a single 
Flink job in YARN mode, I ran into the following problem:

 

Caused by: java.lang.ClassCastException: 
org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto 
cannot be cast to com.google.protobuf.Message

at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)

at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)

at com.sun.proxy.$Proxy11.getClusterNodes(Unknown Source)

at 
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:303)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)

at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)

at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)

at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)

at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)

at com.sun.proxy.$Proxy12.getClusterNodes(Unknown Source)

at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:564)

at 
org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43)

at 
org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:278)

at 
org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444)

at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:390)

... 24 more

  was:
Since AWS S3 is needed, I copied the flink-s3-fs-hadoop-1.10.0.jar file to 
the {{lib}} directory of the Flink distribution. But when I submitted a single 
Flink job in YARN mode, I ran into the following problem:

Caused by: java.lang.ClassCastException: 
org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto 
cannot be cast to com.google.protobuf.Message

 at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)

 at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)

 at com.sun.proxy.$Proxy11.getClusterNodes(Unknown Source)

 at 
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:303)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:498)

 at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)

 at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)

 at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)

 at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)

 at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)

 at com.sun.proxy.$Proxy12.getClusterNodes(Unknown Source)

 at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:564)

 at 
org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43)

 at 
org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:278)

 at 
org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444)

 at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:390)

 ... 24 more


>  Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar
> ---
>
> Key: FLINK-16791
> URL: https://issues.apache.org/jira/browse/FLINK-16791
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.10.0
>Reporter: yanxiaobin
>Pri

[jira] [Commented] (FLINK-16791) Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar

2020-03-25 Thread yanxiaobin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17067380#comment-17067380
 ] 

yanxiaobin commented on FLINK-16791:


I checked that the application does not contain any Hadoop dependencies and 
that there are no related Hadoop dependency version conflicts. Because 
flink-s3-fs-hadoop-1.10.0.jar depends on the hadoop-common related 
dependencies, the unshaded com.google.protobuf.Message is loaded. When I 
removed flink-s3-fs-hadoop-1.10.0.jar from the lib directory, the problem 
disappeared.
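One generic way to verify such a claim (a debugging sketch, not part of the 
original comment) is to print where the JVM actually loaded the conflicting 
class from:

public class WhichJar {
    public static void main(String[] args) throws ClassNotFoundException {
        // Prints the jar that provides com.google.protobuf.Message at runtime;
        // run inside the client JVM that performs the YARN deployment.
        Class<?> c = Class.forName("com.google.protobuf.Message");
        System.out.println(c.getProtectionDomain().getCodeSource().getLocation());
    }
}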

>  Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar
> ---
>
> Key: FLINK-16791
> URL: https://issues.apache.org/jira/browse/FLINK-16791
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.10.0
>Reporter: yanxiaobin
>Priority: Major
>
> Since AWS S3 is needed, I copied the flink-s3-fs-hadoop-1.10.0.jar file to 
> the {{lib}} directory of the Flink distribution. But when I submitted a 
> single Flink job in YARN mode, I ran into the following problem:
>  
> Caused by: java.lang.ClassCastException: 
> org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto 
> cannot be cast to com.google.protobuf.Message
> at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)
> at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
> at com.sun.proxy.$Proxy11.getClusterNodes(Unknown Source)
> at 
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:303)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
> at com.sun.proxy.$Proxy12.getClusterNodes(Unknown Source)
> at 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:564)
> at 
> org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43)
> at 
> org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:278)
> at 
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444)
> at 
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:390)
> ... 24 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16792) flink-s3-fs-hadoop cannot access s3

2020-03-25 Thread yanxiaobin (Jira)
yanxiaobin created FLINK-16792:
--

 Summary: flink-s3-fs-hadoop cannot access s3
 Key: FLINK-16792
 URL: https://issues.apache.org/jira/browse/FLINK-16792
 Project: Flink
  Issue Type: Bug
  Components: FileSystems
Affects Versions: 1.10.0
Reporter: yanxiaobin


When flink-s3-fs-hadoop-1.10.0.jar is placed in the lib directory and Flink 
accesses AWS S3, the following exception occurs:

 

Caused by: java.util.concurrent.ExecutionException: 
java.lang.NoSuchMethodError: 
org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
 at java.util.concurrent.FutureTask.report(FutureTask.java:122) at 
java.util.concurrent.FutureTask.get(FutureTask.java:192) at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
 at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
 ... 3 more Caused by: java.lang.NoSuchMethodError: 
org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
 at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:769) at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) at 
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:141)
 at 
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
 at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
 at 
org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
 at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
 at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
 at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:309)
 at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179)
 at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
 at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16792) flink-s3-fs-hadoop cannot access s3

2020-03-25 Thread yanxiaobin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17067389#comment-17067389
 ] 

yanxiaobin commented on FLINK-16792:


The issue is related to https://issues.apache.org/jira/browse/HADOOP-16080

> flink-s3-fs-hadoop cannot access s3
> ---
>
> Key: FLINK-16792
> URL: https://issues.apache.org/jira/browse/FLINK-16792
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Affects Versions: 1.10.0
>Reporter: yanxiaobin
>Priority: Major
>
> When flink-s3-fs-hadoop-1.10.0.jar is placed in the lib directory and Flink 
> accesses AWS S3, the following exception occurs:
>  
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.NoSuchMethodError: 
> org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
>  at java.util.concurrent.FutureTask.report(FutureTask.java:122) at 
> java.util.concurrent.FutureTask.get(FutureTask.java:192) at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
>  at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
>  ... 3 more Caused by: java.lang.NoSuchMethodError: 
> org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
>  at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:769) at 
> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) at 
> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) at 
> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) at 
> org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:141)
>  at 
> org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
>  at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
>  at 
> org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
>  at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
>  at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>  at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:309)
>  at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179)
>  at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>  at 
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16792) flink-s3-fs-hadoop cannot access s3

2020-03-25 Thread yanxiaobin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanxiaobin updated FLINK-16792:
---
Description: 
When flink-s3-fs-hadoop-1.10.0.jar is placed in the lib directory of the 
Flink distribution and Flink accesses AWS S3, the following exception occurs:

 

Caused by: java.util.concurrent.ExecutionException: 
java.lang.NoSuchMethodError: 
org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
 at java.util.concurrent.FutureTask.report(FutureTask.java:122) at 
java.util.concurrent.FutureTask.get(FutureTask.java:192) at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
 at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
 ... 3 more Caused by: java.lang.NoSuchMethodError: 
org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
 at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:769) at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) at 
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:141)
 at 
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
 at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
 at 
org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
 at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
 at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
 at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:309)
 at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179)
 at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
 at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)

  was:
When flink-s3-fs-hadoop-1.10.0.jar is placed in the lib directory and Flink 
accesses AWS S3, the following exception occurs:

 

Caused by: java.util.concurrent.ExecutionException: 
java.lang.NoSuchMethodError: 
org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
 at java.util.concurrent.FutureTask.report(FutureTask.java:122) at 
java.util.concurrent.FutureTask.get(FutureTask.java:192) at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
 at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
 ... 3 more Caused by: java.lang.NoSuchMethodError: 
org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
 at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:769) at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) at 
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:141)
 at 
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
 at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
 at 
org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
 at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
 at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
 at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:309)
 at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179)

[jira] [Commented] (FLINK-16791) Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar

2020-03-26 Thread yanxiaobin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17067576#comment-17067576
 ] 

yanxiaobin commented on FLINK-16791:


Hi Yang Wang, thanks. I have already put flink-shaded-hadoop into the lib 
directory and used {{export HADOOP_CLASSPATH=`hadoop classpath`}}. And I 
followed the documentation.

>  Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar
> ---
>
> Key: FLINK-16791
> URL: https://issues.apache.org/jira/browse/FLINK-16791
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.10.0
>Reporter: yanxiaobin
>Priority: Major
>
> Since AWS S3 is needed, I copied the flink-s3-fs-hadoop-1.10.0.jar file to 
> the {{lib}} directory of the Flink distribution. But when I submitted a 
> single Flink job in YARN mode, I ran into the following problem:
>  
> Caused by: java.lang.ClassCastException: 
> org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto 
> cannot be cast to com.google.protobuf.Message
> at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)
> at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
> at com.sun.proxy.$Proxy11.getClusterNodes(Unknown Source)
> at 
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:303)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
> at com.sun.proxy.$Proxy12.getClusterNodes(Unknown Source)
> at 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:564)
> at 
> org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43)
> at 
> org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:278)
> at 
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444)
> at 
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:390)
> ... 24 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16791) Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar

2020-03-26 Thread yanxiaobin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17067618#comment-17067618
 ] 

yanxiaobin commented on FLINK-16791:


It doesn't work for now. The unshaded com.google.protobuf.Message is still 
loaded.

>  Could not deploy Yarn job cluster when using flink-s3-fs-hadoop-1.10.0.jar
> ---
>
> Key: FLINK-16791
> URL: https://issues.apache.org/jira/browse/FLINK-16791
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.10.0
>Reporter: yanxiaobin
>Priority: Major
>
> Since AWS S3 is needed, I copied the flink-s3-fs-hadoop-1.10.0.jar file to 
> the {{lib}} directory of the Flink distribution. But when I submitted a 
> single Flink job in YARN mode, I ran into the following problem:
>  
> Caused by: java.lang.ClassCastException: 
> org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto 
> cannot be cast to com.google.protobuf.Message
> at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)
> at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
> at com.sun.proxy.$Proxy11.getClusterNodes(Unknown Source)
> at 
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:303)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
> at com.sun.proxy.$Proxy12.getClusterNodes(Unknown Source)
> at 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:564)
> at 
> org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43)
> at 
> org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:278)
> at 
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444)
> at 
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:390)
> ... 24 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer

2019-01-23 Thread yanxiaobin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16749861#comment-16749861
 ] 

yanxiaobin commented on FLINK-8500:
---

I have changed the title.
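For readers following the thread: per the Fix Version above, the capability 
landed by handing the whole ConsumerRecord to the deserializer. A minimal 
sketch of reading the Kafka message timestamp that way, assuming a String 
payload (illustrative, not the exact code merged for this issue):

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class TimestampedStringSchema implements KafkaDeserializationSchema<String> {
    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        // record.timestamp() is the Kafka message timestamp this issue asked for
        long kafkaTimestamp = record.timestamp();
        String payload = new String(record.value(), StandardCharsets.UTF_8);
        return kafkaTimestamp + "|" + payload;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}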

> Get the timestamp of the Kafka message from kafka consumer
> --
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, this 
> is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer

2019-01-23 Thread yanxiaobin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanxiaobin updated FLINK-8500:
--
Summary: Get the timestamp of the Kafka message from kafka consumer  (was: 
Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher))

> Get the timestamp of the Kafka message from kafka consumer
> --
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, this 
> is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)