[jira] [Commented] (FLINK-7990) Strange behavior when configuring Logback for logging
[ https://issues.apache.org/jira/browse/FLINK-7990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16857604#comment-16857604 ] Juan Miguel Cejuela commented on FLINK-7990: We are observing the same on Flink 1.6.2 > Strange behavior when configuring Logback for logging > - > > Key: FLINK-7990 > URL: https://issues.apache.org/jira/browse/FLINK-7990 > Project: Flink > Issue Type: Bug > Components: Runtime / Configuration >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Priority: Major > > The following issue was reported on the [user > mailinglist|https://lists.apache.org/thread.html/c06a9f0b1189bf21d946d3d9728631295c88bfc57043cdbe18409d52@%3Cuser.flink.apache.org%3E] > {quote} > I have a single node Flink instance which has the required jars for logback > in the lib folder (logback-classic.jar, logback-core.jar, > log4j-over-slf4j.jar). I have removed the jars for log4j from the lib folder > (log4j-1.2.17.jar, slf4j-log4j12-1.7.7.jar). 'logback.xml' is also correctly > updated in 'conf' folder. I have also included 'logback.xml' in the > classpath, although this does not seem to be considered while the job is run. > Flink refers to logback.xml inside the conf folder only. I have updated > pom.xml as per Flink's documentation in order to exclude log4j. I have some > log entries set inside a few map and flatmap functions and some log entries > outside those functions (eg: "program execution started"). > When I run the job, Flink writes only those logs that are coded outside the > transformations. Those logs that are coded inside the transformations (map, > flatmap etc) are not getting written to the log file. If this was happening > always, I could have assumed that the Task Manager is not writing the logs. > But Flink displays a strange behavior regarding this. 
Whenever I update the > logback jars inside the lib folder (due to version changes), during the > next job run, all logs (even those inside map and flatmap) are written > correctly into the log file. But the logs don't get written in any of the > runs after that. This means that my 'logback.xml' file is correct and the > settings are also correct. But I don't understand why the same settings don't > work when the same job is run again. > {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9940) File source continuous monitoring mode: S3 files sometimes missed
[ https://issues.apache.org/jira/browse/FLINK-9940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648989#comment-16648989 ] Juan Miguel Cejuela commented on FLINK-9940: As far as I understand, this bug has little or nothing to do with S3, but rather with files that have the very same timestamp. In that case, this issue and the following one are likely duplicates: https://issues.apache.org/jira/browse/FLINK-8046 > File source continuous monitoring mode: S3 files sometimes missed > - > > Key: FLINK-9940 > URL: https://issues.apache.org/jira/browse/FLINK-9940 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.5.1 > Environment: Flink 1.5, EMRFS >Reporter: Huyen Levan >Assignee: Huyen Levan >Priority: Major > Labels: EMRFS, Flink, S3, pull-request-available > Fix For: 1.7.0 > > > When using StreamExecutionEnvironment.readFile() with > FileProcessingMode.PROCESS_CONTINUOUSLY mode to monitor an S3 prefix, if > a large number of files are added or modified at the same time, the directory > monitoring process might miss some files. The number of missing files depends > on the monitoring interval. > Cause: Flink tracks which files it has read by remembering the modification > time of the file that was added (or modified) last. So when multiple files > have the same last-modified timestamp, some of them can be skipped. > Suggested solution (thanks to [Fabian > Hueske|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=user_nodes=25]): > a hybrid approach that keeps the names of all files that have a mod > timestamp that is larger than the max mod time minus an offset. > _org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction_
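The hybrid approach suggested in FLINK-9940 can be sketched roughly as follows. This is only an illustration under stated assumptions, not Flink's implementation: the class `RecentFileTracker`, its method `selectNewFiles`, and the representation of a directory listing as a map from path to modification time are all hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: remember the names of all files inside a recent time window. */
final class RecentFileTracker {

    private final long offsetMillis;
    private long maxModTime = Long.MIN_VALUE;
    // Path -> modification time of every processed file that is still inside the window.
    private final Map<String, Long> recentFiles = new HashMap<>();

    RecentFileTracker(long offsetMillis) {
        this.offsetMillis = offsetMillis;
    }

    /** Files at or below this threshold are assumed to have been handled already. */
    private long threshold() {
        return maxModTime == Long.MIN_VALUE ? Long.MIN_VALUE : maxModTime - offsetMillis;
    }

    /** Returns the paths from one directory scan that still need processing, sorted by name. */
    List<String> selectNewFiles(Map<String, Long> listing) {
        List<String> toProcess = new ArrayList<>();
        long currentThreshold = threshold();
        for (Map.Entry<String, Long> entry : listing.entrySet()) {
            String path = entry.getKey();
            long modTime = entry.getValue();
            if (modTime <= currentThreshold) {
                continue; // older than the tracked window
            }
            Long seenModTime = recentFiles.get(path);
            if (seenModTime != null && seenModTime >= modTime) {
                continue; // this version of the file was already processed
            }
            toProcess.add(path);
            recentFiles.put(path, modTime);
        }
        // Advance the maximum only after the whole scan, so equal timestamps are treated alike.
        for (long modTime : listing.values()) {
            maxModTime = Math.max(maxModTime, modTime);
        }
        // Forget names that have fallen out of the window to bound memory use.
        final long newThreshold = threshold();
        recentFiles.values().removeIf(modTime -> modTime <= newThreshold);
        Collections.sort(toProcess);
        return toProcess;
    }

    public static void main(String[] args) {
        RecentFileTracker tracker = new RecentFileTracker(10L);
        Map<String, Long> listing = new HashMap<>();
        listing.put("a", 100L);
        listing.put("b", 100L);
        System.out.println(tracker.selectNewFiles(listing));
        listing.put("c", 100L); // same timestamp, shows up one scan later
        System.out.println(tracker.selectNewFiles(listing));
    }
}
```

In this sketch, a file that appears in a later scan with the same timestamp as already processed files is still picked up, because it is matched by name rather than by timestamp alone. The offset trades memory for tolerance: files older than the maximum modification time minus the offset are no longer tracked.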
[jira] [Updated] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
[ https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Juan Miguel Cejuela updated FLINK-8046: --- Remaining Estimate: (was: 24h) Original Estimate: (was: 24h) > ContinuousFileMonitoringFunction wrongly ignores files with exact same > timestamp > > > Key: FLINK-8046 > URL: https://issues.apache.org/jira/browse/FLINK-8046 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.2 >Reporter: Juan Miguel Cejuela >Priority: Major > Labels: stream > Fix For: 1.5.5 > > > The current monitoring of files sets the internal variable > `globalModificationTime` to filter out files that are "older". However, the > current test (to check "older") does > `boolean shouldIgnore = modificationTime <= globalModificationTime;` (from > `shouldIgnore`) > The comparison should be strictly SMALLER (NOT smaller or equal). The method > documentation also states "This happens if the modification time of the file > is _smaller_ than...". > Accepting equality as "older" causes some files with the exact same > timestamp to be ignored. The behavior is also non-deterministic, as the first > file to be accepted ("first" being pretty much random) causes the rest of the > files with the exact same timestamp to be ignored.
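The difference between the two comparisons can be illustrated with a small, simplified model. This is a sketch following the description in the report, not Flink's actual code: the class `ModificationTimeFilterDemo` and its methods are hypothetical, and the model assumes the global modification time is raised right after each accepted file.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the "should this file be ignored?" check. */
final class ModificationTimeFilterDemo {

    /** Check as quoted in the report: files with an equal timestamp are dropped too. */
    static boolean shouldIgnoreInclusive(long modificationTime, long globalModificationTime) {
        return modificationTime <= globalModificationTime;
    }

    /** Check as proposed in the report: only strictly older files are dropped. */
    static boolean shouldIgnoreStrict(long modificationTime, long globalModificationTime) {
        return modificationTime < globalModificationTime;
    }

    /**
     * Walks over the files in listing order and returns the indexes of the accepted ones.
     * The global modification time is raised after every accepted file (an assumption
     * of this model, based on the behavior described in the report).
     */
    static List<Integer> acceptedIndexes(long[] modificationTimes, boolean strict) {
        long globalModificationTime = Long.MIN_VALUE;
        List<Integer> accepted = new ArrayList<>();
        for (int i = 0; i < modificationTimes.length; i++) {
            long modificationTime = modificationTimes[i];
            boolean ignore = strict
                    ? shouldIgnoreStrict(modificationTime, globalModificationTime)
                    : shouldIgnoreInclusive(modificationTime, globalModificationTime);
            if (!ignore) {
                accepted.add(i);
                globalModificationTime = Math.max(globalModificationTime, modificationTime);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        long[] sameTimestamp = {1000L, 1000L, 1000L};
        System.out.println("inclusive (<=): " + acceptedIndexes(sameTimestamp, false)); // [0]
        System.out.println("strict (<):     " + acceptedIndexes(sameTimestamp, true));  // [0, 1, 2]
    }
}
```

With three files sharing one timestamp, the inclusive check accepts only whichever file happens to be listed first, while the strict check accepts all three. Note that in this simplified model a strict check alone would accept the same files again on a later scan, so some additional bookkeeping of already processed files would still be needed.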
[jira] [Commented] (FLINK-7811) Add support for Scala 2.12
[ https://issues.apache.org/jira/browse/FLINK-7811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554689#comment-16554689 ] Juan Miguel Cejuela commented on FLINK-7811: Moved to 1.7.0? So unfortunate :-( By that point scala 2.13 will likely be out... > Add support for Scala 2.12 > -- > > Key: FLINK-7811 > URL: https://issues.apache.org/jira/browse/FLINK-7811 > Project: Flink > Issue Type: Sub-task > Components: Scala API >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Fix For: 1.7.0 > >
[jira] [Commented] (FLINK-7270) Add support for dynamic properties to cluster entry point
[ https://issues.apache.org/jira/browse/FLINK-7270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406016#comment-16406016 ] Juan Miguel Cejuela commented on FLINK-7270: Thanks [~till.rohrmann] – I'll keep experimenting, and also give 1.4.2 a try. > Add support for dynamic properties to cluster entry point > - > > Key: FLINK-7270 > URL: https://issues.apache.org/jira/browse/FLINK-7270 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Affects Versions: 1.3.1 >Reporter: Till Rohrmann >Assignee: Fang Yong >Priority: Minor > Labels: flip-6 > > We should respect dynamic properties when starting the {{ClusterEntrypoint}}. > This basically means extracting them from the passed command line arguments > and then adding them to the loaded {{Configuration}}.
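The two steps named in the FLINK-7270 description (extract the dynamic properties from the command line, then add them to the loaded configuration) could look roughly like the sketch below. It makes several assumptions that are not taken from the issue: dynamic properties are assumed to be passed as `-Dkey=value` arguments, a plain `Map<String, String>` stands in for Flink's `Configuration`, and the class `DynamicPropertiesDemo` and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: merge -Dkey=value arguments into an already loaded configuration. */
final class DynamicPropertiesDemo {

    /** Collects all arguments of the (assumed) form -Dkey=value; other arguments are skipped. */
    static Map<String, String> extractDynamicProperties(String[] args) {
        Map<String, String> properties = new HashMap<>();
        for (String arg : args) {
            if (!arg.startsWith("-D")) {
                continue;
            }
            String keyValue = arg.substring(2);
            int separator = keyValue.indexOf('=');
            if (separator <= 0) {
                continue; // no key, or no '=' at all
            }
            properties.put(keyValue.substring(0, separator), keyValue.substring(separator + 1));
        }
        return properties;
    }

    /** Returns a copy of the loaded configuration in which dynamic properties take precedence. */
    static Map<String, String> withDynamicProperties(Map<String, String> loaded, String[] args) {
        Map<String, String> merged = new HashMap<>(loaded);
        merged.putAll(extractDynamicProperties(args));
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> loaded = new HashMap<>();
        loaded.put("taskmanager.numberOfTaskSlots", "1");
        String[] commandLine = {"--configDir", "conf", "-Dtaskmanager.numberOfTaskSlots=4"};
        System.out.println(withDynamicProperties(loaded, commandLine));
    }
}
```

Letting the command-line values overwrite the loaded ones is a design choice of this sketch; the issue itself only says the properties should be added to the loaded configuration.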
[jira] [Commented] (FLINK-7270) Add support for dynamic properties to cluster entry point
[ https://issues.apache.org/jira/browse/FLINK-7270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404832#comment-16404832 ] Juan Miguel Cejuela commented on FLINK-7270: Does this affect setting {{env.java.opts}}? (On Flink 1.4.0) Neither with a local cluster nor with a YARN session on AWS EMR can I figure out how to set dynamic properties. This may also be related: https://issues.apache.org/jira/browse/FLINK-6581 > Add support for dynamic properties to cluster entry point > - > > Key: FLINK-7270 > URL: https://issues.apache.org/jira/browse/FLINK-7270 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Affects Versions: 1.3.1 >Reporter: Till Rohrmann >Assignee: Fang Yong >Priority: Minor > Labels: flip-6 > > We should respect dynamic properties when starting the {{ClusterEntrypoint}}. > This basically means extracting them from the passed command line arguments > and then adding them to the loaded {{Configuration}}.
[jira] [Closed] (FLINK-8272) 1.4 & Scala: Exception when querying the state
[ https://issues.apache.org/jira/browse/FLINK-8272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Juan Miguel Cejuela closed FLINK-8272. -- Resolution: Fixed Fix Version/s: 1.4.0 > 1.4 & Scala: Exception when querying the state > -- > > Key: FLINK-8272 > URL: https://issues.apache.org/jira/browse/FLINK-8272 > Project: Flink > Issue Type: Bug > Components: Queryable State >Affects Versions: 1.4.0 > Environment: mac >Reporter: Juan Miguel Cejuela > Fix For: 1.4.0 > > > With Flink 1.3.2 and basically the same code except for the changes in the > API that happened in 1.4.0, I could query the state fine. > With 1.4.0 (& Scala), I get the exception written below. Most important line: > {{java.util.concurrent.CompletionException: > java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds > writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)}} > Somehow it seems that the connection to the underlying buffer fails. Perhaps > most telling is that, if I give a wrong {{JobId}} (i.e. one that is not > running), I still get the same error. So I guess no connection succeeds. > Also, to avoid possible serialization errors, I am just querying with a key > of type {{String}} and, for testing, a dummy & trivial {{case class}}. > I create the necessary types like this: > {code} > implicit val typeInformationString = createTypeInformation[String] > implicit val typeInformationDummy = createTypeInformation[Dummy] > implicit val valueStateDescriptorDummy = > new ValueStateDescriptor("state", typeInformationDummy) > {code} > And then query like this: > {code} > val javaCompletableFuture = client > .getKvState(this.flinkJobIdObject, this.queryableState, key, > typeInformationString, valueStateDescriptorDummy) > {code} > Does anybody have any clue on this? Let me know what other information I > should provide. 
> --- > Full stack trace: > {code} > java.util.concurrent.CompletionException: > java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds > writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0) > at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:289) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.queryablestate.network.Client$EstablishedConnection.close(Client.java:432) > at > org.apache.flink.queryablestate.network.Client$EstablishedConnection.onFailure(Client.java:505) > at > org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:92) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at > org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) > at > 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at > org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at >
[jira] [Commented] (FLINK-8272) 1.4 & Scala: Exception when querying the state
[ https://issues.apache.org/jira/browse/FLINK-8272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16294165#comment-16294165 ] Juan Miguel Cejuela commented on FLINK-8272: Correction: * I did see the log message: "Started Queryable State Proxy Server" (without the "the") * Inspecting that log, I realized I had wrongly configured the port for the queryable client (if I recall correctly, in 1.3.2 the port was the same as flink's JobManager, but now the port must be of the "Queryable State Proxy Server" and the host must be one of the TaskManager's) Bottom line: my bad, no issue! You can close this issue. Sorry! > 1.4 & Scala: Exception when querying the state > -- > > Key: FLINK-8272 > URL: https://issues.apache.org/jira/browse/FLINK-8272 > Project: Flink > Issue Type: Bug > Components: Queryable State >Affects Versions: 1.4.0 > Environment: mac >Reporter: Juan Miguel Cejuela
[jira] [Commented] (FLINK-8272) 1.4 & Scala: Exception when querying the state
[ https://issues.apache.org/jira/browse/FLINK-8272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16294155#comment-16294155 ] Juan Miguel Cejuela commented on FLINK-8272: More info: I never receive the message "Started the Queryable State Proxy Server" as described in: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html#activating-queryable-state > 1.4 & Scala: Exception when querying the state > -- > > Key: FLINK-8272 > URL: https://issues.apache.org/jira/browse/FLINK-8272 > Project: Flink > Issue Type: Bug > Components: Queryable State >Affects Versions: 1.4.0 > Environment: mac >Reporter: Juan Miguel Cejuela
[jira] [Updated] (FLINK-8272) 1.4 & Scala: Exception when querying the state
[ https://issues.apache.org/jira/browse/FLINK-8272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Juan Miguel Cejuela updated FLINK-8272: --- Description: With Flink 1.3.2 and basically the same code except for the changes in the API that happened in 1.4.0, I could query the state fine. With 1.4.0 (& Scala), I get the exception written below. Most important line: {{java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)}} Somehow it seems that the connection to the underlying buffer fails. Perhaps most telling is that, if I give a wrong {{JobId}} (i.e. one that is not running), I still get the same error. So I guess no connection succeeds. Also, to avoid possible serialization errors, I am just querying with a key of type {{String}} and, for testing, a dummy & trivial {{case class}}. I create the necessary types like this: {code} implicit val typeInformationString = createTypeInformation[String] implicit val typeInformationDummy = createTypeInformation[Dummy] implicit val valueStateDescriptorDummy = new ValueStateDescriptor("state", typeInformationDummy) {code} And then query like this: {code} val javaCompletableFuture = client .getKvState(this.flinkJobIdObject, this.queryableState, key, typeInformationString, valueStateDescriptorDummy) {code} Does anybody have any clue on this? Let me know what other information I should provide. 
--- Full stack trace: {code} java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0) at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:289) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.close(Client.java:432) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onFailure(Client.java:505) at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:92) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
[jira] [Updated] (FLINK-8272) 1.4 & Scala: Exception when querying the state
[ https://issues.apache.org/jira/browse/FLINK-8272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Juan Miguel Cejuela updated FLINK-8272: --- Description: With Flink 1.3.2 and basically the same code except for the changes in the API that happened in 1.4.0, I could query the state finely. With 1.4.0 (& scala), I get the exception written below. Most important line: {{java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)}} Somehow it seems that the connection to the underlying buffer fails. Perhaps most informing is that, if I give a wrong {{JobId}} (i.e. one that is not running), I still get the same error. So I guess no connection succeeds. Also, to avoid possible serialization errors, I am just querying with a key of type {{String}} and, for testing, a dummy & trivial {{case class}}. I create the necessary types such as: {code:java} implicit val typeInformationString = createTypeInformation[String] implicit val typeInformationDummy = createTypeInformation[Dummy] implicit val valueStateDescriptorDummy = new ValueStateDescriptor("state", typeInformationDummy)}} {code} And then query the like such as: {code} val javaCompletableFuture = client .getKvState(this.flinkJobIdObject, this.queryableState, key, typeInformationString, valueStateDescriptorDummy) {code} Does anybody have any clue on this? Let me know what other information I should provide. 
--- Full stack trace: {code:scala} java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0) at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:289) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.close(Client.java:432) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onFailure(Client.java:505) at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:92) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at
[jira] [Updated] (FLINK-8272) 1.4 & Scala: Exception when querying the state
[ https://issues.apache.org/jira/browse/FLINK-8272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Juan Miguel Cejuela updated FLINK-8272:
---
Description:
With Flink 1.3.2 and basically the same code, except for the API changes introduced in 1.4.0, I could query the state fine. With 1.4.0 (& Scala), I get the exception shown below. Most important line:

{{java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)}}

Somehow it seems that the connection to the underlying buffer fails. Perhaps most informative is that, if I give a wrong {{JobId}} (i.e. one that is not running), I still get the same error. So I guess no connection succeeds.

Also, to avoid possible serialization errors, I am just querying with a key of type {{String}} and, for testing, a dummy & trivial {{case class}}.

I create the necessary types as follows:

{code:scala}
implicit val typeInformationString = createTypeInformation[String]
implicit val typeInformationDummy = createTypeInformation[Dummy]
implicit val valueStateDescriptorDummy = new ValueStateDescriptor("state", typeInformationDummy)
{code}

And then I query like this:

{code}
val javaCompletableFuture = client
  .getKvState(this.flinkJobIdObject, this.queryableState, key, typeInformationString, valueStateDescriptorDummy)
{code}

Does anybody have any clue about this? Let me know what other information I should provide.
--- Full stack trace: {code:scala} java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0) at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:289) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.close(Client.java:432) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onFailure(Client.java:505) at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:92) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at
[jira] [Updated] (FLINK-8272) 1.4 & Scala: Exception when querying the state
[ https://issues.apache.org/jira/browse/FLINK-8272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Juan Miguel Cejuela updated FLINK-8272:
---
Description:
With Flink 1.3.2 and basically the same code, except for the API changes introduced in 1.4.0, I could query the state fine. With 1.4.0 (& Scala), I get the exception shown below. Most important line:

{{java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)}}

Somehow it seems that the connection to the underlying buffer fails. Perhaps most informative is that, if I give a wrong {{JobId}} (i.e. one that is not running), I still get the same error. So I guess no connection succeeds.

Also, to avoid possible serialization errors, I am just querying with a key of type {{String}} and, for testing, a dummy & trivial {{case class}}.

I create the necessary types as follows:

{code:scala}
implicit val typeInformationString = createTypeInformation[String]
implicit val typeInformationDummy = createTypeInformation[Dummy]
implicit val valueStateDescriptorDummy = new ValueStateDescriptor("state", typeInformationDummy)
{code}

And then I query like this:

{code}
val javaCompletableFuture = client
  .getKvState(this.flinkJobIdObject, this.queryableState, key, typeInformationString, valueStateDescriptorDummy)
{code}

Does anybody have any clue about this? Let me know what other information I should provide.
--- Full stack trace: {code:scala} java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0) at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:289) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.close(Client.java:432) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onFailure(Client.java:505) at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:92) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at
[jira] [Updated] (FLINK-8272) 1.4 & Scala: Exception when querying the state
[ https://issues.apache.org/jira/browse/FLINK-8272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Juan Miguel Cejuela updated FLINK-8272:
---
Description:
With Flink 1.3.2 and basically the same code, except for the API changes introduced in 1.4.0, I could query the state fine. With 1.4.0 (& Scala), I get the exception shown below. Most important line:

{{java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)}}

Somehow it seems that the connection to the underlying buffer fails. Perhaps most informative is that, if I give a wrong {{JobId}} (i.e. one that is not running), I still get the same error. So I guess no connection succeeds.

Also, to avoid possible serialization errors, I am just querying with a key of type {{String}} and, for testing, a dummy & trivial {{case class}}.

I create the necessary types as follows:

{code:scala}
implicit val typeInformationString = createTypeInformation[String]
implicit val typeInformationDummy = createTypeInformation[Dummy]
implicit val valueStateDescriptorDummy = new ValueStateDescriptor("state", typeInformationDummy)
{code}

And then I query like this:

{code:scala}
val javaCompletableFuture = client
  .getKvState(this.flinkJobIdObject, this.queryableState, key, typeInformationString, valueStateDescriptorDummy)
{code}

Does anybody have any clue about this? Let me know what other information I should provide.
--- Full stack trace: {code:scala} java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0) at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:289) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.close(Client.java:432) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onFailure(Client.java:505) at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:92) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at
[jira] [Updated] (FLINK-8272) 1.4 & Scala: Exception when querying the state
[ https://issues.apache.org/jira/browse/FLINK-8272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Juan Miguel Cejuela updated FLINK-8272:
---
Description:
With Flink 1.3.2 and basically the same code, except for the API changes introduced in 1.4.0, I could query the state fine. With 1.4.0 (& Scala), I get the exception shown below. Most important line:

{{java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)}}

Somehow it seems that the connection to the underlying buffer fails. Perhaps most informative is that, if I give a wrong {{JobId}} (i.e. one that is not running), I still get the same error. So I guess no connection succeeds.

Also, to avoid possible serialization errors, I am just querying with a key of type {{String}} and, for testing, a dummy & trivial {{case class}}.

I create the necessary types as follows:

{{implicit val typeInformationString = createTypeInformation[String]
implicit val typeInformationDummy = createTypeInformation[Dummy]
implicit val valueStateDescriptorDummy = new ValueStateDescriptor("state", typeInformationDummy)}}

And then I query like this:

{{val javaCompletableFuture = client
  .getKvState(this.flinkJobIdObject, this.queryableState, key, typeInformationString, valueStateDescriptorDummy)}}

Does anybody have any clue about this? Let me know what other information I should provide.
--- Full stack trace: {code:scala} {{java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0) at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:289) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.close(Client.java:432) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onFailure(Client.java:505) at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:92) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at
[jira] [Updated] (FLINK-8272) 1.4 & Scala: Exception when querying the state
[ https://issues.apache.org/jira/browse/FLINK-8272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Juan Miguel Cejuela updated FLINK-8272:
---
Description:
With Flink 1.3.2 and basically the same code, except for the API changes introduced in 1.4.0, I could query the state fine. With 1.4.0 (& Scala), I get the exception shown below. Most important line:

{{java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)}}

Somehow it seems that the connection to the underlying buffer fails. Perhaps most informative is that, if I give a wrong {{JobId}} (i.e. one that is not running), I still get the same error. So I guess no connection succeeds.

Also, to avoid possible serialization errors, I am just querying with a key of type {{String}} and, for testing, a dummy & trivial {{case class}}.

I create the necessary types as follows:

{code:scala}
implicit val typeInformationString = createTypeInformation[String]
implicit val typeInformationDummy = createTypeInformation[Dummy]
implicit val valueStateDescriptorDummy = new ValueStateDescriptor("state", typeInformationDummy)
{code}

And then I query like this:

{code}
val javaCompletableFuture = client
  .getKvState(this.flinkJobIdObject, this.queryableState, key, typeInformationString, valueStateDescriptorDummy)
{code}

Does anybody have any clue about this? Let me know what other information I should provide.
--- Full stack trace: {code:scala} java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0) at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:289) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.close(Client.java:432) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onFailure(Client.java:505) at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:92) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
[jira] [Updated] (FLINK-8272) 1.4 & Scala: Exception when querying the state
[ https://issues.apache.org/jira/browse/FLINK-8272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Juan Miguel Cejuela updated FLINK-8272:
---
Description:
With Flink 1.3.2 and basically the same code, except for the API changes introduced in 1.4.0, I could query the state fine. With 1.4.0 (& Scala), I get the exception shown below. Most important line:

{{java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)}}

Somehow it seems that the connection to the underlying buffer fails. Perhaps most informative is that, if I give a wrong {{JobId}} (i.e. one that is not running), I still get the same error. So I guess no connection succeeds.

Also, to avoid possible serialization errors, I am just querying with a key of type {{String}} and, for testing, a dummy & trivial {{case class}}.

I create the necessary types as follows:

{{implicit val typeInformationString = createTypeInformation[String]
implicit val typeInformationDummy = createTypeInformation[Dummy]
implicit val valueStateDescriptorDummy = new ValueStateDescriptor("state", typeInformationDummy)}}

And then I query like this:

{{val javaCompletableFuture = client
  .getKvState(this.flinkJobIdObject, this.queryableState, key, typeInformationString, valueStateDescriptorDummy)}}

Does anybody have any clue about this? Let me know what other information I should provide.
--- Full stack trace: {code:scala} java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0) at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:289) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.close(Client.java:432) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onFailure(Client.java:505) at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:92) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at
[jira] [Created] (FLINK-8272) 1.4 & Scala: Exception when querying the state
Juan Miguel Cejuela created FLINK-8272:
--
Summary: 1.4 & Scala: Exception when querying the state
Key: FLINK-8272
URL: https://issues.apache.org/jira/browse/FLINK-8272
Project: Flink
Issue Type: Bug
Components: Queryable State
Affects Versions: 1.4.0
Environment: mac
Reporter: Juan Miguel Cejuela

With Flink 1.3.2 and basically the same code, except for the API changes introduced in 1.4.0, I could query the state fine. With 1.4.0 (& Scala), I get the exception shown below. Most important line:

{{java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)}}

Somehow it seems that the connection to the underlying buffer fails. Perhaps most informative is that, if I give a wrong {{JobId}} (i.e. one that is not running), I still get the same error. So I guess no connection succeeds.

Also, to avoid possible serialization errors, I am just querying with a key of type {{String}} and, for testing, a dummy & trivial {{case class}}.

I create the necessary types as follows:

{{implicit val typeInformationString = createTypeInformation[String]
implicit val typeInformationDummy = createTypeInformation[Dummy]
implicit val valueStateDescriptorDummy = new ValueStateDescriptor("state", typeInformationDummy)}}

And then I query like this:

{{val javaCompletableFuture = client
  .getKvState(this.flinkJobIdObject, this.queryableState, key, typeInformationString, valueStateDescriptorDummy)}}

Does anybody have any clue about this? Let me know what other information I should provide.
--- Full stack trace: {{java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0) at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:289) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.close(Client.java:432) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onFailure(Client.java:505) at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:92) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at
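Editorial note on the exception message quoted in the report above: its wording matches a bounds check on a read of 4 bytes (for example an `int` or a length prefix) from a buffer that has no readable bytes at all (`ridx: 0, widx: 0, cap: 0`). The following is only a minimal sketch of that kind of check, using `java.nio.ByteBuffer` as a stand-in. It is not Flink's or Netty's code, and `LengthPrefixDemo` and `readLengthPrefix` are hypothetical names chosen for illustration.

```java
import java.nio.ByteBuffer;

// Illustrative stand-in built on java.nio.ByteBuffer; not Flink's or Netty's actual code.
final class LengthPrefixDemo {

    // Reads a 4-byte value, failing the way a bounds-checked buffer would
    // when fewer than 4 readable bytes are available.
    static int readLengthPrefix(ByteBuffer buf) {
        int readerIndex = buf.position(); // next byte to read
        int writerIndex = buf.limit();    // end of the readable region
        if (readerIndex + Integer.BYTES > writerIndex) {
            throw new IndexOutOfBoundsException(
                "readerIndex(" + readerIndex + ") + length(" + Integer.BYTES
                    + ") exceeds writerIndex(" + writerIndex + ")");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        // A buffer that really contains 4 bytes can be read normally.
        ByteBuffer framed = ByteBuffer.allocate(4);
        framed.putInt(42);
        framed.flip();
        System.out.println(readLengthPrefix(framed));

        // An empty buffer triggers the bounds check instead.
        try {
            readLengthPrefix(ByteBuffer.allocate(0));
        } catch (IndexOutOfBoundsException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this reading, the error says that something tried to decode a value from an empty payload. It does not by itself say why the payload was empty.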
[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
[ https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251364#comment-16251364 ]

Juan Miguel Cejuela commented on FLINK-8046:

Note: I can also afford a limit on the max. number of entries because in my application I always delete the files as soon as I am done processing them.

> ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
>
> Key: FLINK-8046
> URL: https://issues.apache.org/jira/browse/FLINK-8046
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.3.2
> Reporter: Juan Miguel Cejuela
> Labels: stream
> Fix For: 1.5.0
>
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> The current monitoring of files sets the internal variable
> `globalModificationTime` to filter out files that are "older". However, the
> current test (to check "older") does
> `boolean shouldIgnore = modificationTime <= globalModificationTime;` (from
> `shouldIgnore`)
> The comparison should strictly be SMALLER (NOT smaller or equal). The method
> documentation also states "This happens if the modification time of the file
> is _smaller_ than...".
> Accepting equality as "older" causes some files with the exact same
> timestamp to be ignored. The behavior is also non-deterministic, as the first
> file to be accepted ("first" being pretty much random) causes the rest of the
> files with the exact same timestamp to be ignored.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
[ https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251363#comment-16251363 ]

Juan Miguel Cejuela commented on FLINK-8046:

Hi [~kkl0u]! I am also copying here the answer I sent you through the mailing list. Let's keep the conversation here from now on ;)

---

Hi Kostas,

thank you very much for your answer.

Yes, I proposed the change in https://github.com/apache/flink/pull/4997 to compare as modificationTime < globalModificationTime (without accepting equals). Later, however, I realized, as you correctly point out, that this creates duplicates.

The original and now deprecated FileMonitoringFunction.java indeed kept a map of filenames to their timestamps. That works. However, this memory consumption is likely too much for my application, as I may process millions of files.

What I’ve done so far is to create my own MyPatchedContinuousFileMonitoringFunction that has a similar map, but implemented with a LinkedHashMap that limits the size of the map to a desired max. number of entries, as in:

    private volatile Map<String, Boolean> filenamesAlreadySeen = new LinkedHashMap<String, Boolean>() {
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
            return size() > MAX_ENTRIES;
        }
    };

and then changed shouldIgnore to:

    private boolean shouldIgnore(Path filePath, long modificationTime) {
        assert (Thread.holdsLock(checkpointLock));
        boolean alreadySeen = filenamesAlreadySeen.containsKey(filePath.getName());
        boolean shouldIgnore = alreadySeen || modificationTime < globalModificationTime;
        filenamesAlreadySeen.put(filePath.getName(), true);
        if (shouldIgnore && LOG.isDebugEnabled()) {
            LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime + " and global mod time= " + globalModificationTime);
        }
        return shouldIgnore;
    }

This is a partial solution that works for me for now. However, it’s still a hack and a very particular solution.

I think the real solution would be to also use the accessTime (not only the modificationTime). However, as I pointed out in the GitHub pull request, as of Flink 1.3.2 the access time is always 0, at least on my machine and local file system (macOS).
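The size-limited map described in this comment can be tried out in isolation. The following is only a minimal sketch of that idea, not code from Flink or from the patch: the class name `BoundedSeenFiles`, the method `checkAndMark`, and the cap of 2 entries are hypothetical names and values chosen for illustration. It relies on `LinkedHashMap.removeEldestEntry`, which the JDK calls after each insertion.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not part of Flink): remembers at most maxEntries file names. */
class BoundedSeenFiles {
    private final Map<String, Boolean> seen;

    BoundedSeenFiles(final int maxEntries) {
        // Insertion-ordered map; once the cap is exceeded, the oldest inserted name is dropped.
        this.seen = new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns true if the name is still remembered; otherwise records it and returns false. */
    synchronized boolean checkAndMark(String fileName) {
        return seen.put(fileName, Boolean.TRUE) != null;
    }

    synchronized int size() {
        return seen.size();
    }

    public static void main(String[] args) {
        BoundedSeenFiles files = new BoundedSeenFiles(2); // assumed cap, for illustration only
        System.out.println(files.checkAndMark("a.csv")); // false: first time seen
        System.out.println(files.checkAndMark("a.csv")); // true: already remembered
        files.checkAndMark("b.csv");
        files.checkAndMark("c.csv");                     // exceeds the cap, evicts "a.csv"
        System.out.println(files.checkAndMark("a.csv")); // false: "a.csv" was forgotten
        System.out.println(files.size());                // 2
    }
}
```

The last call shows the trade-off of bounding the map: a name that has been evicted is treated as new again, so this approach only avoids duplicates if old files are no longer present by the time they fall out of the map.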
[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
[ https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247846#comment-16247846 ]

Juan Miguel Cejuela commented on FLINK-8046:

While we are at it, it is in my humble opinion also strange that, when computing the file splits as in `format.createInputSplits(readerParallelism)`, the given `readerParallelism` is used, but not the format's `unsplittable` field or `.getNumSplits()` method. I don't know whether this belongs in a separate issue.
[jira] [Created] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
Juan Miguel Cejuela created FLINK-8046:
---------------------------------------

Summary: ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
Key: FLINK-8046
URL: https://issues.apache.org/jira/browse/FLINK-8046
Project: Flink
Issue Type: Bug
Components: Streaming
Affects Versions: 1.3.2
Reporter: Juan Miguel Cejuela
Fix For: 1.5.0

The current monitoring of files sets the internal variable `globalModificationTime` to filter out files that are "older". However, the current test (to check "older") does

`boolean shouldIgnore = modificationTime <= globalModificationTime;` (from `shouldIgnore`)

The comparison should strictly be SMALLER (NOT smaller or equal). The method documentation also states "This happens if the modification time of the file is _smaller_ than...".

Accepting equality as "older" causes some files with the exact same timestamp to be ignored. The behavior is also non-deterministic, as the first file to be accepted ("first" being pretty much random) causes the rest of the files with the exact same timestamp to be ignored.
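To make the reported comparison concrete, here is a small standalone sketch. It is not Flink code: the class `ShouldIgnoreSketch`, its two methods, and the timestamps are hypothetical, and it assumes, as the report describes, that `globalModificationTime` has already been advanced to the modification time of a previously accepted file.

```java
/** Hypothetical illustration (not Flink code) of the two comparisons discussed in the report. */
class ShouldIgnoreSketch {
    // Comparison quoted in the report: an equal timestamp counts as "older".
    static boolean shouldIgnoreInclusive(long modificationTime, long globalModificationTime) {
        return modificationTime <= globalModificationTime;
    }

    // Comparison proposed in the report: only strictly smaller timestamps are ignored.
    static boolean shouldIgnoreStrict(long modificationTime, long globalModificationTime) {
        return modificationTime < globalModificationTime;
    }

    public static void main(String[] args) {
        long globalModificationTime = 1_000L; // assumed: set when a first file with mod time 1000 was accepted
        long sameTimestampFile = 1_000L;      // a second file with the exact same timestamp
        long olderFile = 999L;

        System.out.println(shouldIgnoreInclusive(sameTimestampFile, globalModificationTime)); // true: file is skipped
        System.out.println(shouldIgnoreStrict(sameTimestampFile, globalModificationTime));    // false: file is accepted
        System.out.println(shouldIgnoreStrict(olderFile, globalModificationTime));            // true: genuinely older
    }
}
```

Note that with the strict comparison a file that was already accepted at timestamp 1000 would also pass the check again on a later scan, which is the duplicate problem raised in the follow-up comments on this issue.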