[jira] [Comment Edited] (APEXCORE-796) Docker based deployment
[ https://issues.apache.org/jira/browse/APEXCORE-796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16553655#comment-16553655 ] Chandni Singh edited comment on APEXCORE-796 at 7/24/18 1:41 AM: - {quote}I thought the goal is to not have to depend on YARN in the first place. {quote} I don't see that mentioned anywhere in this Jira. APEX is very tightly coupled to YARN. It is a YARN application that uses the YARN abstractions. Decoupling from YARN will be a substantial effort for APEX. The description says {quote}This will be particularly helpful when applications depend on native, non-JVM packages like Python and R, that otherwise need to be installed separately. {quote} YARN supports running Docker containers via YARN-3611, which will be helpful for running dockerized APEX operators. This, in my opinion, is easier and faster to accomplish. was (Author: csingh): {quote}I thought the goal is to not have to depend on YARN in the first place. {quote} Don't see that mention anywhere in this Jira. APEX is very tightly coupled to YARN. It is a YARN client that uses the YARN abstractions. Decoupling from YARN will be a substantial effort for APEX. The description says {quote}This will be particularly helpful when applications depend on native, non-JVM packages like Python and R, that otherwise need to be installed separately. {quote} YARN supports running docker containers, which is via YARN-3611, it will be helpful for running dockerized APEX operators. This, in my opinion, is easier and faster to accomplish. > Docker based deployment > --- > > Key: APEXCORE-796 > URL: https://issues.apache.org/jira/browse/APEXCORE-796 > Project: Apache Apex Core > Issue Type: New Feature >Reporter: Thomas Weise >Priority: Major > Labels: roadmap > > Apex should support deployment using Docker as an alternative to application > packages. Docker images provide a simple and standard way to package > dependencies and provide isolation from the host environment. 
This will be > particularly helpful when applications depend on native, non-JVM packages > like Python and R, that otherwise need to be installed separately. Docker > support will also be a step towards supporting other cluster managers like > Kubernetes, Mesos and Swarm. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (APEXCORE-796) Docker based deployment
[ https://issues.apache.org/jira/browse/APEXCORE-796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16553655#comment-16553655 ] Chandni Singh commented on APEXCORE-796: {quote}I thought the goal is to not have to depend on YARN in the first place. {quote} I don't see that mentioned anywhere in this Jira. APEX is very tightly coupled to YARN. It is a YARN client that uses the YARN abstractions. Decoupling from YARN will be a substantial effort for APEX. The description says {quote}This will be particularly helpful when applications depend on native, non-JVM packages like Python and R, that otherwise need to be installed separately. {quote} YARN supports running Docker containers via YARN-3611, which will be helpful for running dockerized APEX operators. This, in my opinion, is easier and faster to accomplish.
[jira] [Commented] (APEXCORE-796) Docker based deployment
[ https://issues.apache.org/jira/browse/APEXCORE-796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16553507#comment-16553507 ] Chandni Singh commented on APEXCORE-796: YARN supports running Docker containers: YARN-3611
[jira] [Assigned] (APEXMALHAR-2252) Document AbstractManagedStateImpl subclasses
[ https://issues.apache.org/jira/browse/APEXMALHAR-2252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh reassigned APEXMALHAR-2252: - Assignee: (was: Chandni Singh) > Document AbstractManagedStateImpl subclasses > > > Key: APEXMALHAR-2252 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2252 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Thomas Weise > > For the user, the time bucketing concept is very important to understand. We > should highlight the difference in the documentation and also consider > renaming the subclasses for clarity. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (APEXMALHAR-2321) Improve Buckets memory management
[ https://issues.apache.org/jira/browse/APEXMALHAR-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15626117#comment-15626117 ] Chandni Singh commented on APEXMALHAR-2321: --- >>AbstractManagedStateImpl.maxMemorySize will probably be misunderstood as the >>max memory size of the total managed state, but in fact it is used as the memory >>size of each bucket. Better to rename it. This is incorrect. This setting is not used per bucket. Please look at lines 94-98 in StateTracker. Bytes are summed over all buckets and then compared to maxMemorySize. > Improve Buckets memory management > - > > Key: APEXMALHAR-2321 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2321 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: bright chen >Assignee: bright chen > > Currently buckets are managed as an array. Each bucket has a memory > limitation, and freeing memory will be triggered if the bucket's memory usage goes over > the limitation. > - For ManagedTimeUnifiedStateImpl, the default bucket number is 345600, which is > probably too large. > - AbstractManagedStateImpl.maxMemorySize will probably be misunderstood as > the max memory size of the total managed state, but in fact it is used as the memory > size of each bucket. Better to rename it. > - The default maxMemorySize is zero. It's better to give a reasonable default > value to avoid too much garbage collection -- This message was sent by Atlassian JIRA (v6.3.4#6332)
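The check described in the comment above (bytes are summed over all buckets and the total is compared once against maxMemorySize, making it a global limit rather than a per-bucket one) can be sketched as follows. This is a minimal illustration with invented names, not the actual StateTracker code:

```java
import java.util.Map;

// Minimal sketch (not the actual StateTracker code) of the eviction trigger
// described above: sizes are summed over ALL buckets, then the total is
// compared to maxMemorySize, so the setting is global, not per-bucket.
public class MemoryCheckSketch {
    static boolean overLimit(Map<Long, Long> bucketSizeBytes, long maxMemorySize) {
        long total = 0;
        for (long bytes : bucketSizeBytes.values()) {
            total += bytes;              // summed over all buckets
        }
        return total > maxMemorySize;    // compared once against the global limit
    }
}
```

If maxMemorySize were per-bucket, the comparison would sit inside the loop; the single comparison after the sum is what makes it a total-state limit.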
[jira] [Comment Edited] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI
[ https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15588941#comment-15588941 ] Chandni Singh edited comment on APEXMALHAR-2284 at 10/19/16 2:36 PM: - [~bhup...@apache.org], [~chaithu] I disagree. There are ways that Spillable... can be enhanced to support event time. ManagedTimeStateMultiValue is not even a Managed State implementation, so I don't really know why it is grouped with the other managed state implementations. was (Author: csingh): [~bhup...@apache.org], [~chaithu] I disagree. There are ways that Spillable... be enhanced to support event time. ManagedTimeStateMultiValue is not even a Managed State Implementation so don't really know why it is with other managed state implementations. > POJOInnerJoinOperatorTest fails in Travis CI > > > Key: APEXMALHAR-2284 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2284 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Thomas Weise >Assignee: Chaitanya >Priority: Blocker > Fix For: 3.6.0 > > > https://s3.amazonaws.com/archive.travis-ci.org/jobs/166322754/log.txt > {code} > Failed tests: > POJOInnerJoinOperatorTest.testEmitMultipleTuplesFromStream2:337 Number of > tuple emitted expected:<2> but was:<4> > POJOInnerJoinOperatorTest.testInnerJoinOperator:184 Number of tuple emitted > expected:<1> but was:<2> > POJOInnerJoinOperatorTest.testMultipleValues:236 Number of tuple emitted > expected:<2> but was:<3> > POJOInnerJoinOperatorTest.testUpdateStream1Values:292 Number of tuple > emitted expected:<1> but was:<2> > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI
[ https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15588941#comment-15588941 ] Chandni Singh commented on APEXMALHAR-2284: --- [~bhup...@apache.org], [~chaithu] I disagree. There are ways that Spillable... can be enhanced to support event time. ManagedTimeStateMultiValue is not even a Managed State implementation, so I don't really know why it is grouped with the other managed state implementations.
[jira] [Resolved] (APEXMALHAR-2299) TimeBasedDedupOperator throws exception during time bucket assignment in certain edge cases
[ https://issues.apache.org/jira/browse/APEXMALHAR-2299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh resolved APEXMALHAR-2299. --- Resolution: Fixed > TimeBasedDedupOperator throws exception during time bucket assignment in > certain edge cases > --- > > Key: APEXMALHAR-2299 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2299 > Project: Apache Apex Malhar > Issue Type: Bug >Affects Versions: 3.5.0 >Reporter: Francis Fernandes >Assignee: Francis Fernandes > Fix For: 3.6.0 > > Original Estimate: 96h > Remaining Estimate: 96h > > The following exception is thrown under certain edge cases: > {noformat} > Stopped running due to an exception. java.lang.IllegalArgumentException: new > time bucket should have a value greater than the old time bucket > at > com.google.common.base.Preconditions.checkArgument(Preconditions.java:88) > at > org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl.handleBucketConflict(ManagedTimeUnifiedStateImpl.java:126) > at > org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl.prepareBucket(AbstractManagedStateImpl.java:265) > at > org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl.putInBucket(AbstractManagedStateImpl.java:276) > at > org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl.put(ManagedTimeUnifiedStateImpl.java:71) > at > org.apache.apex.malhar.lib.dedup.TimeBasedDedupOperator.putManagedState(TimeBasedDedupOperator.java:191) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI
[ https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585751#comment-15585751 ] Chandni Singh edited comment on APEXMALHAR-2284 at 10/18/16 3:40 PM: - One point I think I should highlight is how getAsync improves performance: if the value is not available immediately, the client can cache the future and work on other keys. Look at the implementations of AbstractDeduper. 1. If SpillableArrayListMultimap only lacks an asynchronous get, then that can be added to it. That alone should not be a reason to duplicate the functionality. 2. ManagedTimeMultiValueState has no tests and is full of warnings. We should not create a completely separate implementation that duplicates a large part of the functionality provided by something existing; otherwise we will have too many slightly different variations of the same thing, and that will be very difficult to manage. IMO this is a good opportunity to improve SpillableArrayListMultimapImpl and use that. was (Author: csingh): One concept that I think I should point is that getAsync improves performance is that if the value is not available immediately, then the client can cache the future and work on other keys. Look at the implementations of AbstractDeduper. 1. If SpillableArrayListMultimap only lacks asynchronous get, then that can be added to it. That should not be the only reason to duplicate the functionality. 2. ManagedTimeMultiValueState has no tests and is full of warnings. We should not create a complete separate implementation which duplicates a larger part of functionality provided by something existing otherwise we will have too many slightly different variations of the same thing and that will be very difficult to manage. IMO this is a good opportunity to improve SpillableArrayListMultimapImpl and use that. 
[jira] [Commented] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI
[ https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585751#comment-15585751 ] Chandni Singh commented on APEXMALHAR-2284: --- One point I think I should highlight is how getAsync improves performance: if the value is not available immediately, the client can cache the future and work on other keys. Look at the implementations of AbstractDeduper. 1. If SpillableArrayListMultimap only lacks an asynchronous get, then that can be added to it. That alone should not be a reason to duplicate the functionality. 2. ManagedTimeMultiValueState has no tests and is full of warnings. We should not create a completely separate implementation that duplicates a large part of the functionality provided by something existing; otherwise we will have too many slightly different variations of the same thing, and that will be very difficult to manage. IMO this is a good opportunity to improve SpillableArrayListMultimapImpl and use that.
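The cache-the-future pattern described above can be sketched like this. It is a hypothetical illustration: the executor, the getAsync stand-in, the key names, and the await helper are all inventions for the sketch, not the real ManagedState API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch: when an asynchronous get cannot answer immediately, the caller
// keeps the Future and continues working on other keys instead of blocking.
public class AsyncGetSketch {
    static final ExecutorService reader = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);   // reader thread should not keep the JVM alive
        return t;
    });

    // Stand-in for an asynchronous get that resolves on a reader thread.
    static Future<String> getAsync(String key) {
        return reader.submit(() -> "value-of-" + key);
    }

    // Block only at the point where the value is finally needed.
    static String await(Future<String> f) {
        try {
            return f.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Future<String> pending = getAsync("k1"); // cache the future...
        String other = "processed-k2";           // ...and do other work meanwhile
        System.out.println(other + " then " + await(pending));
    }
}
```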
[jira] [Commented] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI
[ https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582672#comment-15582672 ] Chandni Singh commented on APEXMALHAR-2284: --- Can you look into using an implementation of SpillableArrayListMultimap directly? Initially, I think, that is what was discussed for use in the join.
[jira] [Commented] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI
[ https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582633#comment-15582633 ] Chandni Singh commented on APEXMALHAR-2284: --- I don't understand what constraint you are talking about. Consider any concurrent data structure, like a concurrent hashmap. If you do a simultaneous put and get, you don't know what value you get from the get call: it can be the old value or the new one.
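The point above can be demonstrated with a standard java.util.concurrent.ConcurrentHashMap (the key and value names below are invented): a get that races with a put may legitimately observe either the old or the new value, and both outcomes are correct for a concurrent map.

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustration: a get racing with a put on a ConcurrentHashMap may return
// either the old or the new value; which one is timing-dependent.
public class ConcurrentGetSketch {
    static String readDuringWrite() {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("k1", "old");
        Thread writer = new Thread(() -> map.put("k1", "new"));
        writer.start();
        String seen = map.get("k1"); // may be "old" or "new" — both are valid
        try {
            writer.join();           // after join, map.get("k1") is "new"
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return seen;
    }
}
```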
[jira] [Commented] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI
[ https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15572183#comment-15572183 ] Chandni Singh commented on APEXMALHAR-2284: --- I think the proper fix for this may take time. IMO we should disable POJOInnerJoinOperatorTest but keep the JIRA open.
[jira] [Commented] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI
[ https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15572169#comment-15572169 ] Chandni Singh commented on APEXMALHAR-2284: --- 1. put needs to be done only after you retrieve the value from getAsync(k1). You may have to wait for the future to finish. 2. It is an asynchronous callback that tries to retrieve the latest value. Whatever is put in the bucket is the latest. So if you are doing a put without completing the get, then it is a problem.
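The ordering rule in point 1 can be sketched as follows. The names here (safeUpdate, getAsync, the backing map) are stand-ins invented for the sketch, not the actual ManagedState API: the essential step is waiting for the pending future of a key before putting a new value for the same key.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: complete the asynchronous get for a key BEFORE putting a new
// value for that key, so the callback never races with the write.
public class PutAfterGetSketch {
    static final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    // Stand-in for an asynchronous get resolving on another thread.
    static CompletableFuture<String> getAsync(String key) {
        return CompletableFuture.supplyAsync(() -> store.getOrDefault(key, "absent"));
    }

    static String safeUpdate(String key, String newValue) {
        String previous;
        try {
            previous = getAsync(key).get(); // 1. wait for the async get to finish
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        store.put(key, newValue);           // 2. only then put the new value
        return previous;
    }
}
```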
[jira] [Comment Edited] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI
[ https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571119#comment-15571119 ] Chandni Singh edited comment on APEXMALHAR-2284 at 10/13/16 7:27 AM: - Chaitanya, Please feel free to assign the ticket to me if my PR broke these tests. Having a variable {code}asyncReadSource{code} didn't really make much sense to me. How the reader thread fetches a value from a bucket doesn't need to be exposed and configurable. Anyway, if this was done for tests, then there are better ways of doing it. was (Author: csingh): Chaitanya, Please feel free to assign the ticket to me if my PR broke these tests. Having a variable {code}asyncReadSource{code} didn't really make much sense to me. Changing how reader thread fetch value from a bucket doesn't need to be altered. Anyways if this was done for tests, then there are better ways of doing it.
[jira] [Commented] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI
[ https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571119#comment-15571119 ] Chandni Singh commented on APEXMALHAR-2284: --- Chaitanya, Please feel free to assign the ticket to me if my PR broke these tests. Having a variable {code}asyncReadSource{code} didn't really make much sense to me. How the reader thread fetches a value from a bucket doesn't need to be altered. Anyway, if this was done for tests, then there are better ways of doing it.
[jira] [Comment Edited] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI
[ https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571074#comment-15571074 ] Chandni Singh edited comment on APEXMALHAR-2284 at 10/13/16 6:57 AM: - Are you sure about that? The tests were failing before merging the PR, on a different PR as well, if I remember correctly. Also, why do they fail intermittently? was (Author: csingh): Are you sure about that? The tests were failing before merging the PR on a different PR as well, if I remember correctly? Also why do they fail intermittently?
[jira] [Commented] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI
[ https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571074#comment-15571074 ] Chandni Singh commented on APEXMALHAR-2284: --- Are you sure about that? The tests were failing before merging the PR, on a different PR as well, if I remember correctly. Also, why do they fail intermittently?
[jira] [Created] (APEXMALHAR-2285) Add a property to TimeBasedDirectoryScanner in FileSplitterInput that ignores missing directories instead of propagating exception
Chandni Singh created APEXMALHAR-2285: - Summary: Add a property to TimeBasedDirectoryScanner in FileSplitterInput that ignores missing directories instead of propagating exception Key: APEXMALHAR-2285 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2285 Project: Apache Apex Malhar Issue Type: Improvement Reporter: Chandni Singh The {code}TimeBasedDirectoryScanner{code} in {code}FileSplitterInput{code} fails the container if it is unable to open a directory (list all the paths in the directory). Some use cases may want to ignore missing directories, so we should be able to control it using a property. This was discussed here: https://github.com/apache/apex-malhar/pull/446 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
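A hedged sketch of the proposed behavior: the scan method, its signature, and the ignoreMissingDirectories flag below are hypothetical illustrations of the idea in this ticket, not the actual TimeBasedDirectoryScanner API.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Sketch of the proposal: when a directory cannot be listed, either fail
// (current behavior) or quietly return nothing (proposed opt-in behavior),
// controlled by a property.
public class ScannerSketch {
    static List<String> scan(File dir, boolean ignoreMissingDirectories) {
        File[] children = dir.listFiles();
        if (children == null) {                // directory missing or unreadable
            if (ignoreMissingDirectories) {
                return new ArrayList<>();      // proposed: skip it quietly
            }
            // current behavior: the exception propagates and fails the container
            throw new RuntimeException("Unable to list " + dir.getPath());
        }
        List<String> paths = new ArrayList<>();
        for (File f : children) {
            paths.add(f.getPath());
        }
        return paths;
    }
}
```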
[jira] [Commented] (APEXMALHAR-1852) File Splitter Test Failing On My Machine
[ https://issues.apache.org/jira/browse/APEXMALHAR-1852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15560419#comment-15560419 ] Chandni Singh commented on APEXMALHAR-1852: --- Found the problem with the test. There are 12 test files being created: files 0-11. The content lines that get added are f{file-num}l{line-num}, where 0 <= line-num < 2. Files 0-9 are smaller than files 10-11 because the file-num in the content is just a single digit. This results in a different number of blocks. The test relied on a file having 5 blocks, but instead it sometimes picks a file of 12 blocks. So the fix is simple: make every file have 6 blocks and modify the test. There was also a race condition with testRecursive(). Fixed that too. > File Splitter Test Failing On My Machine > > > Key: APEXMALHAR-1852 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1852 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Timothy Farkas >Assignee: Chandni Singh > > FileSplitterInputTest.testRecoveryOfPartialFile:408 New file expected:<1> but > was:<0> -- This message was sent by Atlassian JIRA (v6.3.4#6332)
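The size argument above can be checked mechanically: a content line built as "f" + file-num + "l" + line-num is one character shorter for single-digit file numbers (files 0-9) than for files 10-11, which is why the files end up with different block counts. A tiny illustration (the helper name is invented):

```java
// Checks the length difference behind the flaky test: single-digit file
// numbers produce shorter content lines than two-digit ones.
public class LineLengthSketch {
    static int lineLength(int fileNum, int lineNum) {
        return ("f" + fileNum + "l" + lineNum).length();
    }
}
```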
[jira] [Assigned] (APEXMALHAR-1852) File Splitter Test Failing On My Machine
[ https://issues.apache.org/jira/browse/APEXMALHAR-1852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh reassigned APEXMALHAR-1852: - Assignee: Chandni Singh
[jira] [Updated] (APEXMALHAR-2281) ManagedState: race condition with put & asyncGet
[ https://issues.apache.org/jira/browse/APEXMALHAR-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2281: -- Summary: ManagedState: race condition with put & asyncGet (was: ManagedState: race condition with put & asyncGet and bucket size doesn't account for entries in file cache) > ManagedState: race condition with put & asyncGet > > > Key: APEXMALHAR-2281 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2281 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Chandni Singh >Assignee: Chandni Singh > > There are 2 bugs: > 1. Race condition with put and asyncGet. put(...) inserts a BucketedValue in > the flash with value = null first. It sets the value afterwards. There is a > possibility that the reader thread will get the BucketedValue from flash and read > the value as null. > 2. Bucket size doesn't account for entries in fileCache -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (APEXMALHAR-2281) ManagedState: race condition with put & asyncGet
[ https://issues.apache.org/jira/browse/APEXMALHAR-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2281: -- Description: Race condition with put and asyncGet. put(...) first inserts a BucketedValue into the flash with value = null and sets the value afterwards. There is a possibility that the reader thread will get the BucketedValue from the flash and read the value as null. (was: There are 2 bugs: 1. Race condition with put and asyncGet. put(...) first inserts a BucketedValue into the flash with value = null and sets the value afterwards. There is a possibility that the reader thread will get the BucketedValue from the flash and read the value as null. 2. Bucket size doesn't account for entries in fileCache)
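The race described in the updated description can be sketched as follows; the class and field names here are illustrative stand-ins, not the actual Malhar ManagedState internals:

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the put/asyncGet race: in the racy version, the entry becomes
// visible in the shared map before its value field is assigned, so a
// concurrent reader can observe value == null. The fix is to publish a
// fully-initialized entry in a single step.
public class PutAsyncGetRace {
    public static class BucketedValue {
        public volatile byte[] value;
        public BucketedValue(byte[] v) { this.value = v; }
    }

    public static final ConcurrentHashMap<Long, BucketedValue> flash = new ConcurrentHashMap<>();

    // Racy: the entry is visible in 'flash' before its value is set.
    public static void racyPut(long key, byte[] value) {
        BucketedValue bv = new BucketedValue(null);
        flash.put(key, bv);   // a reader may fetch bv here and see value == null
        bv.value = value;
    }

    // Fixed: only a fully-initialized BucketedValue is ever published.
    public static void safePut(long key, byte[] value) {
        flash.put(key, new BucketedValue(value));
    }
}
```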
[jira] [Resolved] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket
[ https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh resolved APEXMALHAR-2276. --- Resolution: Fixed > ManagedState: value of a key does not get over-written in the same time bucket > -- > > Key: APEXMALHAR-2276 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2276 > Project: Apache Apex Malhar > Issue Type: Bug > Reporter: Siyuan Hua > Assignee: Chandni Singh > Fix For: 3.6.0 > > > For example: > ManagedTimeUnifiedStateImpl mtus; > mtus.put(1, key1, val1) > mtus.put(1, key1, val2) > mtus.get(1, key1).equals(val2) will return false
[jira] [Updated] (APEXMALHAR-2281) ManagedState: race condition with put & asyncGet and bucket size doesn't account for entries in file cache
[ https://issues.apache.org/jira/browse/APEXMALHAR-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2281: -- Description: There are 2 bugs: 1. Race condition with put and asyncGet. put(...) first inserts a BucketedValue into the flash with value = null and sets the value afterwards. There is a possibility that the reader thread will get the BucketedValue from the flash and read the value as null. 2. Bucket size doesn't account for entries in fileCache was: There are 2 bugs: 1. Race condition with put and asyncGet. put(...) first inserts a BucketedValue into the flash with value = null and sets the value afterwards. There is a possibility that the reader thread will get the BucketedValue from the flash and read the value as null.
[jira] [Created] (APEXMALHAR-2281) ManagedState: race condition with put & asyncGet and bucket size doesn't account for entries in file cache
Chandni Singh created APEXMALHAR-2281: - Summary: ManagedState: race condition with put & asyncGet and bucket size doesn't account for entries in file cache Key: APEXMALHAR-2281 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2281 Project: Apache Apex Malhar Issue Type: Bug Reporter: Chandni Singh Assignee: Chandni Singh There are 2 bugs: 1. Race condition with put and asyncGet. put(...) first inserts a BucketedValue into the flash with value = null and sets the value afterwards. There is a possibility that the reader thread will get the BucketedValue from the flash and read the value as null.
[jira] [Comment Edited] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket
[ https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15552532#comment-15552532 ] Chandni Singh edited comment on APEXMALHAR-2276 at 10/6/16 5:28 PM: I don't see a problem. In Scenario A, it is evident right at the time of insertion that the update is old, and it is ignored. In Scenario B, it is unknown at the time of insertion that the update is old, which is why I mentioned that the client needs to do a get() and then a put(). was (Author: csingh): I don't see a problem. In Scenario A, it is evident right at the time of insertion that the update is old, and it is ignored. In Scenario B, it is unknown at the time of insertion that the update is old, which is why I mentioned that the client needs to do a get() and then a put(). ManagedStateImpl and ManagedTimeStateImpl are not for maintaining time series in memory.
[jira] [Commented] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket
[ https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15552532#comment-15552532 ] Chandni Singh commented on APEXMALHAR-2276: --- I don't see a problem. In Scenario A, it is evident right at the time of insertion that the update is old, and it is ignored. In Scenario B, it is unknown at the time of insertion that the update is old, which is why I mentioned that the client needs to do a get() and then a put(). ManagedStateImpl and ManagedTimeStateImpl are not for maintaining time series in memory.
[jira] [Commented] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket
[ https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15550649#comment-15550649 ] Chandni Singh commented on APEXMALHAR-2276: --- I agree with [~thw] too. This creates unnecessary overhead for every use case: additional space for the timestamp is required per key, both in memory and on disk. While making the changes, though, I discovered a race condition and an issue with free space. I will create another ticket for that.
[jira] [Commented] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket
[ https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15546623#comment-15546623 ] Chandni Singh commented on APEXMALHAR-2276: --- Yes, time gets mapped to a time bucket; however, the exact value of time is lost and currently we cannot derive the time from the time bucket. This affects cases where we want to save the most recent value of a key. In use cases like de-duplication/aggregation this is not the case: in de-duplication we check whether the key is present and drop the event if it is a duplicate; in aggregation, we use the existing value in the time bucket and aggregate it with the new value.
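The point about losing the exact time can be illustrated with a toy bucket assignment; the division-based bucket arithmetic and the bucket span here are assumptions for illustration, not the actual ManagedState implementation:

```java
// Toy example: mapping an event time to a time bucket is lossy. Two distinct
// timestamps that fall in the same bucket become indistinguishable, so which
// update of a key was "most recent" within a bucket cannot be recovered from
// the bucket alone.
public class TimeBucketSketch {
    public static long bucketOf(long timeMillis, long bucketSpanMillis) {
        return timeMillis / bucketSpanMillis;
    }

    public static void main(String[] args) {
        long span = 60_000L; // assumed 1-minute bucket span
        // 1s and 59s land in the same bucket; their ordering is lost
        System.out.println(bucketOf(1_000L, span) == bucketOf(59_000L, span)); // true
    }
}
```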
[jira] [Issue Comment Deleted] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket
[ https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2276: -- Comment: was deleted (was: Yes time gets mapped to a time bucket, however the exact value of time is lost and currently we cannot derive time from the timebucket. This affects cases where we want to save the most recent value of a key. In use-cases, like de-duplication/aggregation this is not the case. In De-duplication we check whether there is a key present and drop the event if its duplicate. In Aggregation, we use the existing value in the time-bucket and aggregate it with new value. )
[jira] [Commented] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket
[ https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15546592#comment-15546592 ] Chandni Singh commented on APEXMALHAR-2276: --- As [~chaithu] pointed out, we need to compare times and not time buckets. The fix for that will not be simple: for each key/value we now have to remember the latest time. So far the bucket didn't have a reference to time, which means the bucket API will probably change as well.
[jira] [Reopened] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket
[ https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh reopened APEXMALHAR-2276: ---
[jira] [Commented] (APEXMALHAR-2223) Managed state should parallelize WAL writes
[ https://issues.apache.org/jira/browse/APEXMALHAR-2223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15537595#comment-15537595 ] Chandni Singh commented on APEXMALHAR-2223: --- IncrementalCheckpointManager used to extend FSWindowDataManager, but this change requires saving multiple artifacts per window, and the API of IncrementalCheckpointManager was quite different from FSWindowDataManager. So the common code was abstracted out into `AbstractFSWindowStateManager`. > Managed state should parallelize WAL writes > --- > > Key: APEXMALHAR-2223 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2223 > Project: Apache Apex Malhar > Issue Type: Improvement > Affects Versions: 3.4.0 > Reporter: Thomas Weise > Assignee: Chandni Singh > > Currently, data is accumulated in memory and written to the WAL on checkpoint only. This causes a write spike on checkpoint and does not utilize the HDFS write pipeline. The other extreme is writing to the WAL as soon as data arrives and then only flushing in beforeCheckpoint. The downside of this is that when the same key is written many times, all duplicates will be in the WAL. Need to find a balanced approach that the user can potentially fine-tune.
[jira] [Updated] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket
[ https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2276: -- Summary: ManagedState: value of a key does not get over-written in the same time bucket (was: ManagedTimeUnifiedStateImpl: value of a key does not get over-written in the same time bucket)
[jira] [Updated] (APEXMALHAR-2276) ManagedTimeUnifiedStateImpl: value of a key does not get over-written in the same time bucket
[ https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2276: -- Summary: ManagedTimeUnifiedStateImpl: value of a key does not get over-written in the same time bucket (was: ManagedTimeUnifiedStateImpl doesn't support time events properly)
[jira] [Assigned] (APEXMALHAR-2252) Document AbstractManagedStateImpl subclasses
[ https://issues.apache.org/jira/browse/APEXMALHAR-2252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh reassigned APEXMALHAR-2252: - Assignee: Chandni Singh > Document AbstractManagedStateImpl subclasses > > > Key: APEXMALHAR-2252 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2252 > Project: Apache Apex Malhar > Issue Type: Task > Reporter: Thomas Weise > Assignee: Chandni Singh > > For the user the time bucketing concept is very important to understand. We should highlight the difference in the documentation and also consider renaming the subclasses for clarity.
[jira] [Commented] (APEXMALHAR-2223) Managed state should parallelize WAL writes
[ https://issues.apache.org/jira/browse/APEXMALHAR-2223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15496811#comment-15496811 ] Chandni Singh commented on APEXMALHAR-2223: --- The reason to delay to endWindow is that with every event we would have to compare the bucket size with the threshold. This comparison, though not very expensive, can be done at intervals because it will take some time to reach the threshold. The interval of an application window seemed fair to me. When the buffer of the output stream is full, data is automatically flushed. Explicitly calling flush forces out any buffered data, so for us this forced flush can be invoked in beforeCheckpoint(); we do not need to call it every endWindow. This component relies on checkpointed state, that is, the WAL is truncated to the offset saved in the state after failures.
[jira] [Comment Edited] (APEXMALHAR-2223) Managed state should parallelize WAL writes
[ https://issues.apache.org/jira/browse/APEXMALHAR-2223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15496811#comment-15496811 ] Chandni Singh edited comment on APEXMALHAR-2223 at 9/16/16 4:59 PM: The reason to delay to endWindow is that otherwise, with every event, we would have to compare the bucket size with the threshold. This comparison, though not very expensive, can be done at intervals because it will take some time to reach the threshold. The interval of an application window seemed fair to me. When the buffer of the output stream is full, data is automatically flushed. Explicitly calling flush forces out any buffered data, so for us this forced flush can be invoked in beforeCheckpoint(); we do not need to call it every endWindow. This component relies on checkpointed state, that is, the WAL is truncated to the offset saved in the state after failures. was (Author: csingh): The reason to delay to endWindow is that with every event we would have to compare the bucket size with the threshold. This comparison, though not very expensive, can be done at intervals because it will take some time to reach the threshold. The interval of an application window seemed fair to me. When the buffer of the output stream is full, data is automatically flushed. Explicitly calling flush forces out any buffered data, so for us this forced flush can be invoked in beforeCheckpoint(); we do not need to call it every endWindow. This component relies on checkpointed state, that is, the WAL is truncated to the offset saved in the state after failures.
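The write pattern discussed in the comment above (buffered appends, a per-window threshold check, and a forced flush only in beforeCheckpoint) might look roughly like this; the class and method names are illustrative, not the actual Malhar WAL API:

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative WAL writer: records go into a buffered stream, which flushes
// itself automatically when its buffer fills; the size-vs-threshold check
// happens once per window instead of per event, and a flush is forced only
// in beforeCheckpoint().
public class WalWriteSketch {
    private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
    private final BufferedOutputStream wal = new BufferedOutputStream(sink, 8192);
    private long bytesInWindow;

    public void append(byte[] record) {
        try {
            wal.write(record); // buffered; auto-flushed only when the buffer is full
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot happen with an in-memory sink
        }
        bytesInWindow += record.length;
    }

    public void endWindow(long threshold) {
        if (bytesInWindow > threshold) { // cheap per-window check, not per-event
            flush();
            bytesInWindow = 0;
        }
    }

    public void beforeCheckpoint() {
        flush(); // force out any buffered data before the checkpoint is taken
    }

    public long flushedBytes() {
        return sink.size();
    }

    private void flush() {
        try {
            wal.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```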
[jira] [Issue Comment Deleted] (APEXMALHAR-2227) Error while connecting with Kafka using Apache Apex
[ https://issues.apache.org/jira/browse/APEXMALHAR-2227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2227: -- Comment: was deleted (was: Hi, We will need to make the fix in the Malhar 3.5.0 release branch and push it to the Maven repo, so yes, you will need to download the Malhar jar again from the Maven repo. Another quick way is to clone the Malhar repo, switch to the 3.5.0 release branch, make the fix, and build it. The fix is quite trivial. You can see the change that needs to be made here: https://github.com/apache/apex-malhar/pull/402 Thanks, Chandni) > Error while connecting with Kafka using Apache Apex > --- > > Key: APEXMALHAR-2227 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2227 > Project: Apache Apex Malhar > Issue Type: Bug > Affects Versions: 3.4.0 > Reporter: Rambrij Chauhan > Assignee: Chandni Singh > Priority: Blocker > > Hi, > I am working on Apache Apex and trying to implement the KafkaInputOperator provided by apex-malhar, but I am hitting the following exception as soon as I run the application. > The strange thing is, the same application works for another ZooKeeper IP but not for the IP I need it to work with, so I went and checked the respective versions and found that the one which works is 0.8.1.1 and the other one is 0.8.2.0. Could you kindly help me find a workaround? I really need to work with the IP which belongs to 0.8.2.0.
> java.lang.RuntimeException: Error creating local cluster > at > com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:78) > at > org.capitalone.apex.main.ApplicationTest.testSomeMethod(ApplicationTest.java:14) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) > at org.junit.runners.ParentRunner.run(ParentRunner.java:309) > at > org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) > at > org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678) > at > 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) > Caused by: java.lang.IllegalStateException: Partitioner returns null or empty. > at > com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:755) > at > com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676) > at > com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378) > at > com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418) > at > com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:406) > at > com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:299) > at > com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:76) > ... 24 more
[jira] [Commented] (APEXMALHAR-2227) Error while connecting with Kafka using Apache Apex
[ https://issues.apache.org/jira/browse/APEXMALHAR-2227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15471182#comment-15471182 ] Chandni Singh commented on APEXMALHAR-2227: --- Hi, We will need to make the fix in the Malhar 3.5.0 release branch and push it to the Maven repo, so yes, you will need to download the Malhar jar again from the Maven repo. Another quick way is to clone the Malhar repo, switch to the 3.5.0 release branch, make the fix, and build it. The fix is quite trivial. You can see the change that needs to be made here: https://github.com/apache/apex-malhar/pull/402 Thanks, Chandni
[jira] [Commented] (APEXMALHAR-2227) Error while connecting with Kafka using Apache Apex
[ https://issues.apache.org/jira/browse/APEXMALHAR-2227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470994#comment-15470994 ] Chandni Singh commented on APEXMALHAR-2227: --- This was caused by recent changes to WindowDataManager. Will create a pull request to fix it.
[jira] [Assigned] (APEXMALHAR-2227) Error while connecting with Kafka using Apache Apex
[ https://issues.apache.org/jira/browse/APEXMALHAR-2227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh reassigned APEXMALHAR-2227: - Assignee: Chandni Singh > Error while connecting with Kafka using Apache Apex > --- > > Key: APEXMALHAR-2227 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2227 > Project: Apache Apex Malhar > Issue Type: Bug >Affects Versions: 3.4.0 >Reporter: Rambrij Chauhan >Assignee: Chandni Singh >Priority: Blocker > > Hi, > I am working on Apache Apex and trying to implement the KafkaInputOperator > provided by apex-malhar, but I am hitting the following exception as soon as > I run the application. > The strange thing is that the same application works for another ZooKeeper IP but not > for the IP I need it to work with. I checked the version of the respective > ZooKeeper and found that the working one belongs to 0.8.1.1 and the > other to 0.8.2.0. Could you kindly help me with a workaround? I > really need this to work with the IP that belongs to 0.8.2.0. 
> java.lang.RuntimeException: Error creating local cluster > at > com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:78) > at > org.capitalone.apex.main.ApplicationTest.testSomeMethod(ApplicationTest.java:14) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) > at org.junit.runners.ParentRunner.run(ParentRunner.java:309) > at > org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) > at > org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678) > at > 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) > Caused by: java.lang.IllegalStateException: Partitioner returns null or empty. > at > com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:755) > at > com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676) > at > com.datatorrent.stram.plan.physical.PhysicalPlan.(PhysicalPlan.java:378) > at > com.datatorrent.stram.StreamingContainerManager.(StreamingContainerManager.java:418) > at > com.datatorrent.stram.StreamingContainerManager.(StreamingContainerManager.java:406) > at > com.datatorrent.stram.StramLocalCluster.(StramLocalCluster.java:299) > at > com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:76) > ... 24 more -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2223) Managed state should parallelize WAL writes
[ https://issues.apache.org/jira/browse/APEXMALHAR-2223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15468563#comment-15468563 ] Chandni Singh commented on APEXMALHAR-2223: --- A possible approach to address this: - Have a property in ManagedState called {code}writeBufferThreshold{code}. When a bucket's size crosses this threshold, the bucket becomes eligible for writing. - Eligible buckets are written to the WAL at the end of every application window, in the {code}endWindow(){code} callback. With this approach there are fewer changes, and data is still divided into windows when written to the WAL. > Managed state should parallelize WAL writes > --- > > Key: APEXMALHAR-2223 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2223 > Project: Apache Apex Malhar > Issue Type: Improvement >Affects Versions: 3.4.0 >Reporter: Thomas Weise >Assignee: Chandni Singh > > Currently, data is accumulated in memory and written to the WAL on checkpoint > only. This causes a write spike on checkpoint and does not utilize the HDFS > write pipeline. The other extreme is writing to the WAL as soon as data > arrives and then only flushing in beforeCheckpoint. The downside of this is that > when the same key is written many times, all duplicates will be in the WAL. > We need to find a balanced approach that the user can potentially fine-tune. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
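The threshold-plus-endWindow idea above can be sketched as follows. This is a minimal, hypothetical sketch: the Wal interface, the ThresholdWalFlush class name, and the entry-count threshold (the real proposal measures a bucket's size) are illustrative stand-ins, not actual ManagedState APIs.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of the writeBufferThreshold approach. The Wal
// interface, class name, and entry-count threshold are illustrative,
// not the actual ManagedState APIs.
public class ThresholdWalFlush {

  interface Wal {
    void append(long bucketId, Map<String, String> entries);
  }

  private final int writeBufferThreshold;  // buffered entries per bucket
  private final Wal wal;
  private final Map<Long, Map<String, String>> buffers = new HashMap<>();

  ThresholdWalFlush(int writeBufferThreshold, Wal wal) {
    this.writeBufferThreshold = writeBufferThreshold;
    this.wal = wal;
  }

  void put(long bucketId, String key, String value) {
    buffers.computeIfAbsent(bucketId, b -> new HashMap<>()).put(key, value);
  }

  // End of every application window: write only the buckets whose buffer
  // crossed the threshold, spreading WAL writes across windows instead of
  // spiking at checkpoint.
  void endWindow() {
    Iterator<Map.Entry<Long, Map<String, String>>> it = buffers.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Long, Map<String, String>> e = it.next();
      if (e.getValue().size() >= writeBufferThreshold) {
        wal.append(e.getKey(), e.getValue());
        it.remove();
      }
    }
  }

  // Checkpoint still flushes whatever remains buffered.
  void beforeCheckpoint() {
    for (Map.Entry<Long, Map<String, String>> e : buffers.entrySet()) {
      wal.append(e.getKey(), e.getValue());
    }
    buffers.clear();
  }

  public static void main(String[] args) {
    final Map<Long, Integer> written = new HashMap<>();
    ThresholdWalFlush m = new ThresholdWalFlush(2,
        (bucketId, entries) -> written.merge(bucketId, entries.size(), Integer::sum));
    m.put(1, "a", "1");
    m.endWindow();  // bucket 1 is below the threshold, nothing written
    m.put(1, "b", "2");
    m.put(2, "c", "3");
    m.endWindow();  // bucket 1 (2 entries) is eligible, bucket 2 is not
    m.beforeCheckpoint();  // remaining bucket 2 flushed at checkpoint
    System.out.println(written);
  }
}
```

The key property of this shape is that a hot bucket gets written every window, while a cold bucket only hits the WAL at checkpoint, which is the "balanced approach" the issue asks for.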
[jira] [Assigned] (APEXMALHAR-2223) Managed state should parallelize WAL writes
[ https://issues.apache.org/jira/browse/APEXMALHAR-2223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh reassigned APEXMALHAR-2223: - Assignee: Chandni Singh > Managed state should parallelize WAL writes > --- > > Key: APEXMALHAR-2223 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2223 > Project: Apache Apex Malhar > Issue Type: Improvement >Affects Versions: 3.4.0 >Reporter: Thomas Weise >Assignee: Chandni Singh > > Currently, data is accumulated in memory and written to the WAL on checkpoint > only. This causes a write spike on checkpoint and does not utilize the HDFS > write pipeline. The other extreme is writing to the WAL as soon as data > arrives and then only flushing in beforeCheckpoint. The downside of this is that > when the same key is written many times, all duplicates will be in the WAL. > We need to find a balanced approach that the user can potentially fine-tune. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (APEXMALHAR-2130) implement scalable windowed storage
[ https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15447840#comment-15447840 ] Chandni Singh edited comment on APEXMALHAR-2130 at 8/30/16 3:04 AM: Note: The main change required in ManagedState here is that timeBuckets (Window time in your example) are now computed outside ManagedState. TimeBuckets were being computed by TimeBucketAssigner within ManagedState, but now they will be provided to it. Since event time, unlike processing time, is arbitrary, the actual key representing the timebucket cannot be assumed to be a natural sequence. However, TimeBucketAssigner.getTimeBucketAndAdjustBoundaries seems to return a long that is sequential starting from 0. We want to make the actual timebucket key based on the actual event window timestamp. Chandni Singh Will this break anything? Answer: No, it will not break anything. The time here is event time, and this does NOT assume that events are received in order. Based on event time, this method creates the timebucket. In your use case, the time bucket is computed outside ManagedState, so there are 2 ways to approach it: - create a special TimeBucketAssigner which will just return the input Window for the event. It will not further compute a timebucket. - make TimeBucketAssigner an optional property in AbstractManagedStateImpl. If it is null, then the time argument is used as the timebucket saved in the Bucket. >>> Expiring and purging are done very differently and should be based on 3. Managed State should determine whether to purge a timebucket based on whether an Apex window is committed and whether all event windows that belong to that timebucket are marked "deleted" for that Apex window. Answer: This is handled by TimeBucketAssigner again. I don't think much change is needed here. TimeBucketAssigner computes a timeBucket (in your case, this corresponds to Window time) and checks if the oldest buckets need to be purged (line 132 - 133). 
It figures out the lowest purgeable timebucket. In endWindow, it informs IncrementalCheckpointManager that it can delete all timebuckets <= lowestPurgeableTimeBucket. However, IncrementalCheckpointManager deletes the data up to that timebucket only when the window in which it was requested to be purged gets committed. So this will remain the same for you as well. I think this can also be achieved by creating a special TimeBucketAssigner and overriding a few methods. was (Author: csingh): Note: The main change required in ManagedState here is that timeBuckets (Window time in your example) are now computed outside ManagedState. TimeBuckets were being computed by TimeBucketAssigner within ManagedState, but now they will be provided to it. Since event time, unlike processing time, is arbitrary, the actual key representing the timebucket cannot be assumed to be a natural sequence. However, TimeBucketAssigner.getTimeBucketAndAdjustBoundaries seems to return a long that is sequential starting from 0. We want to make the actual timebucket key based on the actual event window timestamp. Chandni Singh Will this break anything? Answer: No, it will not break anything. The time here is event time, and this does NOT assume that events are received in order. Based on event time, this method creates the timebucket. In your use case, the time bucket is computed outside ManagedState, so there are 2 ways to approach it: - create a special TimeBucketAssigner which will just return the input Window for the event. It will not further compute a timebucket. - make TimeBucketAssigner an optional property in AbstractManagedStateImpl. If it is null, then the time argument is used as the timebucket saved in the Bucket. Expiring and purging are done very differently and should be based on 3. Managed State should determine whether to purge a timebucket based on whether an Apex window is committed and whether all event windows that belong to that timebucket are marked "deleted" for that Apex window. 
Answer: This is handled by TimeBucketAssigner again. I don't think much change is needed here. TimeBucketAssigner computes a timeBucket (in your case, this corresponds to Window time) and checks if the oldest buckets need to be purged (line 132 - 133). It figures out the lowest purgeable timebucket. In endWindow, it informs IncrementalCheckpointManager that it can delete all timebuckets <= lowestPurgeableTimeBucket. However, IncrementalCheckpointManager deletes the data up to that timebucket only when the window in which it was requested to be purged gets committed. So this will remain the same for you as well. I think this can also be achieved by creating a special TimeBucketAssigner and overriding a few methods. > implement scalable windowed storage > --- > > Key: APEXMALHAR-2130 > URL:
[jira] [Commented] (APEXMALHAR-2130) implement scalable windowed storage
[ https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15447840#comment-15447840 ] Chandni Singh commented on APEXMALHAR-2130: --- Note: The main change required in ManagedState here is that timeBuckets (Window time in your example) are now computed outside ManagedState. TimeBuckets were being computed by TimeBucketAssigner within ManagedState, but now they will be provided to it. Since event time, unlike processing time, is arbitrary, the actual key representing the timebucket cannot be assumed to be a natural sequence. However, TimeBucketAssigner.getTimeBucketAndAdjustBoundaries seems to return a long that is sequential starting from 0. We want to make the actual timebucket key based on the actual event window timestamp. Chandni Singh Will this break anything? Answer: No, it will not break anything. The time here is event time, and this does NOT assume that events are received in order. Based on event time, this method creates the timebucket. In your use case, the time bucket is computed outside ManagedState, so there are 2 ways to approach it: - create a special TimeBucketAssigner which will just return the input Window for the event. It will not further compute a timebucket. - make TimeBucketAssigner an optional property in AbstractManagedStateImpl. If it is null, then the time argument is used as the timebucket saved in the Bucket. Expiring and purging are done very differently and should be based on 3. Managed State should determine whether to purge a timebucket based on whether an Apex window is committed and whether all event windows that belong to that timebucket are marked "deleted" for that Apex window. Answer: This is handled by TimeBucketAssigner again. I don't think much change is needed here. TimeBucketAssigner computes a timeBucket (in your case, this corresponds to Window time) and checks if the oldest buckets need to be purged (line 132 - 133). It figures out the lowest purgeable timebucket. 
In endWindow, it informs IncrementalCheckpointManager that it can delete all timebuckets <= lowestPurgeableTimeBucket. However, IncrementalCheckpointManager deletes the data up to that timebucket only when the window in which it was requested to be purged gets committed. So this will remain the same for you as well. I think this can also be achieved by creating a special TimeBucketAssigner and overriding a few methods. > implement scalable windowed storage > --- > > Key: APEXMALHAR-2130 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2130 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: bright chen >Assignee: David Yan > > This feature is used for supporting windowing. > The storage needs to have the following features: > 1. Spillable key value storage (integrate with APEXMALHAR-2026) > 2. Upon checkpoint, it saves a snapshot of the entire data set with the > checkpointing window id. This should be done incrementally (ManagedState) to > avoid wasting space with unchanged data > 3. When recovering, it takes the recovery window id and restores to that > snapshot > 4. When a window is committed, all windows with a lower ID should be purged > from the store. > 5. It should implement the WindowedStorage and WindowedKeyedStorage > interfaces, and because of 2 and 3, we may want to add methods to the > WindowedStorage interface so that the implementation of WindowedOperator can > notify the storage of checkpointing, recovering and committing of a window. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
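The first option discussed in the comment (an assigner that just returns the input Window for the event) could look roughly like this. All class and method names here are illustrative stand-ins mirroring the discussion, not the real Apex TimeBucketAssigner interface.

```java
// Hypothetical pass-through assigner: the time bucket is computed outside
// ManagedState, so the assigner returns the caller-supplied window id
// unchanged instead of deriving a bucket from event time. Names mirror the
// discussion, not the real Apex signatures.
public class PassThroughTimeBucketAssigner {

  private long lowestPurgeableTimeBucket = -1;

  // The "time" argument is already the event-window id, so it is used
  // directly as the bucket key.
  public long getTimeBucketAndAdjustBoundaries(long windowId) {
    return windowId;
  }

  // Purging stays window-driven: once an event window is marked deleted,
  // every bucket up to it becomes purgeable. Actual deletion would still
  // wait until the Apex window of the purge request is committed.
  public void markDeleted(long windowId) {
    lowestPurgeableTimeBucket = Math.max(lowestPurgeableTimeBucket, windowId);
  }

  public long getLowestPurgeableTimeBucket() {
    return lowestPurgeableTimeBucket;
  }

  public static void main(String[] args) {
    PassThroughTimeBucketAssigner a = new PassThroughTimeBucketAssigner();
    System.out.println(a.getTimeBucketAndAdjustBoundaries(42L));  // identity mapping
    a.markDeleted(10L);
    a.markDeleted(7L);  // out-of-order deletes are fine
    System.out.println(a.getLowestPurgeableTimeBucket());
  }
}
```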
[jira] [Assigned] (APEXCORE-448) Make operator name available in OperatorContext
[ https://issues.apache.org/jira/browse/APEXCORE-448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh reassigned APEXCORE-448: -- Assignee: Chandni Singh > Make operator name available in OperatorContext > --- > > Key: APEXCORE-448 > URL: https://issues.apache.org/jira/browse/APEXCORE-448 > Project: Apache Apex Core > Issue Type: Improvement >Reporter: Chandni Singh >Assignee: Chandni Singh > > We need the name of the logical operator in the OperatorContext, which can be used by > WindowDataManager to create a unique path per logical operator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (APEXMALHAR-2129) ManagedState: Add a disable purging option
[ https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2129: -- Description: Have an option that can disable purging of data. Currently the TimeBucketAssigner moves the time boundary periodically based on system time as well as on event time. This is error prone, and moving the time boundary periodically implies that data will always be purged. The change that needs to be made to TimeBucketAssigner is to move the time boundary only when there are future events that don't fall within that boundary. This implies that if there are no events outside the current boundary, data will not be purged. ManagedStateImpl uses processing time for an event, so moving boundaries based on system time is going to happen there nevertheless. was: Have an option that can disable purging of data. Currently the TimeBucketAssigner moves the time boundary periodically based on system time as well as on event time. This is error prone, and moving the time boundary periodically implies that data will always be purged. The change that needs to be made to TimeBucketAssigner is to move the time boundary only when there are future events that don't fall within that boundary. This implies that if there are no events outside the current boundary, data will not be purged. > ManagedState: Add a disable purging option > -- > > Key: APEXMALHAR-2129 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Bhupesh Chawda >Assignee: Chandni Singh > > Have an option that can disable purging of data. > Currently the TimeBucketAssigner moves the time boundary periodically based on > system time as well as on event time. This is error prone, and moving the > time boundary periodically implies that data will always be purged. 
> The change that needs to be made to TimeBucketAssigner is to move the time > boundary only when there are future events that don't fall within that boundary. > This implies that if there are no events outside the current boundary, data > will not be purged. > ManagedStateImpl uses processing time for an event, so moving boundaries > based on system time is going to happen there nevertheless. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
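The event-driven boundary movement described above can be sketched as follows. The class name, method names, and bucket arithmetic are hypothetical, not the actual TimeBucketAssigner code; the point is only that the boundary, and hence purging, advances solely in response to events beyond it.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of event-driven boundary movement: the boundary only advances when
// an event falls beyond it, so without out-of-boundary events nothing is
// purged. Class name, method names, and bucket arithmetic are hypothetical.
public class EventDrivenBoundary {

  private final long bucketSpanMillis;
  private long start;  // inclusive lower boundary; buckets below it get purged
  private long end;    // exclusive upper boundary of the current range

  EventDrivenBoundary(long start, long end, long bucketSpanMillis) {
    this.start = start;
    this.end = end;
    this.bucketSpanMillis = bucketSpanMillis;
  }

  // Returns the start times of the buckets purged because of this event;
  // empty when the event falls inside the current boundary.
  List<Long> onEvent(long eventTime) {
    List<Long> purged = new ArrayList<>();
    while (eventTime >= end) {
      purged.add(start);  // the oldest bucket falls off the range
      start += bucketSpanMillis;
      end += bucketSpanMillis;
    }
    return purged;
  }

  public static void main(String[] args) {
    EventDrivenBoundary b = new EventDrivenBoundary(0, 30, 10);
    System.out.println(b.onEvent(5));   // in-boundary event: nothing purged
    System.out.println(b.onEvent(35));  // boundary slides once: bucket 0 purged
    System.out.println(b.onEvent(55));  // slides twice: buckets 10 and 20 purged
  }
}
```

With no system-time timer involved, a stream whose events all stay inside [start, end) never purges anything, which is exactly the "disable purging" behavior the issue wants by default.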
[jira] [Updated] (APEXMALHAR-2129) ManagedState: Add a disable purging option
[ https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2129: -- Description: Have an option that can disable purging of data. Currently the TimeBucketAssigner moves the time boundary periodically based on system time as well as on event time. This is error prone, and moving the time boundary periodically implies that data will always be purged. The change that needs to be made to TimeBucketAssigner is to move the time boundary only when there are future events that don't fall within that boundary. This implies that if there are no events outside the current boundary, data will not be purged. was:Have an option that can disable purging of data. > ManagedState: Add a disable purging option > -- > > Key: APEXMALHAR-2129 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Bhupesh Chawda >Assignee: Chandni Singh > > Have an option that can disable purging of data. > Currently the TimeBucketAssigner moves the time boundary periodically based on > system time as well as on event time. This is error prone, and moving the > time boundary periodically implies that data will always be purged. > The change that needs to be made to TimeBucketAssigner is to move the time > boundary only when there are future events that don't fall within that boundary. > This implies that if there are no events outside the current boundary, data > will not be purged. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (APEXMALHAR-2129) ManagedState: Add a disable purging option
[ https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415717#comment-15415717 ] Chandni Singh edited comment on APEXMALHAR-2129 at 8/10/16 6:47 PM: [~bhup...@apache.org] In TimeBucketAssigner, {code}expiryTask{code} is a task that runs repeatedly to purge data. It is not a different feature from purging. However, I will remove this task. Right now purging happens with a mix of event and system time, which is likely to be error prone. It should only be triggered by events. If all the events fall between the start and end time, nothing will be purged. However, if we see events outside this boundary, older time-buckets will be purged. was (Author: csingh): [~bhup...@apache.org] In TimeBucketAssigner, {code}expiryTask{code} is a task that runs repeatedly to purge data. It is not a different feature from purging. I will rename the task to avoid confusion. When purging is disabled, you store and dedup across the entire data set. > ManagedState: Add a disable purging option > -- > > Key: APEXMALHAR-2129 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Bhupesh Chawda >Assignee: Chandni Singh > > Have an option that can disable purging of data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2129) ManagedState: Add a disable purging option
[ https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415717#comment-15415717 ] Chandni Singh commented on APEXMALHAR-2129: --- [~bhup...@apache.org] In TimeBucketAssigner, {code}expiryTask{code} is a task that runs repeatedly to purge data. It is not a different feature from purging. I will rename the task to avoid confusion. When purging is disabled, you store and dedup across the entire data set. > ManagedState: Add a disable purging option > -- > > Key: APEXMALHAR-2129 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Bhupesh Chawda >Assignee: Chandni Singh > > Have an option that can disable purging of data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (APEXMALHAR-2129) ManagedState: Add a disable purging option
[ https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh reassigned APEXMALHAR-2129: - Assignee: Chandni Singh > ManagedState: Add a disable purging option > -- > > Key: APEXMALHAR-2129 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Bhupesh Chawda >Assignee: Chandni Singh > > Have an option that can disable purging of data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (APEXMALHAR-2129) ManagedState: Add a disable purging option
[ https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2129: -- Assignee: (was: Chandni Singh) > ManagedState: Add a disable purging option > -- > > Key: APEXMALHAR-2129 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Bhupesh Chawda > > Have an option that can disable purging of data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (APEXMALHAR-2071) Add the ability to remove a key to ManagedState
[ https://issues.apache.org/jira/browse/APEXMALHAR-2071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2071: -- Assignee: (was: Chandni Singh) > Add the ability to remove a key to ManagedState > --- > > Key: APEXMALHAR-2071 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2071 > Project: Apache Apex Malhar > Issue Type: New Feature >Reporter: Timothy Farkas > > This is required for Spillable Data Structures -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (APEXMALHAR-2101) RuntimeException from Default Bucket which is under managed state.
[ https://issues.apache.org/jira/browse/APEXMALHAR-2101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2101: -- Assignee: (was: Chandni Singh) > RuntimeException from Default Bucket which is under managed state. > -- > > Key: APEXMALHAR-2101 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2101 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Chaitanya > > Getting the following exception, while using the ManagedTimeStateImpl > operator: > 2016-05-24 15:42:48,813 ERROR > com.datatorrent.stram.engine.StreamingContainer: Operator set > [OperatorDeployInfo[id=4,name=join,type=GENERIC,checkpoint={, > 0, > 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=GenToJoin,sourceNodeId=2,sourcePortName=outputPort,locality=,partitionMask=1,partitionKeys=[1]], > > OperatorDeployInfo.InputDeployInfo[portName=product,streamId=ProductToJoin,sourceNodeId=1,sourcePortName=outputPort,locality=,partitionMask=0,partitionKeys=]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=JoinToConsole,bufferServer=chaitanya-HP-ENVY-15-Notebook-PC > stopped running due to an exception. 
> java.lang.RuntimeException: while loading 0, 12 > at > org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getValueFromTimeBucketReader(Bucket.java:346) > at > org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getFromReaders(Bucket.java:291) > at > org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.get(Bucket.java:323) > at > org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl.getValueFromBucketSync(AbstractManagedStateImpl.java:288) > at > org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl.getSync(ManagedTimeStateImpl.java:56) > at > com.examples.app.ManagedStateIntOperator.get(ManagedStateIntOperator.java:91) > at > com.examples.app.ManagedStateIntOperator.processTuple(ManagedStateIntOperator.java:80) > at > com.examples.app.ManagedStateIntOperator$2.process(ManagedStateIntOperator.java:73) > at > com.examples.app.ManagedStateIntOperator$2.process(ManagedStateIntOperator.java:68) > at com.datatorrent.api.DefaultInputPort.put(DefaultInputPort.java:79) > at > com.datatorrent.stram.stream.BufferServerSubscriber$BufferReservoir.sweep(BufferServerSubscriber.java:280) > at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:259) > at > com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1393) > Caused by: java.io.EOFException: Cannot seek after EOF > at > org.apache.hadoop.hdfs.DFSInputStream.seek(DFSInputStream.java:1378) > at > org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:62) > at > org.apache.hadoop.io.file.tfile.DTBCFile$Reader.(DTBCFile.java:674) > at > org.apache.hadoop.io.file.tfile.DTFile$Reader.(DTFile.java:827) > at > com.datatorrent.lib.fileaccess.DTFileReader.(DTFileReader.java:55) > at > com.datatorrent.lib.fileaccess.TFileImpl$DTFileImpl.getReader(TFileImpl.java:173) > at > org.apache.apex.malhar.lib.state.managed.BucketsFileSystem.getReader(BucketsFileSystem.java:96) > at > 
org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.loadFileReader(Bucket.java:371) > at > org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getValueFromTimeBucketReader(Bucket.java:341) > ... 12 more -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (APEXCORE-448) Make operator name available in OperatorContext
[ https://issues.apache.org/jira/browse/APEXCORE-448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXCORE-448: --- Assignee: (was: Chandni Singh) > Make operator name available in OperatorContext > --- > > Key: APEXCORE-448 > URL: https://issues.apache.org/jira/browse/APEXCORE-448 > Project: Apache Apex Core > Issue Type: Improvement >Reporter: Chandni Singh > > We need the name of the logical operator in the OperatorContext, which can be used by > WindowDataManager to create a unique path per logical operator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager
[ https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2063: -- Assignee: (was: Chandni Singh) > Integrate WAL to FS WindowDataManager > - > > Key: APEXMALHAR-2063 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Chandni Singh > > FS Window Data Manager is used to save meta-data that helps in replaying > tuples of every completed application window after failure. For this, it saves > meta-data in a file per window. Having many small files on HDFS > causes issues, as highlighted here: > http://blog.cloudera.com/blog/2009/02/the-small-files-problem/ > Instead, FS Window Data Manager can use the WAL to write data and maintain > a mapping of how much data was flushed to the WAL each window. > In order to use FileSystemWAL for replaying data of a finished window, a few > changes are made to FileSystemWAL because of the following: > 1. WindowDataManager needs to replay data of every finished window. This > window may not be checkpointed. > FileSystemWAL truncates the WAL file to the checkpointed point after recovery, > so this poses a problem. > WindowDataManager should be able to control the recovery of FileSystemWAL. > 2. FileSystemWAL writes to temporary files. The mapping of temp files to > actual files is part of its state, which is checkpointed. Since > WindowDataManager replays data of a window not yet checkpointed, it needs to > know the actual temporary file the data is being persisted to. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
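The window-to-WAL-offset mapping proposed above can be sketched with an in-memory log. FileSystemWAL, temp-file handling, and checkpointing are omitted, and all names here are illustrative, not the actual WindowDataManager API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the proposed single-log design: instead of one file per window,
// artifacts go into one append-only log, and a per-window end offset lets any
// finished window be replayed, checkpointed or not. The in-memory list stands
// in for FileSystemWAL; all names are illustrative.
public class WindowOffsetLog {

  private final List<byte[]> log = new ArrayList<>();
  private final TreeMap<Long, Integer> windowEnd = new TreeMap<>();  // windowId -> end offset

  void save(byte[] artifact) {
    log.add(artifact);
  }

  // Record how far the log extends at the end of this window.
  void endWindow(long windowId) {
    windowEnd.put(windowId, log.size());
  }

  // Replay everything written during the given window. The offsets, not
  // truncate-on-recovery, bound the read, which is why WindowDataManager
  // needs to control FileSystemWAL's recovery.
  List<byte[]> replay(long windowId) {
    Integer end = windowEnd.get(windowId);
    if (end == null) {
      return new ArrayList<>();
    }
    Map.Entry<Long, Integer> prev = windowEnd.lowerEntry(windowId);
    int from = prev == null ? 0 : prev.getValue();
    return new ArrayList<>(log.subList(from, end));
  }

  public static void main(String[] args) {
    WindowOffsetLog w = new WindowOffsetLog();
    w.save("a".getBytes());
    w.save("b".getBytes());
    w.endWindow(1);
    w.save("c".getBytes());
    w.endWindow(2);
    System.out.println(w.replay(1).size());  // artifacts of window 1
    System.out.println(new String(w.replay(2).get(0)));
  }
}
```

A single growing log avoids the HDFS small-files problem the issue links to, at the cost of keeping the per-window offset map as operator state.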
[jira] [Updated] (APEXMALHAR-2068) FileSplitter- having multiple file splitters in an application results in incorrect behavior
[ https://issues.apache.org/jira/browse/APEXMALHAR-2068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2068: -- Assignee: (was: Chandni Singh) > FileSplitter- having multiple file splitters in an application results in > incorrect behavior > > > Key: APEXMALHAR-2068 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2068 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Chandni Singh > > If an application has multiple logical operators that use Window Data > Manager, then by default they share the same state. This is confusing and > causes issues. > We need to make this path unique by default for each logical instance. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (APEXCORE-169) instantiating DTLoggerFactory during test causes incorrect logging behavior
[ https://issues.apache.org/jira/browse/APEXCORE-169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXCORE-169: --- Assignee: (was: Chandni Singh) > instantiating DTLoggerFactory during test causes incorrect logging behavior > --- > > Key: APEXCORE-169 > URL: https://issues.apache.org/jira/browse/APEXCORE-169 > Project: Apache Apex Core > Issue Type: Bug >Reporter: Vlad Rozov > > After DTLoggerFactory is instantiated in a Maven build, log levels in > log4j.properties are ignored. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (APEXMALHAR-2005) FileSplitterInput: `paths` field in TimeBasedDirectoryScanner is not validated unless scanner is marked as @Valid
[ https://issues.apache.org/jira/browse/APEXMALHAR-2005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2005: -- Assignee: (was: Chandni Singh) > FileSplitterInput: `paths` field in TimeBasedDirectoryScanner is not > validated unless scanner is marked as @Valid > - > > Key: APEXMALHAR-2005 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2005 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Chandni Singh > > I created a DAG: > FileSplitterInput -> BlockReader -> Devnull > The splitter has a property TimeBasedDirectoryScanner, which has a property > `files` that is a Set of Strings. > I have marked that property with {code}@Size(min = 1){code}. However, even > when the set is empty, DAG validation passes.
[jira] [Updated] (APEXMALHAR-1967) JDBC Store setting connection properties from config file is broken
[ https://issues.apache.org/jira/browse/APEXMALHAR-1967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-1967: -- Assignee: (was: Chandni Singh) > JDBC Store setting connection properties from config file is broken > --- > > Key: APEXMALHAR-1967 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1967 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Chandni Singh > > JdbcStore has a connectionProperties field of type Properties. Setting > the value of this field using the provided setters - > setConnectionProperties(String ...) and > setConnectionProperties(Properties ...) - > is broken.
[jira] [Updated] (APEXMALHAR-2035) ManagedState use case- Add a deduper to Malhar which is backed by ManagedState
[ https://issues.apache.org/jira/browse/APEXMALHAR-2035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2035: -- Assignee: (was: Chandni Singh) > ManagedState use case- Add a deduper to Malhar which is backed by ManagedState > -- > > Key: APEXMALHAR-2035 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2035 > Project: Apache Apex Malhar > Issue Type: New Feature >Reporter: Chandni Singh > > Managed State, which handles incremental checkpointing of key/value state, > has been added to Malhar. It can be used to create a de-duplicator in the > Malhar library. > The de-duper may receive events out of order; if an event is still > relevant (not older than the configured time), it should determine whether the > event has been seen before.
[jira] [Updated] (APEXMALHAR-1852) File Splitter Test Failing On My Machine
[ https://issues.apache.org/jira/browse/APEXMALHAR-1852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-1852: -- Assignee: (was: Chandni Singh) > File Splitter Test Failing On My Machine > > > Key: APEXMALHAR-1852 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1852 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Timothy Farkas > > FileSplitterInputTest.testRecoveryOfPartialFile:408 New file expected:<1> but > was:<0> -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (APEXMALHAR-1848) Block Reader doesn't honor value for minReaders
[ https://issues.apache.org/jira/browse/APEXMALHAR-1848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-1848: -- Assignee: (was: Chandni Singh) > Block Reader doesn't honor value for minReaders > --- > > Key: APEXMALHAR-1848 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1848 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: gaurav gupta > > Block reader has a property minReaders. The operator always returns one instance > initially, irrespective of the value of minReaders; minReaders is only used during > dynamic partitioning. > I think the operator should return at least minReaders instances.
[jira] [Updated] (APEXCORE-21) Add capability to specify collection phase of AutoMetric
[ https://issues.apache.org/jira/browse/APEXCORE-21?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXCORE-21: -- Assignee: (was: Chandni Singh) > Add capability to specify collection phase of AutoMetric > > > Key: APEXCORE-21 > URL: https://issues.apache.org/jira/browse/APEXCORE-21 > Project: Apache Apex Core > Issue Type: Sub-task >Reporter: Chandni Singh > > Along with this, also add a phase for the AutoMetric. Right off the bat there > are 3 phases - end of streaming window, end of application window, and custom > (the last one needs to be thought through) - but while we are at it, we could > also add beginning of the application window and beginning of the streaming > window. The work may not be done to support all of those, but it ensures > forward compatibility.
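The phases listed above could be captured as an enum attached to the metric annotation. A minimal sketch follows; the enum and its constant names are illustrative, not an actual Apex API.

```java
// Hypothetical sketch of the proposed collection phases for an AutoMetric.
// CUSTOM is the open-ended phase the ticket says still needs to be thought through.
enum MetricCollectionPhase {
    BEGIN_STREAMING_WINDOW,
    END_STREAMING_WINDOW,
    BEGIN_APPLICATION_WINDOW,
    END_APPLICATION_WINDOW,
    CUSTOM
}
```

Declaring all five phases up front, even if only the end-of-window ones are implemented initially, is what gives the forward compatibility the ticket mentions.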
[jira] [Updated] (APEXCORE-69) Improve StatsListener api so that using it with multiple logical operators is straightforward
[ https://issues.apache.org/jira/browse/APEXCORE-69?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXCORE-69: -- Assignee: (was: Chandni Singh) > Improve StatsListener api so that using it with multiple logical operators is > straightforward > - > > Key: APEXCORE-69 > URL: https://issues.apache.org/jira/browse/APEXCORE-69 > Project: Apache Apex Core > Issue Type: Improvement >Reporter: Chandni Singh > > We have discussed using a stats listener on multiple logical > operators - specifically using stats from a downstream operator to slow down > an upstream operator. > While trying to implement such a StatsListener, the processStats() method > becomes unwieldy, mainly because there is no easy way of > knowing which logical operator the stats belong to. BatchedOperatorStats > contains the operator id, but that is assigned at runtime (not known at the > time of writing code). > In the past I have used counters to figure out the logical operator, but that > is a hack.
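The awkwardness described above can be sketched as follows. `ThrottlingStatsListener` and its `register` step are hypothetical; the real StatsListener API passes a BatchedOperatorStats object rather than a bare operator id, and provides no built-in way to populate the id-to-logical-name map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a listener shared by two logical operators must first map
// runtime operator ids (assigned at launch) back to logical names before it can
// decide, e.g., to slow down the upstream operator based on downstream stats.
class ThrottlingStatsListener {
    private final Map<Integer, String> idToLogicalName = new HashMap<>();

    // Registration step the current API does not provide out of the box.
    void register(int operatorId, String logicalName) {
        idToLogicalName.put(operatorId, logicalName);
    }

    // Stand-in for processStats(BatchedOperatorStats): dispatch on the logical name.
    String decide(int operatorId, long queueSize) {
        String name = idToLogicalName.getOrDefault(operatorId, "unknown");
        if ("downstream".equals(name) && queueSize > 1000) {
            return "throttle-upstream";
        }
        return "no-op";
    }
}
```

The bookkeeping in `register` is exactly the part the ticket wants the platform to make unnecessary.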
[jira] [Updated] (APEXMALHAR-1773) Organize the plugins in Malhar lib and contrib
[ https://issues.apache.org/jira/browse/APEXMALHAR-1773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-1773: -- Assignee: (was: Chandni Singh) > Organize the plugins in Malhar lib and contrib > -- > > Key: APEXMALHAR-1773 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1773 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Chandni Singh > > Need PluginManagement in Malhar where common plugins can be declared. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (APEXMALHAR-1600) Investigate if we can seek efficiently to the correct location when we restart after failure for ftp and fs scan operators
[ https://issues.apache.org/jira/browse/APEXMALHAR-1600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-1600: -- Assignee: (was: Chandni Singh) > Investigate if we can seek efficiently to the correct location when we > restart after failure for ftp and fs scan operators > -- > > Key: APEXMALHAR-1600 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1600 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Pramod Immaneni > > Currently, when the fs scan operator fails and restarts, it seeks to the > starting location by asking for the previous entries and discarding them. Can > this be done in a more efficient manner on a case-by-case basis? For example, > a subclass of the operator can keep a count of the file read offset after > returning an entry, and that can be used to seek directly to the checkpointed > offset.
[jira] [Updated] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-1701: -- Assignee: (was: Chandni Singh) > Deduper : create a deduper backed by Managed State > -- > > Key: APEXMALHAR-1701 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701 > Project: Apache Apex Malhar > Issue Type: Task > Components: algorithms >Reporter: Chandni Singh > > Need a de-duplicator operator that is based on Managed State.
[jira] [Updated] (APEXMALHAR-722) Investigate re-partitioning of transactional database output adaptor
[ https://issues.apache.org/jira/browse/APEXMALHAR-722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-722: - Assignee: (was: Chandni Singh) > Investigate re-partitioning of transactional database output adaptor > > > Key: APEXMALHAR-722 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-722 > Project: Apache Apex Malhar > Issue Type: Improvement > Components: adapters database >Reporter: Chandni Singh > > Currently the transactional database output adaptors use operator id to > persist last committed window. With this approach the operators cannot be > re-partitioned. > We need to find a way to make these operators dynamically partition-able. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (APEXMALHAR-1843) Split Malhar Library and Malhar Contrib package into baby packages
[ https://issues.apache.org/jira/browse/APEXMALHAR-1843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-1843: -- Assignee: (was: Chandni Singh) > Split Malhar Library and Malhar Contrib package into baby packages > -- > > Key: APEXMALHAR-1843 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1843 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Chetan Narsude >Priority: Critical > Labels: roadmap > > [~andyp] I am assigning this to you because you are the one who first said it. > So either you lead it or find a willing lead to get this task to completion. > The problem with the contrib and library modules of malhar is that a ton of > dependencies are prescribed as optional. The motive behind it was that the > users of these libraries are given an opportunity to keep the size of the > dependency-included packages to a bare minimum. It comes at the cost that the > dependencies now have to be manually figured out. This is a complete misuse of > optional dependencies, IMO. It defeats the purpose of maven having > dependency management as one of its biggest features. > So, to keep things sane, the proposed compromise is that we start creating > smaller discrete packages for discrete technologies.
[jira] [Updated] (APEXMALHAR-2044) Exception in AbstractFileOutputOperator during write.
[ https://issues.apache.org/jira/browse/APEXMALHAR-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2044: -- Assignee: (was: Chandni Singh) > Exception in AbstractFileOutputOperator during write. > - > > Key: APEXMALHAR-2044 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2044 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Tushar Gosavi > > Sometimes container gets killed because of following error, this happens > during flush or write. > Stopped running due to an exception. java.lang.RuntimeException: > java.nio.channels.ClosedChannelException > at > com.datatorrent.lib.io.fs.AbstractFileOutputOperator.processTuple(AbstractFileOutputOperator.java:823) > at > com.directv.sms.common.CsvOutputOperator.processTuple(CsvOutputOperator.java:70) > at > com.datatorrent.lib.io.fs.AbstractFileOutputOperator$1.process(AbstractFileOutputOperator.java:271) > at > com.datatorrent.api.DefaultInputPort.put(DefaultInputPort.java:70) > at > com.datatorrent.stram.engine.DefaultReservoir.sweep(DefaultReservoir.java:64) > at > com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:229) > at > com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1386) > Caused by: java.nio.channels.ClosedChannelException > at > org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1635) > at > org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:104) > at > org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58) > at java.io.DataOutputStream.write(DataOutputStream.java:107) > at > java.io.FilterOutputStream.write(FilterOutputStream.java:97) > at > com.datatorrent.lib.io.fs.AbstractFileOutputOperator.processTuple(AbstractFileOutputOperator.java:79 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (APEXMALHAR-2136) Null pointer exception in AbstractManagedStateImpl
[ https://issues.apache.org/jira/browse/APEXMALHAR-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh resolved APEXMALHAR-2136. --- Resolution: Fixed Fix Version/s: 3.5.0 > Null pointer exception in AbstractManagedStateImpl > -- > > Key: APEXMALHAR-2136 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2136 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Chaitanya >Assignee: Chaitanya > Fix For: 3.5.0 > > > Null pointer exception in AbstractManagedStateImpl while recovering the > state: > java.lang.NullPointerException > at > org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl.setup(AbstractManagedStateImpl.java:206) > at > com.examples.app.ManagedTimeStateMImpl.setup(ManagedTimeStateMImpl.java:15) > at > com.examples.app.ManagedStateIntOperator.setup(ManagedStateIntOperator.java:138) > at > com.examples.app.ManagedStateIntOperator.setup(ManagedStateIntOperator.java:26) > at com.datatorrent.stram.engine.Node.setup(Node.java:187) > at > com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1309) > at > com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:130) > at > com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388)
[jira] [Commented] (APEXMALHAR-2129) ManagedState: Add a disable purging option
[ https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355507#comment-15355507 ] Chandni Singh commented on APEXMALHAR-2129: --- [~bhupesh] I re-opened the ticket and will work on it. We had discussed this earlier but can't find the Jira that addresses it. > ManagedState: Add a disable purging option > -- > > Key: APEXMALHAR-2129 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Bhupesh Chawda >Assignee: Chandni Singh > > Have an option that can disable purging of data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (APEXMALHAR-2129) ManagedState: Add a disable purging option
[ https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2129: -- Description: Have an option that can disable purging of data. (was: TimeBucketAssigner advances the time boundaries of the buckets viz. start and end to the current system time every window. The requirement is to add an option so that clients can disable this if needed. Tuple time based deduplication has such a requirement.) > ManagedState: Add a disable purging option > -- > > Key: APEXMALHAR-2129 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Bhupesh Chawda >Assignee: Chandni Singh > > Have an option that can disable purging of data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (APEXMALHAR-2129) ManagedState: Add a disable purging option
[ https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2129: -- Priority: Major (was: Minor) > ManagedState: Add a disable purging option > -- > > Key: APEXMALHAR-2129 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Bhupesh Chawda >Assignee: Chandni Singh > > TimeBucketAssigner advances the time boundaries of the buckets viz. start and > end to the current system time every window. > The requirement is to add an option so that clients can disable this if > needed. Tuple time based deduplication has such a requirement. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (APEXMALHAR-2129) ManagedState: Add a disable purging option
[ https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2129: -- Summary: ManagedState: Add a disable purging option (was: Introduce option to advance time through Expiry task in TimeBucketAssigner) > ManagedState: Add a disable purging option > -- > > Key: APEXMALHAR-2129 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Bhupesh Chawda >Assignee: Bhupesh Chawda >Priority: Minor > > TimeBucketAssigner advances the time boundaries of the buckets viz. start and > end to the current system time every window. > The requirement is to add an option so that clients can disable this if > needed. Tuple time based deduplication has such a requirement. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Reopened] (APEXMALHAR-2129) ManagedState: Add a disable purging option
[ https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh reopened APEXMALHAR-2129: --- Assignee: Chandni Singh (was: Bhupesh Chawda) > ManagedState: Add a disable purging option > -- > > Key: APEXMALHAR-2129 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Bhupesh Chawda >Assignee: Chandni Singh >Priority: Minor > > TimeBucketAssigner advances the time boundaries of the buckets viz. start and > end to the current system time every window. > The requirement is to add an option so that clients can disable this if > needed. Tuple time based deduplication has such a requirement. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2129) Introduce option to advance time through Expiry task in TimeBucketAssigner
[ https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355324#comment-15355324 ] Chandni Singh commented on APEXMALHAR-2129: --- I am not disagreeing with your use case. ManagedState is there for managing a large amount of data, and that data, when written to disk, is time-bucketed specifically to enable easy purging. What you need is to disable purging, which has already been discussed, and I believe there is a ticket for it. > Introduce option to advance time through Expiry task in TimeBucketAssigner > -- > > Key: APEXMALHAR-2129 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Bhupesh Chawda >Assignee: Bhupesh Chawda >Priority: Minor > > TimeBucketAssigner advances the time boundaries of the buckets viz. start and > end to the current system time every window. > The requirement is to add an option so that clients can disable this if > needed. Tuple time based deduplication has such a requirement.
[jira] [Commented] (APEXMALHAR-2129) Introduce option to advance time through Expiry task in TimeBucketAssigner
[ https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15354658#comment-15354658 ] Chandni Singh commented on APEXMALHAR-2129: --- Please explain the requirement and why it is needed. This helps with purging of data. How will you purge data? > Introduce option to advance time through Expiry task in TimeBucketAssigner > -- > > Key: APEXMALHAR-2129 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Bhupesh Chawda >Assignee: Bhupesh Chawda >Priority: Minor > > TimeBucketAssigner advances the time boundaries of the buckets viz. start and > end to the current system time every window. > The requirement is to add an option so that clients can disable this if > needed. Tuple time based deduplication has such a requirement.
[jira] [Commented] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager
[ https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323383#comment-15323383 ] Chandni Singh commented on APEXMALHAR-2063: --- [~sandesh] Window Data Manager purges data only once a window is committed; this is because the data of an uncommitted window may still be needed. We can't delete any window until it is committed. I don't see in your pull request where this is required. > Integrate WAL to FS WindowDataManager > - > > Key: APEXMALHAR-2063 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Chandni Singh >Assignee: Chandni Singh > > FS Window Data Manager is used to save meta-data that helps in replaying > tuples of every completed application window after failure. For this it saves > meta-data in a file per window. Having multiple small files on HDFS > causes issues, as highlighted here: > http://blog.cloudera.com/blog/2009/02/the-small-files-problem/ > Instead, FS Window Data Manager can utilize the WAL to write data and maintain > a mapping of how much data was flushed to the WAL each window. > In order to use FileSystemWAL for replaying data of a finished window, a few > changes were made to FileSystemWAL, because of the following: > 1. WindowDataManager needs to replay data of every finished window. This > window may not be checkpointed. > FileSystemWAL truncates the WAL file to the checkpointed point after recovery, > so this poses a problem. > WindowDataManager should be able to control recovery of FileSystemWAL. > 2. FileSystemWAL writes to temporary files. The mapping of temp files to > actual files is part of its state, which is checkpointed. Since > WindowDataManager replays data of a window not yet checkpointed, it needs to > know the actual temporary file the data is being persisted to.
[jira] [Commented] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager
[ https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15321102#comment-15321102 ] Chandni Singh commented on APEXMALHAR-2063: --- [~sandesh] WindowDataManager is used for idempotency. The data of a window gets deleted when that window is committed. This is why we have the method deleteUpTo(...). Can you tell me when you would delete windows strictly less than a particular window? > Integrate WAL to FS WindowDataManager > - > > Key: APEXMALHAR-2063 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Chandni Singh >Assignee: Chandni Singh > > FS Window Data Manager is used to save meta-data that helps in replaying > tuples of every completed application window after failure. For this it saves > meta-data in a file per window. Having multiple small files on HDFS > causes issues, as highlighted here: > http://blog.cloudera.com/blog/2009/02/the-small-files-problem/ > Instead, FS Window Data Manager can utilize the WAL to write data and maintain > a mapping of how much data was flushed to the WAL each window. > In order to use FileSystemWAL for replaying data of a finished window, a few > changes were made to FileSystemWAL, because of the following: > 1. WindowDataManager needs to replay data of every finished window. This > window may not be checkpointed. > FileSystemWAL truncates the WAL file to the checkpointed point after recovery, > so this poses a problem. > WindowDataManager should be able to control recovery of FileSystemWAL. > 2. FileSystemWAL writes to temporary files. The mapping of temp files to > actual files is part of its state, which is checkpointed. Since > WindowDataManager replays data of a window not yet checkpointed, it needs to > know the actual temporary file the data is being persisted to.
[jira] [Comment Edited] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager
[ https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15320164#comment-15320164 ] Chandni Singh edited comment on APEXMALHAR-2063 at 6/8/16 3:39 PM: --- Since we are deprecating these methods, I think it will be good to deprecate the following: 1. {code}save(Object object, int operatorId, long windowId){code} operatorId is redundant for WindowDataManager. WindowDataManager extends {code}Component{code} and knows in setup() the operator id for which it is saving the data. Also, an instance of WindowDataManager can write data for only the enclosing partition and not any other partition. Add {code}save(Object object, long windowId){code} instead. 2. {code} load(int operatorId, long windowId) and load(long windowId) {code} In the use cases (operators), the data is loaded for the enclosing partition or all the partitions, so this could be changed to: {code}Object retrieve(long windowId) and Map retrieveAllPartitions(long windowId){code} 3. {code} delete(int operatorId, long windowId) {code} Same as save(...), operatorId is redundant. {code}delete(long windowId){code} Even though WindowDataManager is @Evolving, we can mark these as deprecated and add the new ones. was (Author: csingh): Since we are deprecating these methods, I think it will be good to deprecate the following as 1. {code}save(Object object, int operatorId, long windowId){code} operatorId is redundant for WindowDataManager. WindowDataManager extends {code}Component {code} and knows the operator id in setup() for which it is saving the data. Also an instance of WindowDataManager can write data for only the enclosing partition and not any other partition. Add {code}save(Object object, long windowId){code} instead 2. 
{code} load(int operatorId, long windowId) and load(long windowId) {code} In the use cases (operators), the data is loaded for the enclosing partition or all the partitions so this could be changed as : {code}Object fetch(long windowId) and Map fetchAllPartitions(long windowId){code} 3. {code} delete(int operatorId, long windowId) {code} Same as the save(...), operatorId is redundant. {code}delete(long windowId){code} Even though WindowDataManager is @Evolving, we can mark these as deprecated and add the new ones. > Integrate WAL to FS WindowDataManager > - > > Key: APEXMALHAR-2063 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Chandni Singh >Assignee: Chandni Singh > > FS Window Data Manager is used to save meta-data that helps in replaying > tuples every completed application window after failure. For this it saves > meta-data in a file per window. Having multiple small size files on hdfs > cause issues as highlighted here: > http://blog.cloudera.com/blog/2009/02/the-small-files-problem/ > Instead FS Window Data Manager can utilize the WAL to write data and maintain > a mapping of how much data was flushed to WAL each window. > In order to use FileSystemWAL for replaying data of a finished window, there > are few changes made to FileSystemWAL this is because of following: > 1. WindowDataManager needs to reply data of every finished window. This > window may not be checkpointed. > FileSystemWAL truncates the WAL file to the checkpointed point after recovery > so this poses a problem. > WindowDataManager should be able to control recovery of FileSystemWAL. > 2. FileSystemWAL writes to temporary files. The mapping of temp files to > actual file is part of its state which is checkpointed. Since > WindowDataManager replays data of a window not yet checkpointed, it needs to > know the actual temporary file the data is being persisted to. 
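The API proposed in the comment above can be sketched as an interface plus a toy in-memory implementation. The interface methods follow the comment's proposal; `ProposedWindowDataManager`, `InMemoryWindowDataManager`, and the shared store are purely illustrative (the real manager persists to HDFS and learns its operator id in setup()).

```java
import java.util.HashMap;
import java.util.Map;

// Interface following the proposal above: operatorId is dropped from the
// signatures because the manager learns it in setup().
interface ProposedWindowDataManager {
    void save(Object object, long windowId);
    Object retrieve(long windowId);                          // enclosing partition only
    Map<Integer, Object> retrieveAllPartitions(long windowId);
    void delete(long windowId);                              // purge once committed
}

// Toy in-memory implementation for illustration; partitions share one store.
class InMemoryWindowDataManager implements ProposedWindowDataManager {
    private final int operatorId;
    private final Map<Long, Map<Integer, Object>> store;

    InMemoryWindowDataManager(int operatorId, Map<Long, Map<Integer, Object>> sharedStore) {
        this.operatorId = operatorId;   // known from setup() in the real manager
        this.store = sharedStore;
    }

    public void save(Object object, long windowId) {
        store.computeIfAbsent(windowId, w -> new HashMap<>()).put(operatorId, object);
    }

    public Object retrieve(long windowId) {
        Map<Integer, Object> perWindow = store.get(windowId);
        return perWindow == null ? null : perWindow.get(operatorId);
    }

    public Map<Integer, Object> retrieveAllPartitions(long windowId) {
        return store.getOrDefault(windowId, new HashMap<>());
    }

    public void delete(long windowId) {
        Map<Integer, Object> perWindow = store.get(windowId);
        if (perWindow != null) {
            perWindow.remove(operatorId);
        }
    }
}
```

Note how save, retrieve, and delete are scoped to the enclosing partition, while retrieveAllPartitions is the only cross-partition read, which matches the comment's observation about how operators actually use the manager.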
[jira] [Comment Edited] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager
[ https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15320164#comment-15320164 ] Chandni Singh edited comment on APEXMALHAR-2063 at 6/8/16 7:31 AM: --- Since we are deprecating these methods, I think it will be good to deprecate the following as 1. {code}save(Object object, int operatorId, long windowId){code} operatorId is redundant for WindowDataManager. WindowDataManager extends {code}Component {code} and knows the operator id in setup() for which it is saving the data. Also an instance of WindowDataManager can write data for only the enclosing partition and not any other partition. Add {code}save(Object object, long windowId){code} instead 2. {code} load(int operatorId, long windowId) and load(long windowId) {code} In the use cases (operators), the data is loaded for the enclosing partition or all the partitions so this could be changed as : {code}Object fetch(long windowId) and MapfetchAllPartitions(long windowId){code} 3. {code} delete(int operatorId, long windowId) {code} Same as the save(...), operatorId is redundant. {code}delete(long windowId){code} Even though WindowDataManager is @Evolving, we can mark these as deprecated and add the new ones. was (Author: csingh): Since we are deprecating these methods, I think it will be good to deprecate the following as 1. {code}save(Object object, int operatorId, long windowId){code} operatorId is redundant for WindowDataManager. WindowDataManager extends {code}Component {code} and knows the operator id in setup() for which it is saving the data. Also an instance of WindowDataManager can write data for only the enclosing partition and not any other partition. Add {code}save(Object object, long windowId){code} instead 2. 
{code} load(int operatorId, long windowId) and load(long windowId) {code} In the use cases (operators), the data is loaded for the enclosing partition or all the partitions so this could be changed as : {code}Object load(long windowId) and Map loadAllPartitions(long windowId){code} 3. {code} delete(int operatorId, long windowId) {code} Same as the save(...), operatorId is redundant. {code}delete(long windowId){code} Even though WindowDataManager is @Evolving, we can mark these as deprecated and add the new ones. > Integrate WAL to FS WindowDataManager > - > > Key: APEXMALHAR-2063 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Chandni Singh >Assignee: Chandni Singh > > FS Window Data Manager is used to save meta-data that helps in replaying > tuples every completed application window after failure. For this it saves > meta-data in a file per window. Having multiple small size files on hdfs > cause issues as highlighted here: > http://blog.cloudera.com/blog/2009/02/the-small-files-problem/ > Instead FS Window Data Manager can utilize the WAL to write data and maintain > a mapping of how much data was flushed to WAL each window. > In order to use FileSystemWAL for replaying data of a finished window, there > are few changes made to FileSystemWAL this is because of following: > 1. WindowDataManager needs to reply data of every finished window. This > window may not be checkpointed. > FileSystemWAL truncates the WAL file to the checkpointed point after recovery > so this poses a problem. > WindowDataManager should be able to control recovery of FileSystemWAL. > 2. FileSystemWAL writes to temporary files. The mapping of temp files to > actual file is part of its state which is checkpointed. Since > WindowDataManager replays data of a window not yet checkpointed, it needs to > know the actual temporary file the data is being persisted to. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
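Rendered as an interface, the replacement methods proposed in the comment above would look roughly as follows. This is a sketch: the generic types, throws clauses, and the in-memory class are assumptions for illustration (the mailing-list markup appears to have swallowed the original type parameters), not the committed Malhar API.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sketch of the slimmed-down WindowDataManager surface proposed above:
// operatorId disappears from the signatures because the manager learns it
// in setup() and can only write for its enclosing partition anyway.
interface WindowDataManagerSketch
{
  /** Saves the enclosing partition's state for the given window. */
  void save(Object object, long windowId) throws IOException;

  /** State this partition saved for the window, or null if none. */
  Object fetch(long windowId) throws IOException;

  /** State every partition saved for the window, keyed by operator id. */
  Map<Integer, Object> fetchAllPartitions(long windowId) throws IOException;

  /** Deletes this partition's artifacts for the window; operatorId is implicit. */
  void delete(long windowId) throws IOException;
}

// Toy in-memory model, only to make the shape concrete; the real manager
// persists to files (or a WAL) on HDFS.
class InMemoryWindowDataManager implements WindowDataManagerSketch
{
  private final Map<Long, Map<Integer, Object>> shared; // windowId -> operatorId -> state
  private final int operatorId;                         // learned in setup() in the real API

  InMemoryWindowDataManager(Map<Long, Map<Integer, Object>> shared, int operatorId)
  {
    this.shared = shared;
    this.operatorId = operatorId;
  }

  public void save(Object object, long windowId)
  {
    shared.computeIfAbsent(windowId, w -> new HashMap<>()).put(operatorId, object);
  }

  public Object fetch(long windowId)
  {
    Map<Integer, Object> w = shared.get(windowId);
    return w == null ? null : w.get(operatorId);
  }

  public Map<Integer, Object> fetchAllPartitions(long windowId)
  {
    return shared.getOrDefault(windowId, new HashMap<>());
  }

  public void delete(long windowId)
  {
    Map<Integer, Object> w = shared.get(windowId);
    if (w != null) {
      w.remove(operatorId);
    }
  }
}
```

Note how save/fetch/delete act only on the enclosing partition's slot, while fetchAllPartitions is the single cross-partition read.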
[jira] [Comment Edited] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager
[ https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15320164#comment-15320164 ] Chandni Singh edited comment on APEXMALHAR-2063 at 6/8/16 7:22 AM: --- Since we are deprecating these methods, I think it will be good to deprecate the following as well:
1. {code}save(Object object, int operatorId, long windowId){code} operatorId is redundant for WindowDataManager. WindowDataManager extends {code}Component{code} and knows, in setup(), the operator id for which it is saving the data. Also, an instance of WindowDataManager can write data only for the enclosing partition and not for any other partition. Add {code}save(Object object, long windowId){code} instead.
2. {code}load(int operatorId, long windowId) and load(long windowId){code} In the use cases (operators), the data is loaded for the enclosing partition or for all the partitions, so these could be changed to {code}Object load(long windowId) and Map<Integer, Object> loadAllPartitions(long windowId){code}
3. {code}delete(int operatorId, long windowId){code} Same as with save(...), operatorId is redundant: {code}delete(long windowId){code}
Even though WindowDataManager is @Evolving, we can mark these as deprecated and add the new ones.
was (Author: csingh): Since we are deprecating these methods, I think it will be good to deprecate the following as well:
1. {code}save(Object object, int operatorId, long windowId){code} operatorId is redundant for WindowDataManager. WindowDataManager extends {code}Component{code} and knows, in setup(), the operator id for which it is saving the data. Also, an instance of WindowDataManager can write data only for the enclosing partition and not for any other partition. Add {code}save(Object object, long windowId){code} instead.
2.
{code} load(int operatorId, long windowId) and load(long windowId) {code} In the use cases (operators), the data is loaded for the enclosing partition or all the partitions so this could be changed as : {code}load(long windowId) and loadAll(long windowId){code} 3. {code} delete(int operatorId, long windowId) {code} Same as the save(...), operatorId is redundant. {code}delete(long windowId){code} Even though WindowDataManager is @Evolving, we can mark these as deprecated and add the new ones.
[jira] [Comment Edited] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager
[ https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15320164#comment-15320164 ] Chandni Singh edited comment on APEXMALHAR-2063 at 6/8/16 7:21 AM: --- Since we are deprecating these methods, I think it will be good to deprecate the following as 1. {code}save(Object object, int operatorId, long windowId){code} operatorId is redundant for WindowDataManager. WindowDataManager extends {code}Component {code} and knows the operator id in setup() for which it is saving the data. Also an instance of WindowDataManager can write data for only the enclosing partition and not any other partition. Add {code}save(Object object, long windowId){code} instead 2. {code} load(int operatorId, long windowId) and load(long windowId) {code} In the use cases (operators), the data is loaded for the enclosing partition or all the partitions so this could be changed as : {code}load(long windowId) and loadAll(long windowId){code} 3. {code} delete(int operatorId, long windowId) {code} Same as the save(...), operatorId is redundant. {code}delete(long windowId){code} Even though WindowDataManager is @Evolving, we can mark these as deprecated and add the new ones. was (Author: csingh): Since we are deprecating these methods, I think it will be good to deprecate the following as 1. {code}save(Object object, int operatorId, long windowId){code} operatorId is redundant for WindowDataManager. WindowDataManager extends {code}Component {code} and knows the operator id in setup() for which it is saving the data. Also an instance of WindowDataManager can write data for only the enclosing partition and not any other partition. Add {code}save(Object object, long windowId){code} instead 2. {code} load(int operatorId, long windowId) {code} In the use cases (operators), the data is loaded for the enclosing partition or all the partitions so this could be changed as : {code}load(long windowId, boolean allPartitions){code} 3. 
{code} delete(int operatorId, long windowId) {code} Same as the save(...), operatorId is redundant. {code}delete(long windowId){code} Even though WindowDataManager is @Evolving, we can mark these as deprecated and add the new ones.
[jira] [Comment Edited] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager
[ https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15320164#comment-15320164 ] Chandni Singh edited comment on APEXMALHAR-2063 at 6/8/16 7:12 AM: --- Since we are deprecating these methods, I think it will be good to deprecate the following as 1. {code}save(Object object, int operatorId, long windowId){code} operatorId is redundant for WindowDataManager. WindowDataManager extends {code}Component {code} and knows the operator id in setup() for which it is saving the data. Also an instance of WindowDataManager can write data for only the enclosing partition and not any other partition. Add {code}save(Object object, long windowId){code} instead 2. {code} load(int operatorId, long windowId) {code} In the use cases (operators), the data is loaded for the enclosing partition or all the partitions so this could be changed as : {code}load(long windowId, boolean allPartitions){code} 3. {code} delete(int operatorId, long windowId) {code} Same as the save(...), operatorId is redundant. {code}delete(long windowId){code} Even though WindowDataManager is @Evolving, we can mark these as deprecated and add the new ones. was (Author: csingh): Since we are deprecating these methods, I think it will be good to deprecate the following as 1. save(Object object, int operatorId, long windowId) operatorId is redundant for WindowDataManager. WindowDataManager extends {code}Component {code} and knows the operator id in setup() for which it is saving the data. Also an instance of WindowDataManager can write data for only the enclosing partition and not any other partition. Add {code}save(Object object, long windowId){code} instead 2. load(int operatorId, long windowId) In the use cases (operators), the data is loaded for the enclosing partition or all the partitions so this could be changed as : {code}load(long windowId, boolean allPartitions){code} 3.
delete(int operatorId, long windowId) throws IOException; Same as the save(...), operatorId is redundant. {code}delete(long windowId){code} Even though WindowDataManager is @Evolving, we can mark these as deprecated and add the new ones.
[jira] [Commented] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager
[ https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15320103#comment-15320103 ] Chandni Singh commented on APEXMALHAR-2063: --- I am close to completing the implementation of all the methods supported in the WindowDataManager API. The WindowDataManager API extended the StorageAgent API, and I am thinking of deprecating the following methods, which are not being used in any operator that uses WindowDataManager: 1. long[] getWindowIds(int operatorId) 2. long[] getWindowIds() The reason to deprecate them is that the implementation to provide just the window ids is not optimal. If all the window ids are expected without reading the data of those windows, then we basically re-read the WAL files.
[jira] [Updated] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager
[ https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated APEXMALHAR-2063: -- Description: FS Window Data Manager is used to save meta-data that helps in replaying tuples of every completed application window after failure. For this it saves meta-data in a file per window. Having many small files on HDFS causes issues, as highlighted here: http://blog.cloudera.com/blog/2009/02/the-small-files-problem/ Instead, FS Window Data Manager can utilize the WAL to write data and maintain a mapping of how much data was flushed to the WAL each window. In order to use FileSystemWAL for replaying data of a finished window, there are a few changes made to FileSystemWAL; this is because of the following: 1. WindowDataManager needs to replay data of every finished window. This window may not be checkpointed. FileSystemWAL truncates the WAL file to the checkpointed point after recovery, so this poses a problem. WindowDataManager should be able to control recovery of FileSystemWAL. 2. FileSystemWAL writes to temporary files. The mapping of temp files to actual files is part of its state, which is checkpointed. Since WindowDataManager replays data of a window not yet checkpointed, it needs to know the actual temporary file the data is being persisted to. was: FS Window Data Manager is used to save meta-data that helps in replaying tuples of every completed application window after failure. For this it saves meta-data in a file per window. Having many small files on HDFS causes issues, as highlighted here: http://blog.cloudera.com/blog/2009/02/the-small-files-problem/ Instead, FS Window Data Manager can utilize the WAL to write data and maintain a mapping of how much data was flushed to the WAL each window. In order to use FileSystemWAL for replaying data of a finished window, there are a few changes made to FileSystemWAL; this is because of the following: 1.
WindowDataManager needs to replay data of every finished window. This window may not be checkpointed. FileSystemWAL truncates the WAL file to the checkpointed point after recovery, so this poses a problem. WindowDataManager should be able to control recovery of FileSystemWAL. 2. FileSystemWAL writes to temporary files. The mapping of temp files to actual files is part of its state, which is checkpointed. Since WindowDataManager replays data of a window not yet checkpointed, it needs to know the actual temporary file the data is being persisted to. At a high level, WindowDataManager will persist meta information on the file system, which includes the following details for every window:
- start wal pointer
- end wal pointer
- wal file path
This is a single file which is updated every end-window along with the actual data in the WAL file.
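The per-window meta information listed in the old description above (start WAL pointer, end WAL pointer, WAL file path, with one entry recorded per end-window) could be modeled along these lines; class and field names are hypothetical, not the Malhar implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the per-window bookkeeping the description outlines: for each
// completed window, remember which WAL file holds its data and where the
// data starts and ends, so the window can be replayed later.
class WalWindowMeta
{
  final String walFilePath;
  final long startPointer; // offset of the window's first byte in the WAL file
  final long endPointer;   // offset just past the window's last byte

  WalWindowMeta(String walFilePath, long startPointer, long endPointer)
  {
    this.walFilePath = walFilePath;
    this.startPointer = startPointer;
    this.endPointer = endPointer;
  }
}

// One entry is recorded per end-window; replaying window w means reading
// bytes [startPointer, endPointer) of walFilePath.
class WalMetaIndex
{
  private final Map<Long, WalWindowMeta> byWindow = new HashMap<>();

  void record(long windowId, WalWindowMeta meta)
  {
    byWindow.put(windowId, meta);
  }

  WalWindowMeta lookup(long windowId)
  {
    return byWindow.get(windowId);
  }
}
```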
[jira] [Commented] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager
[ https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15314820#comment-15314820 ] Chandni Singh commented on APEXMALHAR-2063: --- Embedding the window id into the WAL was not necessary with the meta file solution. The meta file contained from/to offsets in the WAL for each window. Here there will be just one entry per window, and the entry is the incremental state. Yes, the naming convention plus finalizing part files just after rotation will help. Currently the finalization is delayed until a window is committed. For replay of a window, the finalization should be done as soon as the part file gets rotated. This again is because we are using it for idempotency on the input side and not on the output side. I think we can get around without a meta file. However, this still requires some changes to FSWal, mainly re-factoring.
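The behavior change discussed above, finalizing a part file as soon as it rotates rather than when its window is committed, can be sketched with a toy rotation model. The file names and the size-based rotation trigger are illustrative assumptions, not the FSWal implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of WAL part-file rotation where a part is finalized (loses its
// temporary status) immediately on rotation, so at any time only the part
// currently being written remains temporary, and earlier parts can be read
// from their final names during replay.
class RotatingWal
{
  private final int maxPartBytes;
  private final List<String> finalized = new ArrayList<>();
  private int currentPart = 0;
  private int currentBytes = 0;

  RotatingWal(int maxPartBytes)
  {
    this.maxPartBytes = maxPartBytes;
  }

  void append(int numBytes)
  {
    currentBytes += numBytes;
    if (currentBytes >= maxPartBytes) {
      // rotate: finalize the full part right away instead of waiting for
      // the committed window
      finalized.add("wal-part-" + currentPart);
      currentPart++;
      currentBytes = 0;
    }
  }

  List<String> finalizedParts()
  {
    return finalized;
  }

  String temporaryPart()
  {
    return "wal-part-" + currentPart + ".tmp";
  }
}
```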
[jira] [Commented] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager
[ https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15314680#comment-15314680 ] Chandni Singh commented on APEXMALHAR-2063: --- I am looking into the possibility of writing window ids with the WAL entries. This requires creating extensions of FSWALWriter and FSWALReader:
- Meta info consists of more than the entry size. It includes the window id as well.
- Temp files are finalized as soon as they are rotated, which ensures that there is only one most recent valid temporary file.
- WAL content is not truncated to the last checkpoint. It is truncated to the point after the last finished window.
[jira] [Resolved] (APEXMALHAR-2096) Add blockThreshold parameter to FSInputModule
[ https://issues.apache.org/jira/browse/APEXMALHAR-2096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh resolved APEXMALHAR-2096. --- Resolution: Fixed Fix Version/s: 3.5.0
> Add blockThreshold parameter to FSInputModule
> -
>
> Key: APEXMALHAR-2096
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2096
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: Priyanka Gugale
> Assignee: Priyanka Gugale
> Fix For: 3.5.0
>
> FileSplitter is very fast; the downstream operators can't work at that speed,
> so we need to limit the speed at which FileSplitter works.
> One way to do this is to limit the number of blocks emitted per window.
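Capping the number of blocks emitted per streaming window, which is what a parameter like blockThreshold achieves, can be sketched as follows; the class and method names here are hypothetical, not the Malhar FileSplitter code:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Sketch of per-window throttling: at most blockThreshold blocks are
// emitted in any one streaming window; the backlog carries over and is
// drained in later windows.
class ThrottledEmitter
{
  private final int blockThreshold;
  private final Queue<String> pending = new ArrayDeque<>();
  private int emittedThisWindow;

  ThrottledEmitter(int blockThreshold)
  {
    this.blockThreshold = blockThreshold;
  }

  /** Called at the start of each streaming window to reset the budget. */
  void beginWindow()
  {
    emittedThisWindow = 0;
  }

  /** Queue a block; it is emitted now only if this window still has budget. */
  List<String> offer(String block)
  {
    pending.add(block);
    return drain();
  }

  /** Flush as much of the backlog as this window's budget allows. */
  List<String> drain()
  {
    List<String> out = new ArrayList<>();
    while (emittedThisWindow < blockThreshold && !pending.isEmpty()) {
      out.add(pending.poll());
      emittedThisWindow++;
    }
    return out;
  }
}
```

Blocks beyond the threshold simply wait in the queue, which is what lets a slower downstream operator keep up.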
[jira] [Commented] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager
[ https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301685#comment-15301685 ] Chandni Singh commented on APEXMALHAR-2063: --- WindowDataManager supports dynamic partitioning in an operator by allowing an instance to read the data saved for a particular window id by all operator instances. The following method provides that support:
{code}
/**
 * When an operator can partition itself dynamically then there is no guarantee that an input state which was being
 * handled by one instance previously will be handled by the same instance after partitioning.
 * For eg. An {@link AbstractFileInputOperator} instance which reads a File X till offset l (not check-pointed) may no
 * longer be the instance that handles file X after repartitioning as no. of instances may have changed and file X
 * is re-hashed to another instance.
 * The new instance wouldn't know from what point to read the File X unless it reads the idempotent storage of all the
 * operators for the window being replayed and fixes its state.
 *
 * @param windowId window id.
 * @return mapping of operator id to the corresponding state
 * @throws IOException
 */
Map<Integer, Object> load(long windowId) throws IOException;
{code}
Providing the support for the above with FileSystemWAL becomes complicated. Currently the FileSystemWAL reader and writer are assumed to be in the same physical partition. However, supporting the above requires multiple readers which are in different physical partitions than the writer. So the FileSystemWAL needs to be changed in order to be usable in read-only mode.
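The read-only, cross-partition access described above, where one instance reads what every partition wrote for a given window, can be modeled like this. Each partition's WAL is stood in for by a map, and all names are hypothetical; the real code would open each partition's FileSystemWAL for reading only:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the requirement in the comment above: after repartitioning, an
// instance may need to read, in read-only mode, the state that every
// partition persisted for one window, keyed by operator id.
class AllPartitionsReader
{
  // operatorId -> (windowId -> saved state), standing in for per-partition WALs
  private final Map<Integer, Map<Long, Object>> walPerPartition;

  AllPartitionsReader(Map<Integer, Map<Long, Object>> walPerPartition)
  {
    this.walPerPartition = walPerPartition;
  }

  /** Read-only scan over every partition's state for the window. */
  Map<Integer, Object> loadAllPartitions(long windowId)
  {
    Map<Integer, Object> result = new HashMap<>();
    for (Map.Entry<Integer, Map<Long, Object>> e : walPerPartition.entrySet()) {
      Object state = e.getValue().get(windowId);
      if (state != null) {
        result.put(e.getKey(), state);
      }
    }
    return result;
  }
}
```

The reader never mutates any partition's WAL, which is why a read-only mode of FileSystemWAL suffices for this path.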
[jira] [Assigned] (APEXMALHAR-2101) RuntimeException from Default Bucket which is under managed state.
[ https://issues.apache.org/jira/browse/APEXMALHAR-2101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh reassigned APEXMALHAR-2101: - Assignee: Chandni Singh > RuntimeException from Default Bucket which is under managed state. > -- > > Key: APEXMALHAR-2101 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2101 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Chaitanya >Assignee: Chandni Singh > > Getting the following exception, while using the ManagedTimeStateImpl > operator: > 2016-05-24 15:42:48,813 ERROR > com.datatorrent.stram.engine.StreamingContainer: Operator set > [OperatorDeployInfo[id=4,name=join,type=GENERIC,checkpoint={, > 0, > 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=GenToJoin,sourceNodeId=2,sourcePortName=outputPort,locality=,partitionMask=1,partitionKeys=[1]], > > OperatorDeployInfo.InputDeployInfo[portName=product,streamId=ProductToJoin,sourceNodeId=1,sourcePortName=outputPort,locality=,partitionMask=0,partitionKeys=]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=JoinToConsole,bufferServer=chaitanya-HP-ENVY-15-Notebook-PC > stopped running due to an exception. 
> java.lang.RuntimeException: while loading 0, 12
> at org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getValueFromTimeBucketReader(Bucket.java:346)
> at org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getFromReaders(Bucket.java:291)
> at org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.get(Bucket.java:323)
> at org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl.getValueFromBucketSync(AbstractManagedStateImpl.java:288)
> at org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl.getSync(ManagedTimeStateImpl.java:56)
> at com.examples.app.ManagedStateIntOperator.get(ManagedStateIntOperator.java:91)
> at com.examples.app.ManagedStateIntOperator.processTuple(ManagedStateIntOperator.java:80)
> at com.examples.app.ManagedStateIntOperator$2.process(ManagedStateIntOperator.java:73)
> at com.examples.app.ManagedStateIntOperator$2.process(ManagedStateIntOperator.java:68)
> at com.datatorrent.api.DefaultInputPort.put(DefaultInputPort.java:79)
> at com.datatorrent.stram.stream.BufferServerSubscriber$BufferReservoir.sweep(BufferServerSubscriber.java:280)
> at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:259)
> at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1393)
> Caused by: java.io.EOFException: Cannot seek after EOF
> at org.apache.hadoop.hdfs.DFSInputStream.seek(DFSInputStream.java:1378)
> at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:62)
> at org.apache.hadoop.io.file.tfile.DTBCFile$Reader.<init>(DTBCFile.java:674)
> at org.apache.hadoop.io.file.tfile.DTFile$Reader.<init>(DTFile.java:827)
> at com.datatorrent.lib.fileaccess.DTFileReader.<init>(DTFileReader.java:55)
> at com.datatorrent.lib.fileaccess.TFileImpl$DTFileImpl.getReader(TFileImpl.java:173)
> at org.apache.apex.malhar.lib.state.managed.BucketsFileSystem.getReader(BucketsFileSystem.java:96)
> at org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.loadFileReader(Bucket.java:371)
> at org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getValueFromTimeBucketReader(Bucket.java:341)
> ... 12 more