[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp

2017-11-14 Thread Juan Miguel Cejuela (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251364#comment-16251364
 ] 

Juan Miguel Cejuela commented on FLINK-8046:


Note: I can also a limit of max. num. of entries because in my application I 
always delete the files as soon as I am done processing them.

> ContinuousFileMonitoringFunction wrongly ignores files with exact same 
> timestamp
> 
>
> Key: FLINK-8046
> URL: https://issues.apache.org/jira/browse/FLINK-8046
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.2
>Reporter: Juan Miguel Cejuela
>  Labels: stream
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The current monitoring of files sets the internal variable 
> `globalModificationTime` to filter out files that are "older". However, the 
> current test (to check "older") does 
> `boolean shouldIgnore = modificationTime <= globalModificationTime;` (rom 
> `shouldIgnore`)
> The comparison should strictly be SMALLER (NOT smaller or equal). The method 
> documentation also states "This happens if the modification time of the file 
> is _smaller_ than...".
> The equality acceptance for "older", makes some files with same exact 
> timestamp to be ignored. The behavior is also non-deterministic, as the first 
> file to be accepted ("first" being pretty much random) makes the rest of 
> files with same exact timestamp to be ignored.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp

2017-11-14 Thread Juan Miguel Cejuela (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251363#comment-16251363
 ] 

Juan Miguel Cejuela commented on FLINK-8046:


Hi [~kkl0u] !

I also copy here my answer to you through the mailing list. Let's keep the 
conversation here from now on ;)

---

Hi Kostas,

thank you very much for your answer.

Yes, I proposed the change in https://github.com/apache/flink/pull/4997 to 
compare as modificationTime < globalModificationTime (without accepting 
equals). Later, however, I realized, as you correctly point out, that this 
creates duplicates.

The original and now deprecated FileMonitoringFunction.java indeed kept a map 
of filenames to their timestamps.

That works. However, this memory consumption is likely too much for my 
application, as I may process millions of files.

What I’ve done so far is to create my own 
MyPatchedContinuousFileMonitoringFunction that has a similar map, however 
implemented with a LinkedHashMap to limit the size of the map to a desired max 
num of entries, as in:

private volatile Map filenamesAlreadySeen = new 
LinkedHashMap() {

@Override
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > MAX_ENTRIES;
}
};

and then changed shouldIgnore to:

private boolean shouldIgnore(Path filePath, long modificationTime) {
assert (Thread.holdsLock(checkpointLock));
boolean alreadySeen = 
filenamesAlreadySeen.containsKey(filePath.getName());
boolean shouldIgnore = alreadySeen || modificationTime < 
globalModificationTime;
filenamesAlreadySeen.put(filePath.getName(), true);

if (shouldIgnore && LOG.isDebugEnabled()) {
LOG.debug("Ignoring " + filePath + ", with mod time= " + 
modificationTime +
" and global mod time= " + globalModificationTime);
}
return shouldIgnore;
}

This is a partial solution that works now for me. However, it’s still a hack 
and very particular solution.

I think the real solution would be also to use the accessTime (not only the 
modificationTime). However, as I pointed out in the github pull request, as of 
flink 1.3.2, access time is always 0, at least on my machine and local file 
system (macOS).

> ContinuousFileMonitoringFunction wrongly ignores files with exact same 
> timestamp
> 
>
> Key: FLINK-8046
> URL: https://issues.apache.org/jira/browse/FLINK-8046
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.2
>Reporter: Juan Miguel Cejuela
>  Labels: stream
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The current monitoring of files sets the internal variable 
> `globalModificationTime` to filter out files that are "older". However, the 
> current test (to check "older") does 
> `boolean shouldIgnore = modificationTime <= globalModificationTime;` (rom 
> `shouldIgnore`)
> The comparison should strictly be SMALLER (NOT smaller or equal). The method 
> documentation also states "This happens if the modification time of the file 
> is _smaller_ than...".
> The equality acceptance for "older", makes some files with same exact 
> timestamp to be ignored. The behavior is also non-deterministic, as the first 
> file to be accepted ("first" being pretty much random) makes the rest of 
> files with same exact timestamp to be ignored.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp

