[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
[ https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251364#comment-16251364 ]

Juan Miguel Cejuela commented on FLINK-8046:

Note: I can also afford a limit on the max. number of entries because in my application I always delete the files as soon as I am done processing them.

> ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
>
> Key: FLINK-8046
> URL: https://issues.apache.org/jira/browse/FLINK-8046
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.3.2
> Reporter: Juan Miguel Cejuela
> Labels: stream
> Fix For: 1.5.0
>
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> The current monitoring of files sets the internal variable `globalModificationTime` to filter out files that are "older". However, the current test (to check "older") does
> `boolean shouldIgnore = modificationTime <= globalModificationTime;` (from `shouldIgnore`)
> The comparison should strictly be SMALLER (NOT smaller or equal). The method documentation also states "This happens if the modification time of the file is _smaller_ than...".
> The equality acceptance for "older" causes some files with the same exact timestamp to be ignored. The behavior is also non-deterministic, as the first file to be accepted ("first" being pretty much random) causes the rest of the files with the same exact timestamp to be ignored.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
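The trade-off between `<=` and `<` described in the issue can be illustrated with a small sketch. This is only a simplified model, not Flink's code: the class `TimestampFilterDemo`, the method `monitor`, and the idea of passing directory "snapshots" as maps are all hypothetical, and it assumes that `globalModificationTime` is updated once per scan.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the monitoring loop; NOT Flink's actual class. */
class TimestampFilterDemo {

    /**
     * Replays a sequence of directory scans. Each snapshot maps file name to
     * modification time. Returns every file name that was forwarded, in order.
     */
    static List<String> monitor(List<Map<String, Long>> snapshots, boolean strict) {
        long globalModificationTime = Long.MIN_VALUE;
        List<String> forwarded = new ArrayList<>();
        for (Map<String, Long> snapshot : snapshots) {
            long maxSeen = globalModificationTime;
            for (Map.Entry<String, Long> file : snapshot.entrySet()) {
                long modificationTime = file.getValue();
                // strict == true models the proposed "<"; false models the current "<=".
                boolean shouldIgnore = strict
                        ? modificationTime < globalModificationTime
                        : modificationTime <= globalModificationTime;
                if (!shouldIgnore) {
                    forwarded.add(file.getKey());
                    maxSeen = Math.max(maxSeen, modificationTime);
                }
            }
            // Assumption of this sketch: the global time advances once per scan.
            globalModificationTime = maxSeen;
        }
        return forwarded;
    }
}
```

With a first scan that sees only file `a` (time 1000) and a second scan that sees `a` and a new file `b` (also time 1000), the `<=` variant forwards only `a`, so `b` is lost, while the `<` variant forwards `a`, `a`, `b`, so `a` is duplicated.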
[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
[ https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251363#comment-16251363 ]

Juan Miguel Cejuela commented on FLINK-8046:

Hi [~kkl0u]! I am also copying here the answer I sent you through the mailing list. Let's keep the conversation here from now on ;)

---

Hi Kostas,

thank you very much for your answer.

Yes, I proposed the change in https://github.com/apache/flink/pull/4997 to compare as modificationTime < globalModificationTime (without accepting equals). Later, however, I realized, as you correctly point out, that this creates duplicates.

The original and now deprecated FileMonitoringFunction.java indeed kept a map of filenames to their timestamps. That works. However, this memory consumption is likely too much for my application, as I may process millions of files.

What I've done so far is to create my own MyPatchedContinuousFileMonitoringFunction that has a similar map, however implemented with a LinkedHashMap to limit the size of the map to a desired max. number of entries, as in:

    private volatile Map<String, Boolean> filenamesAlreadySeen = new LinkedHashMap<String, Boolean>() {
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
            // Evict the oldest inserted entry once the map grows past the limit.
            return size() > MAX_ENTRIES;
        }
    };

and then changed shouldIgnore to:

    private boolean shouldIgnore(Path filePath, long modificationTime) {
        assert (Thread.holdsLock(checkpointLock));
        boolean alreadySeen = filenamesAlreadySeen.containsKey(filePath.getName());
        boolean shouldIgnore = alreadySeen || modificationTime < globalModificationTime;
        filenamesAlreadySeen.put(filePath.getName(), true);
        if (shouldIgnore && LOG.isDebugEnabled()) {
            LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime +
                " and global mod time= " + globalModificationTime);
        }
        return shouldIgnore;
    }

This is a partial solution that works for me for now. However, it's still a hack and a very particular solution.

I think the real solution would be to also use the accessTime (not only the modificationTime). However, as I pointed out in the GitHub pull request, as of Flink 1.3.2 the access time is always 0, at least on my machine and local file system (macOS).
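The size-limited map above relies on `LinkedHashMap#removeEldestEntry`. A minimal, self-contained sketch of just that eviction behaviour follows; the class `BoundedSeenFiles` and its method names are hypothetical and only serve the illustration, they are not part of the patch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: remembers at most maxEntries file names, evicting the oldest insertion first. */
class BoundedSeenFiles {

    private final Map<String, Boolean> seen;

    BoundedSeenFiles(final int maxEntries) {
        this.seen = new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                // Called after a new key is inserted; returning true drops the eldest entry.
                return size() > maxEntries;
            }
        };
    }

    /** Returns true if the name is still remembered; otherwise remembers it and returns false. */
    boolean checkAndRemember(String fileName) {
        return seen.put(fileName, Boolean.TRUE) != null;
    }

    int size() {
        return seen.size();
    }
}
```

With a limit of 2, remembering `a`, `b` and then `c` evicts `a`, so a later check for `a` reports it as unseen again. A name that has been evicted is therefore treated as new, which is presumably only acceptable when, as in the application described here, files are deleted once processed.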
[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
[ https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16249322#comment-16249322 ]

Kostas Kloudas commented on FLINK-8046:
---

Hi [~jmcejuela]!

Thanks a lot for reporting this and working on it.

As I commented in the Mailing List thread you opened, I do not think that the solution is to remove the {{=}} from the {{modificationTime <= globalModificationTime;}} in the {{ContinuousFileMonitoringFunction}}, as this would lead to duplicates.

The solution, in my opinion, is to keep a list of the filenames (or hashes) of the files processed for the last {{globalModTimestamp}} (and only for that timestamp). When new files arrive with the same timestamp, check whether their names are in that list.

This way you pay a bit of memory but you get what you want.

What do you think?
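A minimal sketch of the approach proposed above, under stated assumptions: the class `LatestTimestampFilter` and the field `seenAtGlobalModTime` are hypothetical names, files are assumed to be offered in non-decreasing modification-time order within a scan, and checkpointing of the extra state is left out entirely.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch; names are illustrative and not taken from the Flink code base. */
class LatestTimestampFilter {

    private long globalModificationTime = Long.MIN_VALUE;

    /** Names of the files already processed whose timestamp equals globalModificationTime. */
    private final Set<String> seenAtGlobalModTime = new HashSet<>();

    /** Returns true if the file should be skipped; otherwise records it as processed. */
    synchronized boolean shouldIgnore(String fileName, long modificationTime) {
        if (modificationTime < globalModificationTime) {
            return true; // strictly older: ignore, as before
        }
        if (modificationTime > globalModificationTime) {
            // A newer timestamp: names kept for the previous timestamp are no longer needed.
            globalModificationTime = modificationTime;
            seenAtGlobalModTime.clear();
        }
        // Same timestamp as the global one: ignore only if this exact name was already processed.
        return !seenAtGlobalModTime.add(fileName);
    }
}
```

Memory use is bounded by the number of files sharing the most recent timestamp, rather than by the total number of files ever seen.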
[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
[ https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16248924#comment-16248924 ]

ASF GitHub Bot commented on FLINK-8046:
---

Github user juanmirocks commented on the issue:

    https://github.com/apache/flink/pull/4997

Perhaps access time could be leveraged. However, as of Flink 1.3.2 `FileStatus#getAccessTime()` (at least for a local file system), always returns `0` ...
[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
[ https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16248913#comment-16248913 ]

ASF GitHub Bot commented on FLINK-8046:
---

Github user juanmirocks commented on the issue:

    https://github.com/apache/flink/pull/4997

No. I don't think this is going to be a suitable solution: if files with an equal timestamp are let through by the comparison, the very same file will be triggered multiple times.

Note that the older and deprecated `FileMonitoringFunction` solves this situation by keeping a map of filenames to modification times. That is more robust but also more expensive memory-wise. The size of such a map could be limited by using a `LinkedHashMap` with `removeEldestEntry`.
[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
[ https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247846#comment-16247846 ]

Juan Miguel Cejuela commented on FLINK-8046:

While we are at it, in my humble opinion it is also strange that, when computing the file splits as in `format.createInputSplits(readerParallelism)`, the given `readerParallelism` is used, but not the format's `unsplittable` field or `.getNumSplits()` method. I don't know whether this should go into a separate issue.
[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
[ https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247825#comment-16247825 ]

ASF GitHub Bot commented on FLINK-8046:
---

GitHub user juanmirocks opened a pull request:

    https://github.com/apache/flink/pull/4997

    [FLINK-8046] [flink-streaming-java] Have filter of timestamp compare with strictly SMALLER (NOT smaller or equal)

## What is the purpose of the change

This change fixes the wrong ignoring of files with same exact timestamp. This change also matches the doc header of the method (`shouldIgnore`): "...if the modification time of the file is smaller than...".

Without this change, some files with same exact timestamp (because they were written at the same exact long time) will be ignored, which is unexpected by the user. Also you would find the funny log of `Ignoring file:/XXX, with mod time= 1510321363000 and global mod time= 1510321363000`

## Brief change log

* Comparison is done with strictly SMALLER (<)

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? JavaDocs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tagtog/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4997.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4997

commit 2db52989fef2455413d42286893c5227983ee74b
Author: Juan Miguel Cejuela
Date: 2017-11-10T16:57:09Z

    compare as strictly SMALLER (not SMALLER OR EQUAL) (as per the doc header "if the modification time of the file is smaller than")

    Otherwise, some files with same exact timestamp (because they were written at the same exact long time) will be ignored.