2017-11-13 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16249322#comment-16249322
 ] 

Kostas Kloudas commented on FLINK-8046:
---

Hi [~jmcejuela]! Thanks a lot for reporting this and working on it. 

As I commented in the Mailing List thread you opened, I do not think that the 
solution is not to remove 
the {{=}} from the {{modificationTime <= globalModificationTime;}} in the 
{{ContinuousFileMonitoringFunction}}, as this 
would lead to duplicates. 

The solution, in my opinion is to keep a list of the filenames (or hashes) of 
the files processed for the last {{globalModTimestamp}} (and only for that 
timestamp) and when there are new with the same timestamp, then check if the 
name of the file they belong is in that list. 

This way you pay a bit of memory but you get what you want.

What do you think?

> ContinuousFileMonitoringFunction wrongly ignores files with exact same 
> timestamp
> 
>
> Key: FLINK-8046
> URL: https://issues.apache.org/jira/browse/FLINK-8046
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.2
>Reporter: Juan Miguel Cejuela
>  Labels: stream
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The current monitoring of files sets the internal variable 
> `globalModificationTime` to filter out files that are "older". However, the 
> current test (to check "older") does 
> `boolean shouldIgnore = modificationTime <= globalModificationTime;` (rom 
> `shouldIgnore`)
> The comparison should strictly be SMALLER (NOT smaller or equal). The method 
> documentation also states "This happens if the modification time of the file 
> is _smaller_ than...".
> The equality acceptance for "older", makes some files with same exact 
> timestamp to be ignored. The behavior is also non-deterministic, as the first 
> file to be accepted ("first" being pretty much random) makes the rest of 
> files with same exact timestamp to be ignored.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp

2017-11-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16248924#comment-16248924
 ] 

ASF GitHub Bot commented on FLINK-8046:
---

Github user juanmirocks commented on the issue:

https://github.com/apache/flink/pull/4997
  
Perhaps access time could be leveraged. However, as of Flink 1.3.2 
`FileStatus#getAccessTime()` (at least for a local file system), always returns 
`0` ... 


> ContinuousFileMonitoringFunction wrongly ignores files with exact same 
> timestamp
> 
>
> Key: FLINK-8046
> URL: https://issues.apache.org/jira/browse/FLINK-8046
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.2
>Reporter: Juan Miguel Cejuela
>  Labels: stream
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The current monitoring of files sets the internal variable 
> `globalModificationTime` to filter out files that are "older". However, the 
> current test (to check "older") does 
> `boolean shouldIgnore = modificationTime <= globalModificationTime;` (rom 
> `shouldIgnore`)
> The comparison should strictly be SMALLER (NOT smaller or equal). The method 
> documentation also states "This happens if the modification time of the file 
> is _smaller_ than...".
> The equality acceptance for "older", makes some files with same exact 
> timestamp to be ignored. The behavior is also non-deterministic, as the first 
> file to be accepted ("first" being pretty much random) makes the rest of 
> files with same exact timestamp to be ignored.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp

2017-11-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16248913#comment-16248913
 ] 

ASF GitHub Bot commented on FLINK-8046:
---

Github user juanmirocks commented on the issue:

https://github.com/apache/flink/pull/4997
  
No. I don't think this is going to be a suitable solution, as if = is 
allowed in the comparison, the very same file will be triggered multiple times.

Note that the older and deprecated `FileMonitoringFunction` solves this 
situation by having a map of filenames to modification times. More robust but 
also more expensive memory-wise. A limit to a possible map could be given in 
`LinkedHashMap` with `removeEldestEntry`.


> ContinuousFileMonitoringFunction wrongly ignores files with exact same 
> timestamp
> 
>
> Key: FLINK-8046
> URL: https://issues.apache.org/jira/browse/FLINK-8046
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.2
>Reporter: Juan Miguel Cejuela
>  Labels: stream
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The current monitoring of files sets the internal variable 
> `globalModificationTime` to filter out files that are "older". However, the 
> current test (to check "older") does 
> `boolean shouldIgnore = modificationTime <= globalModificationTime;` (rom 
> `shouldIgnore`)
> The comparison should strictly be SMALLER (NOT smaller or equal). The method 
> documentation also states "This happens if the modification time of the file 
> is _smaller_ than...".
> The equality acceptance for "older", makes some files with same exact 
> timestamp to be ignored. The behavior is also non-deterministic, as the first 
> file to be accepted ("first" being pretty much random) makes the rest of 
> files with same exact timestamp to be ignored.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp

2017-11-10 Thread Juan Miguel Cejuela (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247846#comment-16247846
 ] 

Juan Miguel Cejuela commented on FLINK-8046:


Since we are at this, it is in my humble opinion also strange that, when 
computing the file splits as in `format.createInputSplits(readerParallelism)`, 
the given `readerParallelism` is used, but not the the format's `unstoppable` 
field or `.getNumSplits()` method.

I don't know if this could be for another issue.

> ContinuousFileMonitoringFunction wrongly ignores files with exact same 
> timestamp
> 
>
> Key: FLINK-8046
> URL: https://issues.apache.org/jira/browse/FLINK-8046
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.2
>Reporter: Juan Miguel Cejuela
>  Labels: stream
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The current monitoring of files sets the internal variable 
> `globalModificationTime` to filter out files that are "older". However, the 
> current test (to check "older") does 
> `boolean shouldIgnore = modificationTime <= globalModificationTime;` (rom 
> `shouldIgnore`)
> The comparison should strictly be SMALLER (NOT smaller or equal). The method 
> documentation also states "This happens if the modification time of the file 
> is _smaller_ than...".
> The equality acceptance for "older", makes some files with same exact 
> timestamp to be ignored. The behavior is also non-deterministic, as the first 
> file to be accepted ("first" being pretty much random) makes the rest of 
> files with same exact timestamp to be ignored.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247825#comment-16247825
 ] 

ASF GitHub Bot commented on FLINK-8046:
---

GitHub user juanmirocks opened a pull request:

https://github.com/apache/flink/pull/4997

[FLINK-8046] [flink-streaming-java] Have filter of timestamp compare with 
strictly SMALLER (NOT smaller or equal)

## What is the purpose of the change

This change fixes the wrong ignoring of files with same exact timestamp. 
This change also matches the doc header of the method (`shouldIgnore`): "...if 
the modification time of the file is smaller than...".

Without this change, some files with same exact timestamp (because they 
were written at the same exact long time) will be ignored, which is unexpected 
by the user.

Also you would find the funny log of `Ignoring file:/XXX, with mod time= 
1510321363000 and global mod time= 1510321363000`

## Brief change log

* Comparison is done with strictly SMALLER (<)

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? JavaDocs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tagtog/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4997.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4997


commit 2db52989fef2455413d42286893c5227983ee74b
Author: Juan Miguel Cejuela 
Date:   2017-11-10T16:57:09Z

compare as strictly SMALLER (not SMALLER OR EQUAL) (as per the doc header 
"if the modification time of the file is smaller than")

Otherwise, some files with same exact timestamp (because they were written 
at the same exact long time) will be ignored.




> ContinuousFileMonitoringFunction wrongly ignores files with exact same 
> timestamp
> 
>
> Key: FLINK-8046
> URL: https://issues.apache.org/jira/browse/FLINK-8046
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.2
>Reporter: Juan Miguel Cejuela
>  Labels: stream
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The current monitoring of files sets the internal variable 
> `globalModificationTime` to filter out files that are "older". However, the 
> current test (to check "older") does 
> `boolean shouldIgnore = modificationTime <= globalModificationTime;` (rom 
> `shouldIgnore`)
> The comparison should strictly be SMALLER (NOT smaller or equal). The method 
> documentation also states "This happens if the modification time of the file 
> is _smaller_ than...".
> The equality acceptance for "older", makes some files with same exact 
> timestamp to be ignored. The behavior is also non-deterministic, as the first 
> file to be accepted ("first" being pretty much random) makes the rest of 
> files with same exact timestamp to be ignored.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)