[jira] [Comment Edited] (APEXCORE-796) Docker based deployment

2018-07-23 Thread Chandni Singh (JIRA)


[ 
https://issues.apache.org/jira/browse/APEXCORE-796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16553655#comment-16553655
 ] 

Chandni Singh edited comment on APEXCORE-796 at 7/24/18 1:41 AM:
-

{quote}I thought the goal is to not have to depend on YARN in the first place.
{quote}
I don't see that mentioned anywhere in this Jira.

APEX is very tightly coupled to YARN. It is a YARN application that uses the 
YARN abstractions. Decoupling from YARN will be a substantial effort for APEX.  
The description says
{quote}This will be particularly helpful when applications depend on native, 
non-JVM packages like Python and R, that otherwise need to be installed 
separately.
{quote}
YARN supports running Docker containers via YARN-3611, which will be helpful for 
running dockerized APEX operators. This, in my opinion, is easier and faster to 
accomplish.
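For context, the YARN-3611 line of work is enabled through NodeManager configuration. A minimal, illustrative sketch based on Hadoop's LinuxContainerExecutor Docker support (property names may vary by Hadoop version; this is not a complete configuration):

```xml
<!-- yarn-site.xml: illustrative fragment only -->
<property>
  <name>yarn.nodemanager.container-executor.class</name>
  <value>org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor</value>
</property>
<property>
  <name>yarn.nodemanager.runtime.linux.allowed-runtimes</name>
  <value>default,docker</value>
</property>
```

An individual container is then opted into Docker at submission time, e.g. by setting `YARN_CONTAINER_RUNTIME_TYPE=docker` and `YARN_CONTAINER_RUNTIME_DOCKER_IMAGE` in the container's launch environment.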


was (Author: csingh):
{quote}I thought the goal is to not have to depend on YARN in the first place.
{quote}
Don't see that mention anywhere in this Jira. 

APEX is very tightly coupled to YARN. It is a YARN client that uses the YARN 
abstractions. Decoupling from YARN will be a substantial effort for APEX.  
The description says
{quote}This will be particularly helpful when applications depend on native, 
non-JVM packages like Python and R, that otherwise need to be installed 
separately.
{quote}
YARN supports running docker containers, which is via YARN-3611, it will be 
helpful for running dockerized APEX operators. This, in my opinion, is easier 
and faster to accomplish.

> Docker based deployment
> ---
>
> Key: APEXCORE-796
> URL: https://issues.apache.org/jira/browse/APEXCORE-796
> Project: Apache Apex Core
>  Issue Type: New Feature
>Reporter: Thomas Weise
>Priority: Major
>  Labels: roadmap
>
> Apex should support deployment using Docker as alternative to application 
> packages. Docker images provide a simple and standard way to package 
> dependencies and solve isolation from the host environment. This will be 
> particularly helpful when applications depend on native, non-JVM packages 
> like Python and R, that otherwise need to be installed separately. Docker 
> support will also be a step towards supporting other cluster managers like 
> Kubernetes, Mesos and Swarm.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (APEXCORE-796) Docker based deployment

2018-07-23 Thread Chandni Singh (JIRA)


[ 
https://issues.apache.org/jira/browse/APEXCORE-796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16553655#comment-16553655
 ] 

Chandni Singh commented on APEXCORE-796:


{quote}I thought the goal is to not have to depend on YARN in the first place.
{quote}
I don't see that mentioned anywhere in this Jira.

APEX is very tightly coupled to YARN. It is a YARN client that uses the YARN 
abstractions. Decoupling from YARN will be a substantial effort for APEX.  
The description says
{quote}This will be particularly helpful when applications depend on native, 
non-JVM packages like Python and R, that otherwise need to be installed 
separately.
{quote}
YARN supports running Docker containers via YARN-3611, which will be helpful for 
running dockerized APEX operators. This, in my opinion, is easier and faster to 
accomplish.

> Docker based deployment
> ---
>
> Key: APEXCORE-796
> URL: https://issues.apache.org/jira/browse/APEXCORE-796
> Project: Apache Apex Core
>  Issue Type: New Feature
>Reporter: Thomas Weise
>Priority: Major
>  Labels: roadmap
>
> Apex should support deployment using Docker as alternative to application 
> packages. Docker images provide a simple and standard way to package 
> dependencies and solve isolation from the host environment. This will be 
> particularly helpful when applications depend on native, non-JVM packages 
> like Python and R, that otherwise need to be installed separately. Docker 
> support will also be a step towards supporting other cluster managers like 
> Kubernetes, Mesos and Swarm.





[jira] [Commented] (APEXCORE-796) Docker based deployment

2018-07-23 Thread Chandni Singh (JIRA)


[ 
https://issues.apache.org/jira/browse/APEXCORE-796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16553507#comment-16553507
 ] 

Chandni Singh commented on APEXCORE-796:


YARN supports running Docker containers: YARN-3611

> Docker based deployment
> ---
>
> Key: APEXCORE-796
> URL: https://issues.apache.org/jira/browse/APEXCORE-796
> Project: Apache Apex Core
>  Issue Type: New Feature
>Reporter: Thomas Weise
>Priority: Major
>  Labels: roadmap
>
> Apex should support deployment using Docker as alternative to application 
> packages. Docker images provide a simple and standard way to package 
> dependencies and solve isolation from the host environment. This will be 
> particularly helpful when applications depend on native, non-JVM packages 
> like Python and R, that otherwise need to be installed separately. Docker 
> support will also be a step towards supporting other cluster managers like 
> Kubernetes, Mesos and Swarm.





[jira] [Assigned] (APEXMALHAR-2252) Document AbstractManagedStateImpl subclasses

2017-10-24 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh reassigned APEXMALHAR-2252:
-

Assignee: (was: Chandni Singh)

> Document AbstractManagedStateImpl subclasses
> 
>
> Key: APEXMALHAR-2252
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2252
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Thomas Weise
>
> For the user the time bucketing concept is very important to understand. We 
> should highlight the difference in the documentation and also consider 
> renaming the subclasses for clarity.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [VOTE] Apache Apex Core Release 3.5.0 (RC1)

2016-12-09 Thread Chandni Singh
+1 (binding)

Downloaded the source package, unzipped it and ran `mvn clean
apache-rat:check verify -Dlicense.skip=false install`.
It completed without any errors.

Ran the pi demo locally with apex, which worked as well.


On Thu, Dec 8, 2016 at 9:45 PM, Pramod Immaneni 
wrote:

> Not an issue, user error. Was able to launch the pi demo application
> successfully.
> +1 (binding)
>
> On Thu, Dec 8, 2016 at 8:37 PM, Pramod Immaneni 
> wrote:
>
> > The file integrity, source code verification and compilation steps were
> > successful. However, I am seeing the following error when running the
> cli,
> > looks like others here are not seeing this
> >
> > Exception in thread "main" java.lang.NoClassDefFoundError:
> > com/datatorrent/common/util/JacksonObjectMapperProvider
> >
> > at java.lang.ClassLoader.defineClass1(Native Method)
> > at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
> > at java.security.SecureClassLoader.defineClass(
> SecureClassLoader.java:142)
> > at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
> > at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > at com.datatorrent.stram.cli.ApexCli.<init>(ApexCli.java:174)
> > at com.datatorrent.stram.cli.ApexCli.main(ApexCli.java:4063)
> >
> > Caused by: java.lang.ClassNotFoundException:
> > com.datatorrent.common.util.JacksonObjectMapperProvider
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> >
> > ... 14 more
> >
> >
> >
> > On Thu, Dec 8, 2016 at 5:41 PM, David Yan  wrote:
> >
> >> +1 (binding)
> >>
> >> Unzipped the source package
> >> Ran "mvn clean apache-rat:check verify -Dlicense.skip=false install"
> >> without any error.
> >> Verified existence of LICENSE, NOTICE, README.md and CHANGELOG.md.
> >> Verified launch of pi-demo
> >>
> >>
> >> On Wed, Dec 7, 2016 at 12:52 PM, Sandesh Hegde  >
> >> wrote:
> >>
> >> > +1
> >> >
> >> > Followed the steps mentioned in http://apex.apache.org/verification.html
> >> > Verified the launch of an application.
> >> >
> >> > On Tue, Dec 6, 2016 at 10:55 PM Thomas Weise  wrote:
> >> >
> >> > > Dear Community,
> >> > >
> >> > > Please vote on the following Apache Apex Core 3.5.0 release
> candidate.
> >> > >
> >> > > This is a source release with binary artifacts published to Maven.
> >> > >
> >> > > List of all issues fixed:
> >> > >
> >> > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12335724&styleName=Text&projectId=12318823
> >> > > User documentation: https://apex.apache.org/docs/apex-3.5/
> >> > >
> >> > > Staging directory:
> >> > > https://dist.apache.org/repos/dist/dev/apex/apache-apex-core-3.5.0-RC1/
> >> > > Source zip:
> >> > >
> >> > > https://dist.apache.org/repos/dist/dev/apex/apache-apex-core-3.5.0-RC1/apache-apex-core-3.5.0-source-release.zip
> >> > > Source tar.gz:
> >> > >
> >> > > https://dist.apache.org/repos/dist/dev/apex/apache-apex-core-3.5.0-RC1/apache-apex-core-3.5.0-source-release.tar.gz
> >> > > Maven staging repository:
> >> > > https://repository.apache.org/content/repositories/orgapacheapex-1021/
> >> > >
> >> > > Git source:
> >> > >
> >> > > https://git-wip-us.apache.org/repos/asf?p=apex-core.git;a=commit;h=refs/tags/v3.5.0-RC1
> >> > >  (commit: 6de8828e4f3d5734d0a6f9c1be0aa7057cb60ac8)
> >> > >
> >> > > PGP key:
> >> > > http://pgp.mit.edu:11371/pks/lookup?op=vindex&search=t...@apache.org
> >> > > KEYS file:
> >> > > https://dist.apache.org/repos/dist/release/apex/KEYS
> >> > >
> >> > > More information at:
> >> > > http://apex.apache.org
> >> > >
> >> > > Please try the release and vote; vote will be open for 72 hours.
> >> > >
> >> > > [ ] +1 approve (and what verification was done)
> >> > > [ ] -1 disapprove (and reason why)
> >> > >
> >> > > http://www.apache.org/foundation/voting.html
> >> > >
> >> > > How to verify release candidate:
> >> > >
> >> > > http://apex.apache.org/verification.html
> >> > >
> >> > > Thanks,
> >> > > Thomas
> >> > >
> >> >
> >>
> >
> >
>


[jira] [Commented] (APEXMALHAR-2321) Improve Buckets memory management

2016-11-01 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15626117#comment-15626117
 ] 

Chandni Singh commented on APEXMALHAR-2321:
---


>>AbstractManagedStateImpl.maxMemorySize probably will be misunderstand as the 
>>max memory size of total managed state, but in fact it was used as memory 
>>size of each bucket. Better to rename it.

This is incorrect. This setting is not used per bucket. Please look at lines 
94-98 in StateTracker. Bytes are summed over all buckets and then compared to 
maxMemorySize.
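To illustrate the point above, here is a minimal sketch of the check as described (class and method names are illustrative, not the actual Malhar code): sizes are summed over all buckets and the total, not any single bucket's size, is compared against maxMemorySize.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StateTrackerSketch {
    // Global limit shared by all buckets (illustrative value).
    static long maxMemorySize = 100;

    // Sums the bytes used by every bucket, then compares the total
    // once against the global limit, as described in the comment.
    static boolean exceedsLimit(Map<Long, Long> bucketSizes) {
        long total = 0;
        for (long size : bucketSizes.values()) {
            total += size; // bytes summed over ALL buckets
        }
        return total > maxMemorySize;
    }

    public static void main(String[] args) {
        Map<Long, Long> buckets = new ConcurrentHashMap<>();
        buckets.put(1L, 60L);
        buckets.put(2L, 60L);
        // Neither bucket alone exceeds 100, but the sum does.
        System.out.println(exceedsLimit(buckets)); // prints "true"
    }
}
```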

> Improve Buckets memory management
> -
>
> Key: APEXMALHAR-2321
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2321
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>
> Currently buckets were managed as an array. Each bucket have memory 
> limitation, and free memory will be triggered if the bucket memory usage over 
> the limitation.
> - For ManagedTimeUnifiedStateImpl, the default bucket number is 345600, which 
> probably too large.
> - AbstractManagedStateImpl.maxMemorySize probably will be misunderstand as 
> the max memory size of total managed state, but in fact it was used as memory 
> size of each bucket. Better to rename it.
> - The default maxMemorySize is zero. It's better to give a default reasonable 
> value to avoid too much garbage collection 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI

2016-10-19 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15588941#comment-15588941
 ] 

Chandni Singh edited comment on APEXMALHAR-2284 at 10/19/16 2:36 PM:
-

[~bhup...@apache.org], [~chaithu] I disagree. There are ways that Spillable... 
can be enhanced to support event time. 
ManagedTimeStateMultiValue is not even a managed state implementation, so I don't 
really know why it sits with the other managed state implementations. 



was (Author: csingh):
[~bhup...@apache.org], [~chaithu] I disagree. There are ways that Spillable... 
be enhanced to support event time. 
ManagedTimeStateMultiValue is not even a Managed State Implementation so don't 
really know why it is with other managed state implementations. 


> POJOInnerJoinOperatorTest fails in Travis CI
> 
>
> Key: APEXMALHAR-2284
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2284
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Thomas Weise
>Assignee: Chaitanya
>Priority: Blocker
> Fix For: 3.6.0
>
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/166322754/log.txt
> {code}
> Failed tests: 
>   POJOInnerJoinOperatorTest.testEmitMultipleTuplesFromStream2:337 Number of 
> tuple emitted  expected:<2> but was:<4>
>   POJOInnerJoinOperatorTest.testInnerJoinOperator:184 Number of tuple emitted 
>  expected:<1> but was:<2>
>   POJOInnerJoinOperatorTest.testMultipleValues:236 Number of tuple emitted  
> expected:<2> but was:<3>
>   POJOInnerJoinOperatorTest.testUpdateStream1Values:292 Number of tuple 
> emitted  expected:<1> but was:<2>
> {code}





[jira] [Commented] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI

2016-10-19 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15588941#comment-15588941
 ] 

Chandni Singh commented on APEXMALHAR-2284:
---

[~bhup...@apache.org], [~chaithu] I disagree. There are ways that Spillable... 
can be enhanced to support event time. 
ManagedTimeStateMultiValue is not even a managed state implementation, so I don't 
really know why it sits with the other managed state implementations. 


> POJOInnerJoinOperatorTest fails in Travis CI
> 
>
> Key: APEXMALHAR-2284
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2284
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Thomas Weise
>Assignee: Chaitanya
>Priority: Blocker
> Fix For: 3.6.0
>
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/166322754/log.txt
> {code}
> Failed tests: 
>   POJOInnerJoinOperatorTest.testEmitMultipleTuplesFromStream2:337 Number of 
> tuple emitted  expected:<2> but was:<4>
>   POJOInnerJoinOperatorTest.testInnerJoinOperator:184 Number of tuple emitted 
>  expected:<1> but was:<2>
>   POJOInnerJoinOperatorTest.testMultipleValues:236 Number of tuple emitted  
> expected:<2> but was:<3>
>   POJOInnerJoinOperatorTest.testUpdateStream1Values:292 Number of tuple 
> emitted  expected:<1> but was:<2>
> {code}





[jira] [Resolved] (APEXMALHAR-2299) TimeBasedDedupOperator throws exception during time bucket assignment in certain edge cases

2016-10-18 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh resolved APEXMALHAR-2299.
---
Resolution: Fixed

> TimeBasedDedupOperator throws exception during time bucket assignment in 
> certain edge cases
> ---
>
> Key: APEXMALHAR-2299
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2299
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Affects Versions: 3.5.0
>Reporter: Francis Fernandes
>Assignee: Francis Fernandes
> Fix For: 3.6.0
>
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> The following exception is thrown under certain edge cases:
> {noformat}
> Stopped running due to an exception. java.lang.IllegalArgumentException: new 
> time bucket should have a value greater than the old time bucket
>   at 
> com.google.common.base.Preconditions.checkArgument(Preconditions.java:88)
>   at 
> org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl.handleBucketConflict(ManagedTimeUnifiedStateImpl.java:126)
>   at 
> org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl.prepareBucket(AbstractManagedStateImpl.java:265)
>   at 
> org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl.putInBucket(AbstractManagedStateImpl.java:276)
>   at 
> org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl.put(ManagedTimeUnifiedStateImpl.java:71)
>   at 
> org.apache.apex.malhar.lib.dedup.TimeBasedDedupOperator.putManagedState(TimeBasedDedupOperator.java:191)
> {noformat}





[jira] [Comment Edited] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI

2016-10-18 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585751#comment-15585751
 ] 

Chandni Singh edited comment on APEXMALHAR-2284 at 10/18/16 3:40 PM:
-

One point worth highlighting about how getAsync improves performance: if the 
value is not available immediately, the client can cache the future and work on 
other keys. Look at the implementations of AbstractDeduper. 
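The caching-the-future pattern described above can be sketched with standard java.util.concurrent types (getAsync here is a stand-in, not the actual managed state API): an asynchronous get returns a future immediately, the caller caches it and keeps working on other keys, and blocks only when a value is actually needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AsyncGetSketch {
    // Stand-in for an asynchronous get whose value may resolve later.
    static CompletableFuture<String> getAsync(String key) {
        return CompletableFuture.supplyAsync(() -> "value-of-" + key);
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String key : new String[]{"k1", "k2", "k3"}) {
            pending.add(getAsync(key)); // cache the future; don't block here
        }
        // ... the client is free to work on other keys while lookups are in flight ...
        for (CompletableFuture<String> f : pending) {
            System.out.println(f.join()); // block only when the value is needed
        }
    }
}
```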

1. If SpillableArrayListMultimap only lacks asynchronous get, then that can be 
added to it. That should not be the only reason to duplicate the functionality.

2. ManagedTimeMultiValueState has no tests and is full of warnings.

We should not create a completely separate implementation that duplicates a 
large part of the functionality provided by something existing; otherwise we 
will have too many slightly different variations of the same thing, which will 
be very difficult to manage. 

IMO this is a good opportunity to improve SpillableArrayListMultimapImpl and 
use that.



was (Author: csingh):
One concept that I think I should point is that getAsync improves performance 
is that if the value is not available immediately, then the client can cache 
the future and work on other keys. Look at the implementations of 
AbstractDeduper. 

1. If SpillableArrayListMultimap only lacks asynchronous get, then that can be 
added to it. That should not be the only reason to duplicate the functionality.

2. ManagedTimeMultiValueState has no tests and is full of warnings.

We should not create a complete separate implementation which duplicates a 
larger part of functionality provided by something existing otherwise we will 
have too many slightly different variations of the same thing and that will be 
very difficult to manage. 

IMO this is a good opportunity to improve SpillableArrayListMultimapImpl and 
use that.


> POJOInnerJoinOperatorTest fails in Travis CI
> 
>
> Key: APEXMALHAR-2284
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2284
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Thomas Weise
>Assignee: Chaitanya
>Priority: Blocker
> Fix For: 3.6.0
>
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/166322754/log.txt
> {code}
> Failed tests: 
>   POJOInnerJoinOperatorTest.testEmitMultipleTuplesFromStream2:337 Number of 
> tuple emitted  expected:<2> but was:<4>
>   POJOInnerJoinOperatorTest.testInnerJoinOperator:184 Number of tuple emitted 
>  expected:<1> but was:<2>
>   POJOInnerJoinOperatorTest.testMultipleValues:236 Number of tuple emitted  
> expected:<2> but was:<3>
>   POJOInnerJoinOperatorTest.testUpdateStream1Values:292 Number of tuple 
> emitted  expected:<1> but was:<2>
> {code}





[jira] [Commented] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI

2016-10-18 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585751#comment-15585751
 ] 

Chandni Singh commented on APEXMALHAR-2284:
---

One point worth highlighting about how getAsync improves performance: if the 
value is not available immediately, the client can cache the future and work on 
other keys. Look at the implementations of AbstractDeduper. 

1. If SpillableArrayListMultimap only lacks asynchronous get, then that can be 
added to it. That should not be the only reason to duplicate the functionality.

2. ManagedTimeMultiValueState has no tests and is full of warnings.

We should not create a completely separate implementation that duplicates a 
large part of the functionality provided by something existing; otherwise we 
will have too many slightly different variations of the same thing, which will 
be very difficult to manage. 

IMO this is a good opportunity to improve SpillableArrayListMultimapImpl and 
use that.


> POJOInnerJoinOperatorTest fails in Travis CI
> 
>
> Key: APEXMALHAR-2284
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2284
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Thomas Weise
>Assignee: Chaitanya
>Priority: Blocker
> Fix For: 3.6.0
>
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/166322754/log.txt
> {code}
> Failed tests: 
>   POJOInnerJoinOperatorTest.testEmitMultipleTuplesFromStream2:337 Number of 
> tuple emitted  expected:<2> but was:<4>
>   POJOInnerJoinOperatorTest.testInnerJoinOperator:184 Number of tuple emitted 
>  expected:<1> but was:<2>
>   POJOInnerJoinOperatorTest.testMultipleValues:236 Number of tuple emitted  
> expected:<2> but was:<3>
>   POJOInnerJoinOperatorTest.testUpdateStream1Values:292 Number of tuple 
> emitted  expected:<1> but was:<2>
> {code}





[jira] [Commented] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI

2016-10-17 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582672#comment-15582672
 ] 

Chandni Singh commented on APEXMALHAR-2284:
---

Can you look into using an implementation of SpillableArrayListMultimap 
directly?

Initially, I think that is what was discussed for use in the join.

> POJOInnerJoinOperatorTest fails in Travis CI
> 
>
> Key: APEXMALHAR-2284
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2284
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Thomas Weise
>Assignee: Chaitanya
>Priority: Blocker
> Fix For: 3.6.0
>
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/166322754/log.txt
> {code}
> Failed tests: 
>   POJOInnerJoinOperatorTest.testEmitMultipleTuplesFromStream2:337 Number of 
> tuple emitted  expected:<2> but was:<4>
>   POJOInnerJoinOperatorTest.testInnerJoinOperator:184 Number of tuple emitted 
>  expected:<1> but was:<2>
>   POJOInnerJoinOperatorTest.testMultipleValues:236 Number of tuple emitted  
> expected:<2> but was:<3>
>   POJOInnerJoinOperatorTest.testUpdateStream1Values:292 Number of tuple 
> emitted  expected:<1> but was:<2>
> {code}





[jira] [Commented] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI

2016-10-17 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582633#comment-15582633
 ] 

Chandni Singh commented on APEXMALHAR-2284:
---

I don't understand what constraint you are talking about. Consider any 
concurrent data structure, like a concurrent hashmap. If you do a simultaneous 
put and get, you don't know what value you get from the get call: it can be the 
old value or the new one. 
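The concurrent-hashmap point above can be demonstrated directly: when a get races a put, either the old or the new value is a legitimate result, and neither outcome is a bug in the map.

```java
import java.util.concurrent.ConcurrentHashMap;

public class RacySketch {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("k", "old");
        // A writer thread updates the key while the main thread reads it.
        Thread writer = new Thread(() -> map.put("k", "new"));
        writer.start();
        String seen = map.get("k"); // may run before or after the racing put
        writer.join();
        // Both outcomes are valid under ConcurrentHashMap's contract.
        System.out.println(seen.equals("old") || seen.equals("new")); // prints "true"
    }
}
```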

> POJOInnerJoinOperatorTest fails in Travis CI
> 
>
> Key: APEXMALHAR-2284
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2284
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Thomas Weise
>Assignee: Chaitanya
>Priority: Blocker
> Fix For: 3.6.0
>
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/166322754/log.txt
> {code}
> Failed tests: 
>   POJOInnerJoinOperatorTest.testEmitMultipleTuplesFromStream2:337 Number of 
> tuple emitted  expected:<2> but was:<4>
>   POJOInnerJoinOperatorTest.testInnerJoinOperator:184 Number of tuple emitted 
>  expected:<1> but was:<2>
>   POJOInnerJoinOperatorTest.testMultipleValues:236 Number of tuple emitted  
> expected:<2> but was:<3>
>   POJOInnerJoinOperatorTest.testUpdateStream1Values:292 Number of tuple 
> emitted  expected:<1> but was:<2>
> {code}





[jira] [Commented] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI

2016-10-13 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572183#comment-15572183
 ] 

Chandni Singh commented on APEXMALHAR-2284:
---

I think the proper fix for this may take time. IMO we should disable 
POJOInnerJoinOperatorTest but keep the JIRA open.


> POJOInnerJoinOperatorTest fails in Travis CI
> 
>
> Key: APEXMALHAR-2284
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2284
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Thomas Weise
>Assignee: Chaitanya
>Priority: Blocker
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/166322754/log.txt
> {code}
> Failed tests: 
>   POJOInnerJoinOperatorTest.testEmitMultipleTuplesFromStream2:337 Number of 
> tuple emitted  expected:<2> but was:<4>
>   POJOInnerJoinOperatorTest.testInnerJoinOperator:184 Number of tuple emitted 
>  expected:<1> but was:<2>
>   POJOInnerJoinOperatorTest.testMultipleValues:236 Number of tuple emitted  
> expected:<2> but was:<3>
>   POJOInnerJoinOperatorTest.testUpdateStream1Values:292 Number of tuple 
> emitted  expected:<1> but was:<2>
> {code}





[jira] [Commented] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI

2016-10-13 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572169#comment-15572169
 ] 

Chandni Singh commented on APEXMALHAR-2284:
---

1. put needs to be done only after you retrieve the value from getAsync(k1). 
You may have to wait for the future to finish. 

2. It is an async callback that tries to retrieve the latest value. Whatever is 
put in the bucket is the latest, so if you do a put without completing the get, 
then it is a problem.
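Point 1 above can be sketched as follows (getAsync and the bucket map are illustrative stand-ins, not the actual managed state API): the put for a key happens only after the future returned by getAsync(k1) has resolved, so the read cannot race the subsequent update.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class PutAfterGetSketch {
    // Stand-in for a managed-state bucket.
    static final ConcurrentHashMap<String, String> bucket = new ConcurrentHashMap<>();

    // Stand-in for an asynchronous read of the bucket.
    static CompletableFuture<String> getAsync(String key) {
        return CompletableFuture.supplyAsync(() -> bucket.get(key));
    }

    public static void main(String[] args) {
        bucket.put("k1", "v0");
        String current = getAsync("k1").join(); // wait for the future to finish
        bucket.put("k1", current + "+v1");      // only then update the bucket
        System.out.println(bucket.get("k1"));   // prints "v0+v1"
    }
}
```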


> POJOInnerJoinOperatorTest fails in Travis CI
> 
>
> Key: APEXMALHAR-2284
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2284
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Thomas Weise
>Assignee: Chaitanya
>Priority: Blocker
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/166322754/log.txt
> {code}
> Failed tests: 
>   POJOInnerJoinOperatorTest.testEmitMultipleTuplesFromStream2:337 Number of 
> tuple emitted  expected:<2> but was:<4>
>   POJOInnerJoinOperatorTest.testInnerJoinOperator:184 Number of tuple emitted 
>  expected:<1> but was:<2>
>   POJOInnerJoinOperatorTest.testMultipleValues:236 Number of tuple emitted  
> expected:<2> but was:<3>
>   POJOInnerJoinOperatorTest.testUpdateStream1Values:292 Number of tuple 
> emitted  expected:<1> but was:<2>
> {code}





[jira] [Comment Edited] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI

2016-10-13 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15571119#comment-15571119
 ] 

Chandni Singh edited comment on APEXMALHAR-2284 at 10/13/16 7:27 AM:
-

Chaitanya,

Please feel free to assign the ticket to me if my PR broke these tests. Having 
a variable {code}asyncReadSource{code} didn't really make much sense to me. 
Changing how the reader thread fetches a value from a bucket doesn't need to be 
exposed and configured. 
Anyway, if this was done for tests, then there are better ways of doing it. 




was (Author: csingh):
Chaitanya,

Please feel free to assign the ticket to me if my PR broke these tests. Having 
a variable {code}asyncReadSource{code} didn't really make much sense to me. 
Changing how reader thread fetch value from a bucket doesn't need to be 
altered.  
Anyways if this was done for tests, then there are better ways of doing it. 



> POJOInnerJoinOperatorTest fails in Travis CI
> 
>
> Key: APEXMALHAR-2284
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2284
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Thomas Weise
>Assignee: Chaitanya
>Priority: Blocker
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/166322754/log.txt
> {code}
> Failed tests: 
>   POJOInnerJoinOperatorTest.testEmitMultipleTuplesFromStream2:337 Number of 
> tuple emitted  expected:<2> but was:<4>
>   POJOInnerJoinOperatorTest.testInnerJoinOperator:184 Number of tuple emitted 
>  expected:<1> but was:<2>
>   POJOInnerJoinOperatorTest.testMultipleValues:236 Number of tuple emitted  
> expected:<2> but was:<3>
>   POJOInnerJoinOperatorTest.testUpdateStream1Values:292 Number of tuple 
> emitted  expected:<1> but was:<2>
> {code}





[jira] [Commented] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI

2016-10-13 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15571119#comment-15571119
 ] 

Chandni Singh commented on APEXMALHAR-2284:
---

Chaitanya,

Please feel free to assign the ticket to me if my PR broke these tests. Having 
a variable {code}asyncReadSource{code} didn't really make much sense to me. 
How the reader thread fetches a value from a bucket doesn't need to be 
altered. 
Anyway, if this was done for tests, then there are better ways of doing it. 



> POJOInnerJoinOperatorTest fails in Travis CI
> 
>
> Key: APEXMALHAR-2284
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2284
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Thomas Weise
>Assignee: Chaitanya
>Priority: Blocker
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/166322754/log.txt
> {code}
> Failed tests: 
>   POJOInnerJoinOperatorTest.testEmitMultipleTuplesFromStream2:337 Number of 
> tuple emitted  expected:<2> but was:<4>
>   POJOInnerJoinOperatorTest.testInnerJoinOperator:184 Number of tuple emitted 
>  expected:<1> but was:<2>
>   POJOInnerJoinOperatorTest.testMultipleValues:236 Number of tuple emitted  
> expected:<2> but was:<3>
>   POJOInnerJoinOperatorTest.testUpdateStream1Values:292 Number of tuple 
> emitted  expected:<1> but was:<2>
> {code}





[jira] [Comment Edited] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI

2016-10-13 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15571074#comment-15571074
 ] 

Chandni Singh edited comment on APEXMALHAR-2284 at 10/13/16 6:57 AM:
-

Are you sure about that?

If I remember correctly, the tests were also failing on a different PR, before 
this PR was merged.

Also why do they fail intermittently?


was (Author: csingh):
Are you sure about that?

The tests were failing before merging the PR on a different PR as well, if I 
remember correctly?

Also why do they fail intermittently?

> POJOInnerJoinOperatorTest fails in Travis CI
> 
>
> Key: APEXMALHAR-2284
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2284
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Thomas Weise
>Assignee: Chaitanya
>Priority: Blocker
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/166322754/log.txt
> {code}
> Failed tests: 
>   POJOInnerJoinOperatorTest.testEmitMultipleTuplesFromStream2:337 Number of 
> tuple emitted  expected:<2> but was:<4>
>   POJOInnerJoinOperatorTest.testInnerJoinOperator:184 Number of tuple emitted 
>  expected:<1> but was:<2>
>   POJOInnerJoinOperatorTest.testMultipleValues:236 Number of tuple emitted  
> expected:<2> but was:<3>
>   POJOInnerJoinOperatorTest.testUpdateStream1Values:292 Number of tuple 
> emitted  expected:<1> but was:<2>
> {code}





[jira] [Commented] (APEXMALHAR-2284) POJOInnerJoinOperatorTest fails in Travis CI

2016-10-13 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571074#comment-15571074
 ] 

Chandni Singh commented on APEXMALHAR-2284:
---

Are you sure about that?

The tests were failing before merging the PR on a different PR as well, if I 
remember correctly.

Also why do they fail intermittently?

> POJOInnerJoinOperatorTest fails in Travis CI
> 
>
> Key: APEXMALHAR-2284
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2284
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Thomas Weise
>Assignee: Chaitanya
>Priority: Blocker
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/166322754/log.txt
> {code}
> Failed tests: 
>   POJOInnerJoinOperatorTest.testEmitMultipleTuplesFromStream2:337 Number of 
> tuple emitted  expected:<2> but was:<4>
>   POJOInnerJoinOperatorTest.testInnerJoinOperator:184 Number of tuple emitted 
>  expected:<1> but was:<2>
>   POJOInnerJoinOperatorTest.testMultipleValues:236 Number of tuple emitted  
> expected:<2> but was:<3>
>   POJOInnerJoinOperatorTest.testUpdateStream1Values:292 Number of tuple 
> emitted  expected:<1> but was:<2>
> {code}





[jira] [Created] (APEXMALHAR-2285) Add a property to TimeBasedDirectoryScanner in FileSplitterInput that ignores missing directories instead of propagating exception

2016-10-09 Thread Chandni Singh (JIRA)
Chandni Singh created APEXMALHAR-2285:
-

 Summary: Add a property to TimeBasedDirectoryScanner in 
FileSplitterInput that ignores missing directories instead of propagating 
exception
 Key: APEXMALHAR-2285
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2285
 Project: Apache Apex Malhar
  Issue Type: Improvement
Reporter: Chandni Singh


The {code}TimeBasedDirectoryScanner{code} in {code}FileSplitterInput{code} 
fails the container if it is unable to open a directory (list all the paths in 
the directory). 
Some use cases may want to ignore missing directories, so this behavior should 
be controllable via a property.
This was discussed here:
https://github.com/apache/apex-malhar/pull/446
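
A minimal sketch of the proposed behavior follows. The class and the property name `ignoreMissingDirectories` are illustrative assumptions, not the actual TimeBasedDirectoryScanner API:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: the class and property name are hypothetical,
// not the real FileSplitterInput/TimeBasedDirectoryScanner code.
public class LenientScanner {
    boolean ignoreMissingDirectories = true;  // the proposed property

    List<String> scan(List<String> dirs) {
        List<String> found = new ArrayList<>();
        for (String dir : dirs) {
            String[] children = new File(dir).list();  // null when the dir is missing/unreadable
            if (children == null) {
                if (ignoreMissingDirectories) {
                    continue;  // skip the directory instead of failing the container
                }
                throw new RuntimeException("Unable to list paths in " + dir);
            }
            for (String child : children) {
                found.add(dir + File.separator + child);
            }
        }
        return found;
    }
}
```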





[jira] [Commented] (APEXMALHAR-1852) File Splitter Test Failing On My Machine

2016-10-09 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15560419#comment-15560419
 ] 

Chandni Singh commented on APEXMALHAR-1852:
---

Found the problem with the test.

There are 12 test files created, files 0 - 11. 
The lines added to each file's content are f{file-num}l{line-num}, where 
0 <= line-num < 2.
Files 0 - 9 are smaller than files 10 - 11 because the file-num in the content 
is just a single digit. This results in a different number of blocks.
The test relied on a file having 5 blocks, but instead it sometimes picked a 
file of 12 blocks. 
So the fix is simple: make every file have 6 blocks, and modify the test 
accordingly.

There was a race condition with testRecursive(). Fixed that too.
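
The size arithmetic behind the mismatch can be sketched as follows. The 2-byte block size is an assumption chosen only to make the effect visible, not the test's real configuration:

```java
// Illustrative arithmetic: a one-character difference in the file number
// ("f9l0" vs "f10l0") changes the file size and hence the number of
// fixed-size blocks the splitter produces. Block size here is hypothetical.
public class BlockCountDemo {
    static int fileSize(int fileNum, int lines) {
        int size = 0;
        for (int l = 0; l < lines; l++) {
            size += ("f" + fileNum + "l" + l + "\n").length();
        }
        return size;
    }

    static int blockCount(int size, int blockSize) {
        return (size + blockSize - 1) / blockSize;  // ceil(size / blockSize)
    }

    public static void main(String[] args) {
        int blockSize = 2;
        System.out.println(blockCount(fileSize(9, 2), blockSize));   // single-digit file-num: 5 blocks
        System.out.println(blockCount(fileSize(10, 2), blockSize));  // double-digit file-num: 6 blocks
    }
}
```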

> File Splitter Test Failing On My Machine
> 
>
> Key: APEXMALHAR-1852
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1852
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Timothy Farkas
>    Assignee: Chandni Singh
>
> FileSplitterInputTest.testRecoveryOfPartialFile:408 New file expected:<1> but 
> was:<0>





[jira] [Assigned] (APEXMALHAR-1852) File Splitter Test Failing On My Machine

2016-10-07 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-1852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh reassigned APEXMALHAR-1852:
-

Assignee: Chandni Singh

> File Splitter Test Failing On My Machine
> 
>
> Key: APEXMALHAR-1852
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1852
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Timothy Farkas
>    Assignee: Chandni Singh
>
> FileSplitterInputTest.testRecoveryOfPartialFile:408 New file expected:<1> but 
> was:<0>





[jira] [Updated] (APEXMALHAR-2281) ManagedState: race condition with put & asyncGet

2016-10-06 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2281:
--
Summary: ManagedState: race condition with put & asyncGet  (was: 
ManagedState: race condition with put & asyncGet and bucket size doesn't 
account for entries in file cache)

> ManagedState: race condition with put & asyncGet
> 
>
> Key: APEXMALHAR-2281
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2281
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>
> There are 2 bugs:
> 1. Race condition with put and asyncGet. put(...) inserts a bucketedValue in 
> the flash with value = null first. It sets the value afterwards. There is a 
> possibility that the reader thread will get the BucketedValue from the flash 
> and read the value as null.
> 2. Bucket size doesn't account for entries in fileCache





[jira] [Updated] (APEXMALHAR-2281) ManagedState: race condition with put & asyncGet

2016-10-06 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2281:
--
Description: Race condition with put and asyncGet. put(...) inserts a 
bucketedValue in the flash with value = null first. It sets the value 
afterwards. There is a possibility that the reader thread will get the 
BucketedValue from the flash and read the value as null.  (was: There are 2 bugs:
1. Race condition with put and asyncGet. put(...) inserts a bucketedValue in 
the flash with value = null first. It sets the value afterwords. There is a 
possibility that the reader thread will get BucketedValue from flash and read 
the value as null.

2. Bucket size doesn't account for entries in fileCache
)

> ManagedState: race condition with put & asyncGet
> 
>
> Key: APEXMALHAR-2281
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2281
> Project: Apache Apex Malhar
>  Issue Type: Bug
>    Reporter: Chandni Singh
>Assignee: Chandni Singh
>
> Race condition with put and asyncGet. put(...) inserts a bucketedValue in the 
> flash with value = null first. It sets the value afterwards. There is a 
> possibility that the reader thread will get the BucketedValue from the flash 
> and read the value as null.





[jira] [Resolved] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket

2016-10-06 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh resolved APEXMALHAR-2276.
---
Resolution: Fixed

> ManagedState: value of a key does not get over-written in the same time bucket
> --
>
> Key: APEXMALHAR-2276
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2276
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Siyuan Hua
>    Assignee: Chandni Singh
> Fix For: 3.6.0
>
>
> For example:
> ManagedTimeUnifiedStateImpl mtus;
> mtus.put(1, key1, val1)
> mtus.put(1, key1, val2)
> mtus.get(1, key1).equals(val2) will return false





[jira] [Updated] (APEXMALHAR-2281) ManagedState: race condition with put & asyncGet and bucket size doesn't account for entries in file cache

2016-10-06 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2281:
--
Description: 
There are 2 bugs:
1. Race condition with put and asyncGet. put(...) inserts a bucketedValue in 
the flash with value = null first. It sets the value afterwards. There is a 
possibility that the reader thread will get the BucketedValue from the flash 
and read the value as null.

2. Bucket size doesn't account for entries in fileCache


  was:
There are 2 bugs:
1. Race condition with put and asyncGet. put(...) inserts a bucketedValue in 
the flash with value = null first. It sets the value afterwords. There is a 
possibility that the reader thread will get BucketedValue from flash and read 
the value as null.



> ManagedState: race condition with put & asyncGet and bucket size doesn't 
> account for entries in file cache
> --
>
> Key: APEXMALHAR-2281
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2281
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chandni Singh
>    Assignee: Chandni Singh
>
> There are 2 bugs:
> 1. Race condition with put and asyncGet. put(...) inserts a bucketedValue in 
> the flash with value = null first. It sets the value afterwards. There is a 
> possibility that the reader thread will get the BucketedValue from the flash 
> and read the value as null.
> 2. Bucket size doesn't account for entries in fileCache





[jira] [Created] (APEXMALHAR-2281) ManagedState: race condition with put & asyncGet and bucket size doesn't account for entries in file cache

2016-10-06 Thread Chandni Singh (JIRA)
Chandni Singh created APEXMALHAR-2281:
-

 Summary: ManagedState: race condition with put & asyncGet and 
bucket size doesn't account for entries in file cache
 Key: APEXMALHAR-2281
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2281
 Project: Apache Apex Malhar
  Issue Type: Bug
Reporter: Chandni Singh
Assignee: Chandni Singh


There are 2 bugs:
1. Race condition with put and asyncGet. put(...) inserts a bucketedValue in 
the flash with value = null first. It sets the value afterwards. There is a 
possibility that the reader thread will get the BucketedValue from the flash 
and read the value as null.






[jira] [Comment Edited] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket

2016-10-06 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15552532#comment-15552532
 ] 

Chandni Singh edited comment on APEXMALHAR-2276 at 10/6/16 5:28 PM:


I don't see a problem. In Scenario A, it is evident right at the time of 
insertion that the update is old, and it is ignored.
In Scenario B, it is not known at the time of insertion that the update is old.

That is why I mentioned that the client needs to do a get() and then a put(). 


was (Author: csingh):
I don't see a problem. In Scenario A, it is evident right at the time of 
insertion that update is old and is ignored.
In Scenario B, it is unknown at the time of insertion that update is old.

Which is why I mentioned, that the client needs to do a get() and then put(). 

ManagedStateImpl and ManagedTimeStateImpl are not for maintaining time series 
in memory. 

> ManagedState: value of a key does not get over-written in the same time bucket
> --
>
> Key: APEXMALHAR-2276
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2276
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Siyuan Hua
>    Assignee: Chandni Singh
> Fix For: 3.6.0
>
>
> For example:
> ManagedTimeUnifiedStateImpl mtus;
> mtus.put(1, key1, val1)
> mtus.put(1, key1, val2)
> mtus.get(1, key1).equals(val2) will return false





[jira] [Commented] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket

2016-10-06 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15552532#comment-15552532
 ] 

Chandni Singh commented on APEXMALHAR-2276:
---

I don't see a problem. In Scenario A, it is evident right at the time of 
insertion that the update is old, and it is ignored.
In Scenario B, it is not known at the time of insertion that the update is old.

That is why I mentioned that the client needs to do a get() and then a put(). 

ManagedStateImpl and ManagedTimeStateImpl are not meant for maintaining time 
series in memory. 

> ManagedState: value of a key does not get over-written in the same time bucket
> --
>
> Key: APEXMALHAR-2276
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2276
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Siyuan Hua
>    Assignee: Chandni Singh
> Fix For: 3.6.0
>
>
> For example:
> ManagedTimeUnifiedStateImpl mtus;
> mtus.put(1, key1, val1)
> mtus.put(1, key1, val2)
> mtus.get(1, key1).equals(val2) will return false





[jira] [Commented] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket

2016-10-05 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15550649#comment-15550649
 ] 

Chandni Singh commented on APEXMALHAR-2276:
---

I agree with [~thw] too. This creates unnecessary overhead for every use case: 
additional space for the timestamp is required per key, both in memory and on 
disk. While making the changes, though, I discovered a race condition and an 
issue with free space. I will create another ticket for that.

> ManagedState: value of a key does not get over-written in the same time bucket
> --
>
> Key: APEXMALHAR-2276
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2276
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Siyuan Hua
>    Assignee: Chandni Singh
> Fix For: 3.6.0
>
>
> For example:
> ManagedTimeUnifiedStateImpl mtus;
> mtus.put(1, key1, val1)
> mtus.put(1, key1, val2)
> mtus.get(1, key1).equals(val2) will return false





Test coverage tool

2016-10-05 Thread Chandni Singh
Hi,

Tim and I spoke some time back about how it would be helpful to have a test
coverage tool that flags a pull request if it reduces the test coverage
percentage compared to the previous build.

Recently, while making improvements to ManagedState (APEXMALHAR-2223), I came
across a class 'ManagedTimeStateMultiValue'. This class has no tests. Some of
it is exercised under the join operator tests, but the majority is not covered
by unit tests.

This is just an example, and apologies for highlighting it here, but I think
having a test coverage tool whose results are compared with the last build
would help us address these issues promptly.

Let me know what everyone thinks, and I can look into some plugins that we
can use.

Thanks,
Chandni


[jira] [Commented] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket

2016-10-04 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15546623#comment-15546623
 ] 

Chandni Singh commented on APEXMALHAR-2276:
---

Yes, time gets mapped to a time bucket; however, the exact value of the time is 
lost, and currently we cannot derive the time from the time bucket. 

This affects cases where we want to save the most recent value of a key. In 
use cases like de-duplication and aggregation, this is not an issue.

In de-duplication, we check whether the key is already present and drop the 
event if it is a duplicate. 
In aggregation, we use the existing value in the time bucket and aggregate it 
with the new value.
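
The two access patterns described above can be sketched with a plain map standing in for the managed-state bucket (the class and method names are illustrative, not the real API):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the de-duplication and aggregation patterns,
// with a HashMap standing in for the managed-state bucket.
public class BucketPatterns {
    final Map<String, Long> bucket = new HashMap<>();

    // De-duplication: admit an event only if its key is not already present.
    boolean admit(String key) {
        return bucket.putIfAbsent(key, 0L) == null;
    }

    // Aggregation: combine the new value with the existing one for the key.
    void aggregate(String key, long value) {
        bucket.merge(key, value, Long::sum);
    }
}
```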


> ManagedState: value of a key does not get over-written in the same time bucket
> --
>
> Key: APEXMALHAR-2276
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2276
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Siyuan Hua
>    Assignee: Chandni Singh
> Fix For: 3.6.0
>
>
> For example:
> ManagedTimeUnifiedStateImpl mtus;
> mtus.put(1, key1, val1)
> mtus.put(1, key1, val2)
> mtus.get(1, key1).equals(val2) will return false





[jira] [Issue Comment Deleted] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket

2016-10-04 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2276:
--
Comment: was deleted

(was: Yes time gets mapped to a time bucket, however the exact value of time is 
lost and currently we cannot derive time from the timebucket. 

This affects cases where we want to save the most recent value of a key. In 
use-cases, like de-duplication/aggregation this is not the case.

In De-duplication we check whether there is a key present and drop the event if 
its duplicate. 
In Aggregation, we use the existing value in the time-bucket and aggregate it 
with new value.
)

> ManagedState: value of a key does not get over-written in the same time bucket
> --
>
> Key: APEXMALHAR-2276
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2276
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Siyuan Hua
>    Assignee: Chandni Singh
> Fix For: 3.6.0
>
>
> For example:
> ManagedTimeUnifiedStateImpl mtus;
> mtus.put(1, key1, val1)
> mtus.put(1, key1, val2)
> mtus.get(1, key1).equals(val2) will return false





[jira] [Commented] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket

2016-10-04 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15546592#comment-15546592
 ] 

Chandni Singh commented on APEXMALHAR-2276:
---

As [~chaithu] pointed out, we need to compare times, not time buckets. 
The fix for that will not be simple: for each key/value, we now have to remember 
the latest time. So far, the bucket didn't have a reference to time, which means 
the bucket API will probably change as well.


> ManagedState: value of a key does not get over-written in the same time bucket
> --
>
> Key: APEXMALHAR-2276
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2276
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Siyuan Hua
>    Assignee: Chandni Singh
> Fix For: 3.6.0
>
>
> For example:
> ManagedTimeUnifiedStateImpl mtus;
> mtus.put(1, key1, val1)
> mtus.put(1, key1, val2)
> mtus.get(1, key1).equals(val2) will return false





[jira] [Reopened] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket

2016-10-04 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh reopened APEXMALHAR-2276:
---

> ManagedState: value of a key does not get over-written in the same time bucket
> --
>
> Key: APEXMALHAR-2276
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2276
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Siyuan Hua
>    Assignee: Chandni Singh
> Fix For: 3.6.0
>
>
> For example:
> ManagedTimeUnifiedStateImpl mtus;
> mtus.put(1, key1, val1)
> mtus.put(1, key1, val2)
> mtus.get(1, key1).equals(val2) will return false





[jira] [Commented] (APEXMALHAR-2223) Managed state should parallelize WAL writes

2016-09-30 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15537595#comment-15537595
 ] 

Chandni Singh commented on APEXMALHAR-2223:
---

IncrementalCheckpointManager used to extend FSWindowDataManager, but this change 
requires saving multiple artifacts per window, and the API of 
IncrementalCheckpointManager was quite different from FSWindowDataManager's. So 
I abstracted the common code out into `AbstractFSWindowStateManager`.

> Managed state should parallelize WAL writes
> ---
>
> Key: APEXMALHAR-2223
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2223
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Affects Versions: 3.4.0
>Reporter: Thomas Weise
>    Assignee: Chandni Singh
>
> Currently, data is accumulated in memory and written to the WAL on checkpoint 
> only. This causes a write spike on checkpoint and does not utilize the HDFS 
> write pipeline. The other extreme is writing to the WAL as soon as data 
> arrives and then only flush in beforeCheckpoint. The downside of this is that 
> when the same key is written many times, all duplicates will be in the WAL. 
> Need to find a balanced approach that the user can potentially fine-tune. 





[jira] [Updated] (APEXMALHAR-2276) ManagedState: value of a key does not get over-written in the same time bucket

2016-09-30 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2276:
--
Summary: ManagedState: value of a key does not get over-written in the same 
time bucket  (was: ManagedTimeUnifiedStateImpl: value of a key does not get 
over-written in the same time bucket)

> ManagedState: value of a key does not get over-written in the same time bucket
> --
>
> Key: APEXMALHAR-2276
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2276
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Siyuan Hua
>    Assignee: Chandni Singh
>
> For example:
> ManagedTimeUnifiedStateImpl mtus;
> mtus.put(1, key1, val1)
> mtus.put(1, key1, val2)
> mtus.get(1, key1).equals(val2) will return false





[jira] [Updated] (APEXMALHAR-2276) ManagedTimeUnifiedStateImpl: value of a key does not get over-written in the same time bucket

2016-09-30 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2276:
--
Summary: ManagedTimeUnifiedStateImpl: value of a key does not get 
over-written in the same time bucket  (was: ManagedTimeUnifiedStateImpl doesn't 
support time events properly)

> ManagedTimeUnifiedStateImpl: value of a key does not get over-written in the 
> same time bucket
> -
>
> Key: APEXMALHAR-2276
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2276
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Siyuan Hua
>    Assignee: Chandni Singh
>
> For example:
> ManagedTimeUnifiedStateImpl mtus;
> mtus.put(1, key1, val1)
> mtus.put(1, key1, val2)
> mtus.get(1, key1).equals(val2) will return false





[jira] [Assigned] (APEXMALHAR-2252) Document AbstractManagedStateImpl subclasses

2016-09-20 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh reassigned APEXMALHAR-2252:
-

Assignee: Chandni Singh

> Document AbstractManagedStateImpl subclasses
> 
>
> Key: APEXMALHAR-2252
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2252
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Thomas Weise
>    Assignee: Chandni Singh
>
> For the user the time bucketing concept is very important to understand. We 
> should highlight the difference in the documentation and also consider 
> renaming the subclasses for clarify.





[jira] [Commented] (APEXMALHAR-2223) Managed state should parallelize WAL writes

2016-09-16 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15496811#comment-15496811
 ] 

Chandni Singh commented on APEXMALHAR-2223:
---

The reason to delay this to endWindow is that otherwise, with every event, we 
would have to compare the bucket size with the threshold. This comparison, 
though not very expensive, can be done at intervals because it takes some time 
to reach the threshold. The interval of an application window seemed fair to me. 


When the buffer of the outputStream is full, data is automatically flushed. 
Explicitly calling flush forces out any buffered data, so for us this forced 
flush can be invoked in beforeCheckpoint(). We do not need to call it every 
endWindow. This component relies on checkpointed state; that is, the WAL is 
truncated to the offset saved in the state after failures. 
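
The flush/recovery contract described above can be sketched as follows; the class and method names are illustrative, not the actual WAL implementation:

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Hypothetical sketch: entries pass through a buffer, an explicit flush is
// forced only in beforeCheckpoint(), and on recovery the WAL is truncated
// back to the offset recorded with the checkpointed state.
public class WalSketch {
    final ByteArrayOutputStream backing = new ByteArrayOutputStream();
    final BufferedOutputStream out = new BufferedOutputStream(backing, 1024);
    long checkpointedOffset;

    void append(byte[] entry) throws IOException {
        out.write(entry);  // may sit in the buffer until it fills or is flushed
    }

    void beforeCheckpoint() throws IOException {
        out.flush();                          // force out any buffered data
        checkpointedOffset = backing.size();  // offset saved with the checkpoint
    }

    byte[] recover() {
        // After a failure, truncate to the last checkpointed offset.
        byte[] all = backing.toByteArray();
        return java.util.Arrays.copyOf(all, (int) checkpointedOffset);
    }
}
```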

> Managed state should parallelize WAL writes
> ---
>
> Key: APEXMALHAR-2223
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2223
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Affects Versions: 3.4.0
>Reporter: Thomas Weise
>    Assignee: Chandni Singh
>
> Currently, data is accumulated in memory and written to the WAL on checkpoint 
> only. This causes a write spike on checkpoint and does not utilize the HDFS 
> write pipeline. The other extreme is writing to the WAL as soon as data 
> arrives and then only flush in beforeCheckpoint. The downside of this is that 
> when the same key is written many times, all duplicates will be in the WAL. 
> Need to find a balanced approach that the user can potentially fine-tune. 





[jira] [Comment Edited] (APEXMALHAR-2223) Managed state should parallelize WAL writes

2016-09-16 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15496811#comment-15496811
 ] 

Chandni Singh edited comment on APEXMALHAR-2223 at 9/16/16 4:59 PM:


The reason to delay this to endWindow is that otherwise, with every event, we 
would have to compare the bucket size with the threshold. This comparison, 
though not very expensive, can be done at intervals because it takes some time 
to reach the threshold. The interval of an application window seemed fair to me. 


When the buffer of the outputStream is full, data is automatically flushed. 
Explicitly calling flush forces out any buffered data, so for us this forced 
flush can be invoked in beforeCheckpoint(). We do not need to call it every 
endWindow. This component relies on checkpointed state; that is, the WAL is 
truncated to the offset saved in the state after failures. 


was (Author: csingh):
The reason to delay to endWindow is that with every event will have to compare 
the bucket size with the threshold. This comparison, though not very expensive, 
can be done at intervals because it will take some time to reach the threshold. 
The interval of an application window seemed fair to me. 


When buffer of the outputStream is full, data is automatically flushed. 
Explicitly calling flush forces any buffered data so for us this forced flush 
can be invoked in beforeCheckpoint(). We do not need to call it every 
endWindow. This component relies on checkpoint state, that is, the wal is 
truncated to the offset saved in the state after failures. 

> Managed state should parallelize WAL writes
> ---
>
> Key: APEXMALHAR-2223
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2223
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Affects Versions: 3.4.0
>Reporter: Thomas Weise
>    Assignee: Chandni Singh
>
> Currently, data is accumulated in memory and written to the WAL on checkpoint 
> only. This causes a write spike on checkpoint and does not utilize the HDFS 
> write pipeline. The other extreme is writing to the WAL as soon as data 
> arrives and then only flush in beforeCheckpoint. The downside of this is that 
> when the same key is written many times, all duplicates will be in the WAL. 
> Need to find a balanced approach that the user can potentially fine-tune. 





[jira] [Issue Comment Deleted] (APEXMALHAR-2227) Error while connecting with Kafka using Apache Apex

2016-09-07 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2227:
--
Comment: was deleted

(was: Hi,

We will need to make the fix in the Malhar 3.5.0 release branch and push it to 
maven repo. So yes you will need to download the malhar jar again from maven 
repo.

Another quick way is to get the source code by cloning the Malhar repo. Switch 
to 3.5.0 release branch. Making the fix and building it. 
The fix is quite trivial. You can see the change that needs to be done here:
https://github.com/apache/apex-malhar/pull/402

Thanks,
Chandni)

> Error while connecting with Kafka using Apache Apex
> ---
>
> Key: APEXMALHAR-2227
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2227
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Affects Versions: 3.4.0
>Reporter: Rambrij Chauhan
>    Assignee: Chandni Singh
>Priority: Blocker
>
> Hi,
> I am working on Apache Apex and trying to implement the KafkaInputOperator 
> provided by apex-malhar, but I am hitting the following exception as soon as 
> I run the application.
> The strange thing is that the same application works with one ZooKeeper IP 
> but not with the IP I need it to work with, so I went and checked the 
> version of the respective ZooKeeper and found that the one that works 
> belongs to 0.8.1.1 and the other one to 0.8.2.0. Could you kindly help me 
> with a workaround? I really need it to work with the IP that belongs to 
> 0.8.2.0.
> java.lang.RuntimeException: Error creating local cluster
>   at 
> com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:78)
>   at 
> org.capitalone.apex.main.ApplicationTest.testSomeMethod(ApplicationTest.java:14)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>   at 
> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
>   at 
> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>   at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
>   at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678)
>   at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
>   at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
> Caused by: java.lang.IllegalStateException: Partitioner returns null or empty.
>   at 
> com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:755)
>   at 
> com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
>   at 
> com.datatorrent.stram.plan.physical.PhysicalPlan.(PhysicalPlan.java:378)
>   at 
> com.datatorrent.stram.StreamingContainerManager.(StreamingContainerManager.java:418)
>   at 
> com.datatorrent.stram.StreamingContainerManager.(StreamingContainerManager.java:406)
>   at 
> com.datatorrent.stram.StramLocalCluster.(StramLocalCluster.java:299)
>   at 
> com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:76)
>   ... 24 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2227) Error while connecting with Kafka using Apache Apex

2016-09-07 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15471182#comment-15471182
 ] 

Chandni Singh commented on APEXMALHAR-2227:
---

Hi,

We will need to make the fix in the Malhar 3.5.0 release branch and push it to 
maven repo. So yes you will need to download the malhar jar again from maven 
repo.

Another quick way is to get the source code by cloning the Malhar repo, switch 
to the 3.5.0 release branch, make the fix, and build it yourself. 
The fix is quite trivial. You can see the change that needs to be done here:
https://github.com/apache/apex-malhar/pull/402

Thanks,
Chandni



[jira] [Commented] (APEXMALHAR-2227) Error while connecting with Kafka using Apache Apex

2016-09-07 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470994#comment-15470994
 ] 

Chandni Singh commented on APEXMALHAR-2227:
---

This was caused by recent changes to WindowDataManager. Will create a pull 
request to fix it.



[jira] [Assigned] (APEXMALHAR-2227) Error while connecting with Kafka using Apache Apex

2016-09-07 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh reassigned APEXMALHAR-2227:
-

Assignee: Chandni Singh



[jira] [Commented] (APEXMALHAR-2223) Managed state should parallelize WAL writes

2016-09-06 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15468563#comment-15468563
 ] 

Chandni Singh commented on APEXMALHAR-2223:
---

A possible approach to address this:
- Have a property in ManagedState called {code}writeBufferThreshold{code}. When 
a bucket's size crosses this threshold, the bucket becomes eligible for writing.
- Eligible buckets are written to the WAL at the end of every application 
window, in the {code}endWindow(){code} callback.

With this approach there are fewer changes, since data is still divided into 
windows when written to the WAL.
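A minimal sketch of this approach, assuming hypothetical names (the class, the in-memory WAL list, and {code}writeBufferThreshold{code} as a constructor argument are illustrative, not the actual ManagedState API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Buckets accumulate entries in memory; at endWindow() only the buckets whose
// size crossed writeBufferThreshold are flushed to the WAL for that window id,
// so writes spread across windows instead of spiking at checkpoint.
public class WriteBufferSketch {
  private final int writeBufferThreshold;
  private final Map<Long, List<String>> buckets = new HashMap<>();
  private final List<String> wal = new ArrayList<>(); // stand-in for the real WAL

  public WriteBufferSketch(int writeBufferThreshold) {
    this.writeBufferThreshold = writeBufferThreshold;
  }

  public void put(long bucketId, String entry) {
    buckets.computeIfAbsent(bucketId, k -> new ArrayList<>()).add(entry);
  }

  // Called once per application window; flushes only eligible buckets, so the
  // WAL still sees data partitioned by window.
  public void endWindow(long windowId) {
    buckets.entrySet().removeIf(e -> {
      if (e.getValue().size() >= writeBufferThreshold) {
        wal.add("window=" + windowId + " bucket=" + e.getKey()
            + " entries=" + e.getValue().size());
        return true;
      }
      return false;
    });
  }

  public List<String> walContents() {
    return wal;
  }

  public static void main(String[] args) {
    WriteBufferSketch s = new WriteBufferSketch(2);
    s.put(1L, "a"); s.put(1L, "b"); // bucket 1 crosses the threshold
    s.put(2L, "c");                 // bucket 2 stays below it
    s.endWindow(100L);
    System.out.println(s.walContents()); // only bucket 1 was flushed
  }
}
```

Undersized buckets simply stay buffered until a later window pushes them over the threshold (a real implementation would also force a flush in beforeCheckpoint).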


> Managed state should parallelize WAL writes
> ---
>
> Key: APEXMALHAR-2223
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2223
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Affects Versions: 3.4.0
>Reporter: Thomas Weise
>    Assignee: Chandni Singh
>
> Currently, data is accumulated in memory and written to the WAL on checkpoint 
> only. This causes a write spike on checkpoint and does not utilize the HDFS 
> write pipeline. The other extreme is writing to the WAL as soon as data 
> arrives and then only flush in beforeCheckpoint. The downside of this is that 
> when the same key is written many times, all duplicates will be in the WAL. 
> Need to find a balanced approach that the user can potentially fine-tune. 





[jira] [Assigned] (APEXMALHAR-2223) Managed state should parallelize WAL writes

2016-09-02 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh reassigned APEXMALHAR-2223:
-

Assignee: Chandni Singh



[jira] [Comment Edited] (APEXMALHAR-2130) implement scalable windowed storage

2016-08-29 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15447840#comment-15447840
 ] 

Chandni Singh edited comment on APEXMALHAR-2130 at 8/30/16 3:04 AM:


Note: The main change in ManagedState which is required here is that 
timeBuckets (Window time in your example) is now computed outside ManagedState. 
TimeBuckets were being computed by TimeBucketAssigner within ManagedState but 
now it will be provided to it.

>>>>
Since event time is arbitrary, unlike processing time, the actual key 
representing the timebucket cannot be assumed a natural sequence. However, 
TimeBucketAssigner.getTimeBucketAndAdjustBoundaries seems to return a long that 
is sequential starting from 0. We want to make the actual timebucket key based 
on the actual event window timestamp. Chandni Singh Will this break anything?

Answer: No, it will not break anything. The time here is event time, and this 
does NOT assume that events are received in order. Based on event time, this 
method creates the timebucket. In your use case, the time bucket is computed 
outside ManagedState, so there are 2 ways to approach it:
 - create a special TimeBucketAssigner which just returns the input Window for 
the event and does not compute a timebucket further.
 - make TimeBucketAssigner an optional property in AbstractManagedStateImpl. If 
it is null, then the time argument is used as the timebucket to save in the Bucket.
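These two options can be sketched as follows; the interface and method names here are simplified stand-ins for the real Malhar classes, not the actual API:

```java
// Option 1: a pass-through assigner that echoes the caller's window id back
// unchanged. Option 2: treat the assigner as optional and fall back to the
// raw time argument when it is null.
public class TimeBucketSketch {
  public interface TimeBucketAssigner {
    long getTimeBucketAndAdjustBoundaries(long time);
  }

  // Option 1: just return the input Window id for the event; no further
  // timebucket computation.
  public static final TimeBucketAssigner PASS_THROUGH = time -> time;

  // Option 2: if no assigner is configured, the time argument itself is used
  // as the timebucket saved in the Bucket.
  public static long bucketFor(TimeBucketAssigner assigner, long time) {
    return assigner == null ? time : assigner.getTimeBucketAndAdjustBoundaries(time);
  }

  public static void main(String[] args) {
    System.out.println(bucketFor(PASS_THROUGH, 42L)); // 42
    System.out.println(bucketFor(null, 42L));         // 42
  }
}
```

Either way, the caller-provided window id flows through to storage unchanged, which is the property the question is after.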

>>>
Expiring and purging are done very differently and should be based on 3. 
Managed State should determine whether to purge a timebucket based on whether 
an Apex window is committed and whether all event windows that belong to that 
timebucket are marked "deleted" for that Apex window.

Answer: This is handled by TimeBucketAssigner again. I don't think much change 
is needed here. TimeBucketAssigner computes a timeBucket (in your case, this 
corresponds to Window time) and checks if the oldest buckets need to be purged 
(lines 132 - 133). It figures out the lowest purgeable timebucket. In 
endWindow, it informs IncrementalCheckpointManager that it can delete all the 
timebuckets <= lowestPurgeableTimeBucket. However, IncrementalCheckpointManager 
deletes the data up to that timebucket only when the window in which it was 
requested to be purged gets committed. So this will remain the same for you as 
well.

I think this can also be achieved by creating a special TimeBucketAssigner and 
overriding a few methods.
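The commit-gated purge described above can be sketched like this; the class and method names are illustrative, not the actual IncrementalCheckpointManager API:

```java
import java.util.TreeMap;

// endWindow() only records the lowest purgeable timebucket for the current
// window; the data is actually deleted once that window is committed.
public class PurgeOnCommitSketch {
  // windowId -> highest timebucket that may be purged once that window commits
  private final TreeMap<Long, Long> pendingPurges = new TreeMap<>();
  private long purgedUpTo = -1;

  public void endWindow(long windowId, long lowestPurgeableTimeBucket) {
    pendingPurges.put(windowId, lowestPurgeableTimeBucket);
  }

  public void committed(long windowId) {
    // honor every purge request made in a window <= the committed one
    Long bucket = null;
    while (!pendingPurges.isEmpty() && pendingPurges.firstKey() <= windowId) {
      bucket = pendingPurges.pollFirstEntry().getValue();
    }
    if (bucket != null) {
      purgedUpTo = Math.max(purgedUpTo, bucket);
    }
  }

  public long purgedUpTo() {
    return purgedUpTo;
  }

  public static void main(String[] args) {
    PurgeOnCommitSketch p = new PurgeOnCommitSketch();
    p.endWindow(10L, 3L);        // request purge up to timebucket 3
    p.committed(10L);            // deletion happens only on commit
    System.out.println(p.purgedUpTo());
  }
}
```

Deferring the deletion to commit is what makes the purge safe under recovery: a restart from an earlier checkpoint can still see the buckets whose purge request was never committed.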



was (Author: csingh):
Note: The main change in ManagedState which is required here is that 
timeBuckets (Window time in your example) is now computed outside ManagedState. 
TimeBuckets were being computed by TimeBucketAssigner within ManagedState but 
now it will be provided to it.

Since event time is arbitrary, unlike processing time, the actual key 
representing the timebucket cannot be assumed a natural sequence. However, 
TimeBucketAssigner.getTimeBucketAndAdjustBoundaries seems to return a long that 
is sequential starting from 0. We want to make the actual timebucket key based 
on the actual event window timestamp. Chandni Singh Will this break anything?

Answer: No, it will not break anything. The time here is event time, and this 
does NOT assume that events are received in order. Based on event time, this 
method creates the timebucket. In your use case, the time bucket is computed 
outside ManagedState, so there are 2 ways to approach it:
 - create a special TimeBucketAssigner which just returns the input Window for 
the event and does not compute a timebucket further.
 - make TimeBucketAssigner an optional property in AbstractManagedStateImpl. If 
it is null, then the time argument is used as the timebucket to save in the Bucket.

Expiring and purging are done very differently and should be based on 3. 
Managed State should determine whether to purge a timebucket based on whether 
an Apex window is committed and whether all event windows that belong to that 
timebucket are marked "deleted" for that Apex window.

Answer: This is handled by TimeBucketAssigner again. I don't think much change 
is needed here. TimeBucketAssigner computes a timeBucket (in your case, this 
corresponds to Window time) and checks if the oldest buckets need to be purged 
(lines 132 - 133). It figures out the lowest purgeable timebucket. In 
endWindow, it informs IncrementalCheckpointManager that it can delete all the 
timebuckets <= lowestPurgeableTimeBucket. However, IncrementalCheckpointManager 
deletes the data up to that timebucket only when the window in which it was 
requested to be purged gets committed. So this will remain the same for you as 
well.

I think this can also be achieved by creating a special TimeBucketAssigner and 
overriding a few methods.


> implement scalable windowed storage
> ---
>
>   

[jira] [Commented] (APEXMALHAR-2130) implement scalable windowed storage

2016-08-29 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15447840#comment-15447840
 ] 

Chandni Singh commented on APEXMALHAR-2130:
---

Note: The main change in ManagedState which is required here is that 
timeBuckets (Window time in your example) is now computed outside ManagedState. 
TimeBuckets were being computed by TimeBucketAssigner within ManagedState but 
now it will be provided to it.

Since event time is arbitrary, unlike processing time, the actual key 
representing the timebucket cannot be assumed a natural sequence. However, 
TimeBucketAssigner.getTimeBucketAndAdjustBoundaries seems to return a long that 
is sequential starting from 0. We want to make the actual timebucket key based 
on the actual event window timestamp. Chandni Singh Will this break anything?

Answer: No, it will not break anything. The time here is event time, and this 
does NOT assume that events are received in order. Based on event time, this 
method creates the timebucket. In your use case, the time bucket is computed 
outside ManagedState, so there are 2 ways to approach it:
 - create a special TimeBucketAssigner which just returns the input Window for 
the event and does not compute a timebucket further.
 - make TimeBucketAssigner an optional property in AbstractManagedStateImpl. If 
it is null, then the time argument is used as the timebucket to save in the Bucket.

Expiring and purging are done very differently and should be based on 3. 
Managed State should determine whether to purge a timebucket based on whether 
an Apex window is committed and whether all event windows that belong to that 
timebucket are marked "deleted" for that Apex window.

Answer: This is handled by TimeBucketAssigner again. I don't think much change 
is needed here. TimeBucketAssigner computes a timeBucket (in your case, this 
corresponds to Window time) and checks if the oldest buckets need to be purged 
(lines 132 - 133). It figures out the lowest purgeable timebucket. In 
endWindow, it informs IncrementalCheckpointManager that it can delete all the 
timebuckets <= lowestPurgeableTimeBucket. However, IncrementalCheckpointManager 
deletes the data up to that timebucket only when the window in which it was 
requested to be purged gets committed. So this will remain the same for you as 
well.

I think this can also be achieved by creating a special TimeBucketAssigner and 
overriding a few methods.


> implement scalable windowed storage
> ---
>
> Key: APEXMALHAR-2130
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2130
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: David Yan
>
> This feature is used for supporting windowing.
> The storage needs to have the following features:
> 1. Spillable key value storage (integrate with APEXMALHAR-2026)
> 2. Upon checkpoint, it saves a snapshot for the entire data set with the 
> checkpointing window id.  This should be done incrementally (ManagedState) to 
> avoid wasting space with unchanged data
> 3. When recovering, it takes the recovery window id and restores to that 
> snapshot
> 4. When a window is committed, all windows with a lower ID should be purged 
> from the store.
> 5. It should implement the WindowedStorage and WindowedKeyedStorage 
> interfaces, and because of 2 and 3, we may want to add methods to the 
> WindowedStorage interface so that the implementation of WindowedOperator can 
> notify the storage of checkpointing, recovering and committing of a window.





[jira] [Assigned] (APEXCORE-448) Make operator name available in OperatorContext

2016-08-15 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXCORE-448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh reassigned APEXCORE-448:
--

Assignee: Chandni Singh

> Make operator name available in OperatorContext
> ---
>
> Key: APEXCORE-448
> URL: https://issues.apache.org/jira/browse/APEXCORE-448
> Project: Apache Apex Core
>  Issue Type: Improvement
>    Reporter: Chandni Singh
>    Assignee: Chandni Singh
>
> Need name of the logical operator in the OperatorContext which can be used by 
> WindowDataManager to create a unique path per logical operator.





[jira] [Updated] (APEXMALHAR-2129) ManagedState: Add a disable purging option

2016-08-10 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2129:
--
Description: 
Have an option that can disable purging of data.
Currently the TimeBucketAssigner moves the time boundary periodically based on 
system time as well as on event time. This is error-prone, and moving the time 
boundary periodically implies that data will always be purged. 
The change that needs to be made to TimeBucketAssigner is to move the time 
boundary only when there are future events that don't fall in that boundary. This 
implies that if there are no events outside the current boundary, then data 
will not be purged.

ManagedStateImpl uses processing time for an event, so moving boundaries based 
on system time is going to happen there nevertheless.

  was:
Have an option that can disable purging of data.
Currently the TimeBucketAssigner moves time boundary periodically based on 
system time as well as based on event time. This is error prone and moving time 
boundary periodically implies that data will always be purged. 
The change that needs to be made to TimeBucketAssigner is to just move time 
boundary when there are future events that don't fall in that boundary. This 
will imply that if there are no events outside the current boundary then data 
will not be purged.



> ManagedState: Add a disable purging option
> --
>
> Key: APEXMALHAR-2129
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Bhupesh Chawda
>    Assignee: Chandni Singh
>
> Have an option that can disable purging of data.
> Currently the TimeBucketAssigner moves time boundary periodically based on 
> system time as well as based on event time. This is error prone and moving 
> time boundary periodically implies that data will always be purged. 
> The change that needs to be made to TimeBucketAssigner is to just move time 
> boundary when there are future events that don't fall in that boundary. This 
> will imply that if there are no events outside the current boundary then data 
> will not be purged.
> ManagedStateImpl uses processing time for an event, so moving boundaries 
> based on system time is going to happen there nevertheless.





[jira] [Updated] (APEXMALHAR-2129) ManagedState: Add a disable purging option

2016-08-10 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2129:
--
Description: 
Have an option that can disable purging of data.
Currently the TimeBucketAssigner moves time boundary periodically based on 
system time as well as based on event time. This is error prone and moving time 
boundary periodically implies that data will always be purged. 
The change that needs to be made to TimeBucketAssigner is to just move time 
boundary when there are future events that don't fall in that boundary. This 
will imply that if there are no events outside the current boundary then data 
will not be purged.


  was:Have an option that can disable purging of data.


> ManagedState: Add a disable purging option
> --
>
> Key: APEXMALHAR-2129
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Bhupesh Chawda
>    Assignee: Chandni Singh
>
> Have an option that can disable purging of data.
> Currently the TimeBucketAssigner moves time boundary periodically based on 
> system time as well as based on event time. This is error prone and moving 
> time boundary periodically implies that data will always be purged. 
> The change that needs to be made to TimeBucketAssigner is to just move time 
> boundary when there are future events that don't fall in that boundary. This 
> will imply that if there are no events outside the current boundary then data 
> will not be purged.





[jira] [Comment Edited] (APEXMALHAR-2129) ManagedState: Add a disable purging option

2016-08-10 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415717#comment-15415717
 ] 

Chandni Singh edited comment on APEXMALHAR-2129 at 8/10/16 6:47 PM:


[~bhup...@apache.org]
In TimeBucketAssigner, {code}expiryTask{code} is a task that runs repeatedly to 
purge data. It is not a separate feature from purging. 
However, I will remove this task. Right now purging happens based on a mix of 
event and system time, which is likely to be error-prone. 

Purging should be triggered only by events. If all events fall between the start 
and end time, then nothing will be purged. However, if events arrive outside 
this boundary, then older time-buckets will be purged. 


was (Author: csingh):
[~bhup...@apache.org]
In TimeBucketAssigner, {code}expiryTask{code} is a task that runs repeatedly to 
purge data. It is not a separate feature from purging. 
I will rename the task to avoid confusion.

When purging is disabled, state is stored and deduplication happens across the entire data set.

> ManagedState: Add a disable purging option
> --
>
> Key: APEXMALHAR-2129
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Bhupesh Chawda
>    Assignee: Chandni Singh
>
> Have an option that can disable purging of data.





[jira] [Commented] (APEXMALHAR-2129) ManagedState: Add a disable purging option

2016-08-10 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415717#comment-15415717
 ] 

Chandni Singh commented on APEXMALHAR-2129:
---

[~bhup...@apache.org]
In TimeBucketAssigner, {code}expiryTask{code} is a task that runs repeatedly to 
purge data. It is not a separate feature from purging. 
I will rename the task to avoid confusion.

When purging is disabled, state is stored and deduplication happens across the entire data set.

> ManagedState: Add a disable purging option
> --
>
> Key: APEXMALHAR-2129
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Bhupesh Chawda
>    Assignee: Chandni Singh
>
> Have an option that can disable purging of data.





[jira] [Assigned] (APEXMALHAR-2129) ManagedState: Add a disable purging option

2016-08-09 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh reassigned APEXMALHAR-2129:
-

Assignee: Chandni Singh

> ManagedState: Add a disable purging option
> --
>
> Key: APEXMALHAR-2129
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Bhupesh Chawda
>    Assignee: Chandni Singh
>
> Have an option that can disable purging of data.





[jira] [Updated] (APEXMALHAR-2129) ManagedState: Add a disable purging option

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2129:
--
Assignee: (was: Chandni Singh)

> ManagedState: Add a disable purging option
> --
>
> Key: APEXMALHAR-2129
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Bhupesh Chawda
>
> Have an option that can disable purging of data.





[jira] [Updated] (APEXMALHAR-2071) Add the ability to remove a key to ManagedState

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2071:
--
Assignee: (was: Chandni Singh)

> Add the ability to remove a key to ManagedState
> ---
>
> Key: APEXMALHAR-2071
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2071
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Timothy Farkas
>
> This is required for Spillable Data Structures





[jira] [Updated] (APEXMALHAR-2101) RuntimeException from Default Bucket which is under managed state.

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2101:
--
Assignee: (was: Chandni Singh)

> RuntimeException from Default Bucket which is under managed state.
> --
>
> Key: APEXMALHAR-2101
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2101
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>
> Getting the following exception while using the ManagedTimeStateImpl 
> operator:
> 2016-05-24 15:42:48,813 ERROR 
> com.datatorrent.stram.engine.StreamingContainer: Operator set 
> [OperatorDeployInfo[id=4,name=join,type=GENERIC,checkpoint={, 
> 0, 
> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=GenToJoin,sourceNodeId=2,sourcePortName=outputPort,locality=,partitionMask=1,partitionKeys=[1]],
>  
> OperatorDeployInfo.InputDeployInfo[portName=product,streamId=ProductToJoin,sourceNodeId=1,sourcePortName=outputPort,locality=,partitionMask=0,partitionKeys=]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=JoinToConsole,bufferServer=chaitanya-HP-ENVY-15-Notebook-PC
>  stopped running due to an exception.
> java.lang.RuntimeException: while loading 0, 12
> at 
> org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getValueFromTimeBucketReader(Bucket.java:346)
> at 
> org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getFromReaders(Bucket.java:291)
> at 
> org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.get(Bucket.java:323)
> at 
> org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl.getValueFromBucketSync(AbstractManagedStateImpl.java:288)
> at 
> org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl.getSync(ManagedTimeStateImpl.java:56)
> at 
> com.examples.app.ManagedStateIntOperator.get(ManagedStateIntOperator.java:91)
> at 
> com.examples.app.ManagedStateIntOperator.processTuple(ManagedStateIntOperator.java:80)
> at 
> com.examples.app.ManagedStateIntOperator$2.process(ManagedStateIntOperator.java:73)
> at 
> com.examples.app.ManagedStateIntOperator$2.process(ManagedStateIntOperator.java:68)
> at com.datatorrent.api.DefaultInputPort.put(DefaultInputPort.java:79)
> at 
> com.datatorrent.stram.stream.BufferServerSubscriber$BufferReservoir.sweep(BufferServerSubscriber.java:280)
> at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:259)
> at 
> com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1393)
> Caused by: java.io.EOFException: Cannot seek after EOF
> at 
> org.apache.hadoop.hdfs.DFSInputStream.seek(DFSInputStream.java:1378)
> at 
> org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:62)
> at 
> org.apache.hadoop.io.file.tfile.DTBCFile$Reader.<init>(DTBCFile.java:674)
> at 
> org.apache.hadoop.io.file.tfile.DTFile$Reader.<init>(DTFile.java:827)
> at 
> com.datatorrent.lib.fileaccess.DTFileReader.<init>(DTFileReader.java:55)
> at 
> com.datatorrent.lib.fileaccess.TFileImpl$DTFileImpl.getReader(TFileImpl.java:173)
> at 
> org.apache.apex.malhar.lib.state.managed.BucketsFileSystem.getReader(BucketsFileSystem.java:96)
> at 
> org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.loadFileReader(Bucket.java:371)
> at 
> org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getValueFromTimeBucketReader(Bucket.java:341)
> ... 12 more





[jira] [Updated] (APEXCORE-448) Make operator name available in OperatorContext

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXCORE-448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXCORE-448:
---
Assignee: (was: Chandni Singh)

> Make operator name available in OperatorContext
> ---
>
> Key: APEXCORE-448
> URL: https://issues.apache.org/jira/browse/APEXCORE-448
> Project: Apache Apex Core
>  Issue Type: Improvement
>    Reporter: Chandni Singh
>
> Need the name of the logical operator in the OperatorContext, which can be used 
> by WindowDataManager to create a unique path per logical operator.





[jira] [Updated] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2063:
--
Assignee: (was: Chandni Singh)

> Integrate WAL to FS WindowDataManager
> -
>
> Key: APEXMALHAR-2063
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>    Reporter: Chandni Singh
>
> FS Window Data Manager is used to save meta-data that helps in replaying 
> tuples of every completed application window after failure. For this, it saves 
> meta-data in a file per window. Having many small files on HDFS causes issues, 
> as highlighted here:
> http://blog.cloudera.com/blog/2009/02/the-small-files-problem/
> Instead, FS Window Data Manager can utilize the WAL to write data and maintain 
> a mapping of how much data was flushed to the WAL each window. 
> In order to use FileSystemWAL for replaying data of a finished window, a few 
> changes need to be made to FileSystemWAL, because of the following:
> 1. WindowDataManager needs to replay data of every finished window. This 
> window may not be checkpointed. 
> FileSystemWAL truncates the WAL file to the checkpointed point after recovery, 
> so this poses a problem. 
> WindowDataManager should be able to control recovery of FileSystemWAL.
> 2. FileSystemWAL writes to temporary files. The mapping of temp files to 
> actual files is part of its state, which is checkpointed. Since 
> WindowDataManager replays data of a window not yet checkpointed, it needs to 
> know the actual temporary file the data is being persisted to.
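The single-log idea can be sketched as follows (hypothetical names, in-memory buffer; not the actual Malhar FileSystemWAL API): append everything to one log and record, per window, how far the log had been flushed, so a finished window is replayed by reading the byte range between two recorded offsets.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: one append-only log plus a window -> flushed-offset map,
// instead of one small file per window.
class WindowOffsetLog {
    private final ByteArrayOutputStream log = new ByteArrayOutputStream();
    // windowId -> log length after that window's data was flushed
    private final TreeMap<Long, Integer> flushedUpTo = new TreeMap<>();

    void write(byte[] data) {
        log.write(data, 0, data.length);
    }

    /** Records that everything written so far belongs to windows <= windowId. */
    void endWindow(long windowId) {
        flushedUpTo.put(windowId, log.size());
    }

    /** Replays the bytes written during exactly this finished window. */
    byte[] replay(long windowId) {
        Integer end = flushedUpTo.get(windowId);
        if (end == null) {
            return new byte[0]; // nothing recorded for this window
        }
        Map.Entry<Long, Integer> prev = flushedUpTo.lowerEntry(windowId);
        int start = prev == null ? 0 : prev.getValue();
        return Arrays.copyOfRange(log.toByteArray(), start, end);
    }
}
```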





[jira] [Updated] (APEXMALHAR-2068) FileSplitter- having multiple file splitters in an application results in incorrect behavior

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2068:
--
Assignee: (was: Chandni Singh)

> FileSplitter- having multiple file splitters in an application results in 
> incorrect behavior
> 
>
> Key: APEXMALHAR-2068
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2068
> Project: Apache Apex Malhar
>  Issue Type: Bug
>    Reporter: Chandni Singh
>
> If an application has multiple logical operators that use Window Data 
> Manager, then by default they share the same state. This is confusing and 
> causes issues.
> We need to make this path unique by default for each logical instance.





[jira] [Updated] (APEXCORE-169) instantiating DTLoggerFactory during test causes incorrect logging behavior

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXCORE-169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXCORE-169:
---
Assignee: (was: Chandni Singh)

> instantiating DTLoggerFactory during test causes incorrect logging behavior
> ---
>
> Key: APEXCORE-169
> URL: https://issues.apache.org/jira/browse/APEXCORE-169
> Project: Apache Apex Core
>  Issue Type: Bug
>Reporter: Vlad Rozov
>
> After DTLoggerFactory is instantiated in a maven build, log levels in 
> log4j.properties are ignored.





[jira] [Updated] (APEXMALHAR-2005) FileSplitterInput: `paths` field in TimeBasedDirectoryScanner is not validated unless scanner is marked as @Valid

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2005:
--
Assignee: (was: Chandni Singh)

> FileSplitterInput: `paths` field in TimeBasedDirectoryScanner is not 
> validated unless scanner is marked as @Valid
> -
>
> Key: APEXMALHAR-2005
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2005
> Project: Apache Apex Malhar
>  Issue Type: Bug
>    Reporter: Chandni Singh
>
> I created a DAG:
> FileSplitterInput -> BlockReader -> Devnull
> The splitter has a property TimeBasedDirectoryScanner, which has a property 
> `files` that is a Set of Strings.
> I have marked that property with {code} @Size(min = 1) {code}. However, even 
> when the set is empty, DAG validation passes.
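This matches Bean Validation's cascade rule: constraints on a nested object's fields are evaluated only when the containing field itself is marked @Valid. A toy plain-Java model of that rule (hypothetical classes; not the Bean Validation API, whose real behavior is implemented by a Validator):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model of Bean Validation's cascade rule: nested constraints are
// checked only when the container field is marked for cascading (@Valid).
class CascadeModel {
    static class Scanner {
        Set<String> files;          // imagine @Size(min = 1) on this field
        Scanner(Set<String> files) { this.files = files; }
        List<String> validate() {
            List<String> violations = new ArrayList<>();
            if (files.isEmpty()) {
                violations.add("files: size must be at least 1");
            }
            return violations;
        }
    }

    static class Splitter {
        final Scanner scanner;
        final boolean cascade;      // stands in for the presence of @Valid
        Splitter(Scanner scanner, boolean cascade) {
            this.scanner = scanner;
            this.cascade = cascade;
        }
        List<String> validate() {
            // without cascading, nested constraints are silently skipped,
            // which is the surprising behavior reported in this ticket
            return cascade ? scanner.validate() : new ArrayList<>();
        }
    }
}
```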





[jira] [Updated] (APEXMALHAR-1967) JDBC Store setting connection properties from config file is broken

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-1967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-1967:
--
Assignee: (was: Chandni Singh)

> JDBC Store setting connection properties from config file is broken
> ---
>
> Key: APEXMALHAR-1967
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1967
> Project: Apache Apex Malhar
>  Issue Type: Bug
>    Reporter: Chandni Singh
>
> JdbcStore has a connectionProperties field which is of type Properties. 
> Setting the value of this field using the provided setters,
> setConnectionProperties(String ...) and 
> setConnectionProperties(Properties ...), 
> is broken.
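For illustration, a sketch of what a working string-based setter could do, assuming a "key1:val1,key2:val2" style input format (hypothetical helper class; not the actual JdbcStore code, whose exact format and bug are not shown in this ticket):

```java
import java.util.Properties;

// Hypothetical sketch of a string-based connection-properties setter,
// assuming a "key1:val1,key2:val2" input format. Not the actual JdbcStore code.
class ConnectionProps {
    private final Properties connectionProperties = new Properties();

    void setConnectionProperties(String spec) {
        for (String pair : spec.split(",")) {
            String[] kv = pair.split(":", 2);
            if (kv.length != 2 || kv[0].isBlank()) {
                throw new IllegalArgumentException("malformed pair: " + pair);
            }
            connectionProperties.setProperty(kv[0].trim(), kv[1].trim());
        }
    }

    String get(String key) {
        return connectionProperties.getProperty(key);
    }
}
```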





[jira] [Updated] (APEXMALHAR-2035) ManagedState use case- Add a deduper to Malhar which is backed by ManagedState

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2035:
--
Assignee: (was: Chandni Singh)

> ManagedState use case- Add a deduper to Malhar which is backed by ManagedState
> --
>
> Key: APEXMALHAR-2035
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2035
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>    Reporter: Chandni Singh
>
> Managed State has been added to Malhar; it handles incremental checkpointing 
> of key/value state. This can be used to create a de-duplicator in the Malhar 
> library. 
> The de-duper may receive events out of order, and if an event is still 
> relevant (not older than the configured time) it should determine whether the 
> event has been seen before.
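The decision logic can be sketched as a simplified model (hypothetical class; the real Malhar de-duper would keep its seen-keys state in ManagedState rather than an in-memory map):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified dedup decision: an event is a duplicate only if it is still
// within the expiry horizon AND its key was seen before. Expired events are
// neither stored nor treated as duplicates. (Sketch only; the real Malhar
// deduper keeps its seen-keys state in ManagedState, not an in-memory map.)
class TimeBoundedDeduper {
    enum Decision { UNIQUE, DUPLICATE, EXPIRED }

    private final long expiryMillis;
    private final Map<String, Long> seen = new HashMap<>();
    private long latestTime; // high-water mark of observed event time

    TimeBoundedDeduper(long expiryMillis) { this.expiryMillis = expiryMillis; }

    Decision process(String key, long eventTime) {
        latestTime = Math.max(latestTime, eventTime);
        if (eventTime < latestTime - expiryMillis) {
            return Decision.EXPIRED;          // too old to be relevant
        }
        Long prior = seen.put(key, eventTime);
        return prior == null ? Decision.UNIQUE : Decision.DUPLICATE;
    }
}
```

Note that out-of-order events are handled: an event arriving late is still checked for duplication as long as it falls within the expiry horizon of the latest observed time.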





[jira] [Updated] (APEXMALHAR-1852) File Splitter Test Failing On My Machine

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-1852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-1852:
--
Assignee: (was: Chandni Singh)

> File Splitter Test Failing On My Machine
> 
>
> Key: APEXMALHAR-1852
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1852
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Timothy Farkas
>
> FileSplitterInputTest.testRecoveryOfPartialFile:408 New file expected:<1> but 
> was:<0>





[jira] [Updated] (APEXMALHAR-1848) Block Reader doesn't honor value for minReaders

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-1848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-1848:
--
Assignee: (was: Chandni Singh)

> Block Reader doesn't honor value for minReaders
> ---
>
> Key: APEXMALHAR-1848
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1848
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: gaurav gupta
>
> Block reader has a property minReaders. The operator always returns one 
> instance initially, irrespective of the value of minReaders; minReaders is 
> only used during dynamic partitioning.
> I think the operator should initially return at least minReaders instances.
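The suggested fix can be sketched as follows (simplified, hypothetical partitioner; the actual Apex Partitioner.definePartitions signature takes partition and context objects rather than plain lists):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the suggested behavior: at initial deploy, emit at least
// minReaders partitions instead of a single one. (Simplified; the real Apex
// Partitioner.definePartitions works on Partition/PartitioningContext objects.)
class ReaderPartitioner {
    final int minReaders;
    ReaderPartitioner(int minReaders) { this.minReaders = minReaders; }

    List<String> definePartitions(List<String> existing) {
        if (!existing.isEmpty()) {
            return existing; // dynamic repartitioning handled separately
        }
        List<String> initial = new ArrayList<>();
        for (int i = 0; i < Math.max(1, minReaders); i++) {
            initial.add("reader-" + i);
        }
        return initial;
    }
}
```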





[jira] [Updated] (APEXCORE-21) Add capability to specify collection phase of AutoMetric

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXCORE-21?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXCORE-21:
--
Assignee: (was: Chandni Singh)

> Add capability to specify collection phase of AutoMetric
> 
>
> Key: APEXCORE-21
> URL: https://issues.apache.org/jira/browse/APEXCORE-21
> Project: Apache Apex Core
>  Issue Type: Sub-task
>    Reporter: Chandni Singh
>
> Along with this, also add a phase for the AutoMetric. Right off the bat there 
> are 3 phases: end of streaming window, end of application window, and custom 
> (the last one needs to be thought through), but while we are at it, we could 
> also add beginning of application window and beginning of streaming window. 
> The work may not be done to support all of those, but it ensures 
> forward compatibility.





[jira] [Updated] (APEXCORE-69) Improve StatsListener api so that using it with multiple logical operators is straightforward

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXCORE-69?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXCORE-69:
--
Assignee: (was: Chandni Singh)

> Improve StatsListener api so that using it with multiple logical operators is 
> straightforward
> -
>
> Key: APEXCORE-69
> URL: https://issues.apache.org/jira/browse/APEXCORE-69
> Project: Apache Apex Core
>  Issue Type: Improvement
>    Reporter: Chandni Singh
>
> We have discussed theories about using a stats listener on multiple logical 
> operators, specifically using stats from a downstream operator to slow down 
> an upstream operator.
> While trying to implement such a StatsListener, the processStats() method 
> becomes unwieldy, mainly because there is no effortless way of knowing which 
> logical operator the stats belong to. BatchedOperatorStats contains the 
> operator id, but that is assigned at runtime (not known at the time of 
> writing code).
> In the past I have used counters to figure out the logical operator, but 
> that is a hack.
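For illustration, a sketch of the routing that an operator-id-to-logical-name mapping would enable (hypothetical class and method; not the actual Apex StatsListener API, which passes BatchedOperatorStats):

```java
import java.util.Map;

// Hypothetical sketch: if the platform exposed an operatorId -> logical-name
// mapping, a single listener could route stats per logical operator cleanly,
// e.g. throttling the upstream operator when the downstream one lags.
class NameAwareListener {
    private final Map<Integer, String> idToLogicalName;

    NameAwareListener(Map<Integer, String> idToLogicalName) {
        this.idToLogicalName = idToLogicalName;
    }

    /** Returns the action taken, decided per logical operator name. */
    String processStats(int operatorId, long tuplesPerSec) {
        String name = idToLogicalName.getOrDefault(operatorId, "unknown");
        if ("downstream".equals(name) && tuplesPerSec < 100) {
            return "slow down upstream";
        }
        return "no-op for " + name;
    }
}
```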





[jira] [Updated] (APEXMALHAR-1773) Organize the plugins in Malhar lib and contrib

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-1773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-1773:
--
Assignee: (was: Chandni Singh)

> Organize the plugins in Malhar lib and contrib
> --
>
> Key: APEXMALHAR-1773
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1773
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>    Reporter: Chandni Singh
>
> Need PluginManagement in Malhar where common plugins can be declared.





[jira] [Updated] (APEXMALHAR-1600) Investigate if we can seek efficiently to the correct location when we restart after failure for ftp and fs scan operators

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-1600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-1600:
--
Assignee: (was: Chandni Singh)

> Investigate if we can seek efficiently to the correct location when we 
> restart after failure for ftp and fs scan operators
> --
>
> Key: APEXMALHAR-1600
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1600
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Pramod Immaneni
>
> Currently, when the fs scan operator fails and restarts, it seeks to the 
> starting location by asking for the previous entries and discarding them. Can 
> this be done more efficiently on a case-by-case basis? For example, a 
> subclass of the operator can keep a count of the file read offset after 
> returning an entry, and that can be used to seek directly to the checkpointed 
> offset.
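The offset-tracking idea can be sketched as follows (hypothetical reader over an in-memory byte array; not the actual Malhar scan operator):

```java
import java.util.Arrays;

// Sketch of offset-tracking recovery: record the byte offset after each
// returned entry so a restart can seek straight to the checkpointed offset
// instead of re-reading and discarding earlier entries. (Hypothetical reader;
// not the actual Malhar fs/ftp scan operator.)
class OffsetTrackingReader {
    private final byte[] file;
    private int offset; // persisted at checkpoint time

    OffsetTrackingReader(byte[] file) { this.file = file; }

    /** Reads the next fixed-size entry and advances the tracked offset. */
    byte[] nextEntry(int entrySize) {
        byte[] entry = Arrays.copyOfRange(file, offset, offset + entrySize);
        offset += entrySize;
        return entry;
    }

    int checkpointOffset() { return offset; }

    /** On restart, seek directly instead of replay-and-discard. */
    void restoreFrom(int checkpointedOffset) { this.offset = checkpointedOffset; }
}
```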





[jira] [Updated] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-1701:
--
Assignee: (was: Chandni Singh)

> Deduper : create a deduper backed by Managed State
> --
>
> Key: APEXMALHAR-1701
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701
> Project: Apache Apex Malhar
>  Issue Type: Task
>  Components: algorithms
>    Reporter: Chandni Singh
>
> Need a de-duplicator operator that is based on Managed State.





[jira] [Updated] (APEXMALHAR-722) Investigate re-partitioning of transactional database output adaptor

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-722:
-
Assignee: (was: Chandni Singh)

> Investigate re-partitioning of transactional database output adaptor
> 
>
> Key: APEXMALHAR-722
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-722
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>  Components: adapters database
>    Reporter: Chandni Singh
>
> Currently the transactional database output adaptors use the operator id to 
> persist the last committed window. With this approach the operators cannot be 
> re-partitioned. 
> We need to find a way to make these operators dynamically partitionable.





[jira] [Updated] (APEXMALHAR-1843) Split Malhar Library and Malhar Contrib package into baby packages

2016-08-03 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-1843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-1843:
--
Assignee: (was: Chandni Singh)

> Split Malhar Library and Malhar Contrib package into baby packages
> --
>
> Key: APEXMALHAR-1843
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1843
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Chetan Narsude
>Priority: Critical
>  Labels: roadmap
>
> [~andyp] I am assigning this to you because you are the one who first said it. 
> So either you lead it or find a willing lead to get this task to completion.
> The problem with the contrib and library modules of malhar is that a ton of 
> dependencies are prescribed as optional. The motive behind it was to give 
> users of these libraries an opportunity to keep the size of the 
> dependency-included packages to a bare minimum. It comes at the cost that the 
> dependencies now have to be figured out manually. This is a complete misuse of 
> the optional dependency, IMO. It defeats the purpose of maven, which has 
> dependency management as one of its biggest features.
> So to keep things sane, the proposed compromise is that we start creating 
> smaller discrete packages for discrete technologies.





[jira] [Updated] (APEXMALHAR-2044) Exception in AbstractFileOutputOperator during write.

2016-08-01 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2044:
--
Assignee: (was: Chandni Singh)

> Exception in AbstractFileOutputOperator during write.
> -
>
> Key: APEXMALHAR-2044
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2044
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Tushar Gosavi
>
> Sometimes the container gets killed because of the following error; this 
> happens during flush or write.
> Stopped running due to an exception. java.lang.RuntimeException: 
> java.nio.channels.ClosedChannelException
> at 
> com.datatorrent.lib.io.fs.AbstractFileOutputOperator.processTuple(AbstractFileOutputOperator.java:823)
> at 
> com.directv.sms.common.CsvOutputOperator.processTuple(CsvOutputOperator.java:70)
> at 
> com.datatorrent.lib.io.fs.AbstractFileOutputOperator$1.process(AbstractFileOutputOperator.java:271)
> at 
> com.datatorrent.api.DefaultInputPort.put(DefaultInputPort.java:70)
> at 
> com.datatorrent.stram.engine.DefaultReservoir.sweep(DefaultReservoir.java:64)
> at 
> com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:229)
> at 
> com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1386)
> Caused by: java.nio.channels.ClosedChannelException
> at 
> org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1635)
> at 
> org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:104)
> at 
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
> at java.io.DataOutputStream.write(DataOutputStream.java:107)
> at 
> java.io.FilterOutputStream.write(FilterOutputStream.java:97)
> at 
> com.datatorrent.lib.io.fs.AbstractFileOutputOperator.processTuple(AbstractFileOutputOperator.java:79





[jira] [Resolved] (APEXMALHAR-2136) Null pointer exception in AbstractManagedStateImpl

2016-07-07 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh resolved APEXMALHAR-2136.
---
   Resolution: Fixed
Fix Version/s: 3.5.0

> Null pointer exception in AbstractManagedStateImpl
> --
>
> Key: APEXMALHAR-2136
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2136
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>Assignee: Chaitanya
> Fix For: 3.5.0
>
>
> Null pointer exception in AbstractManagedStateImpl while recovering the 
> state:
> java.lang.NullPointerException
> at 
> org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl.setup(AbstractManagedStateImpl.java:206)
> at 
> com.examples.app.ManagedTimeStateMImpl.setup(ManagedTimeStateMImpl.java:15)
> at 
> com.examples.app.ManagedStateIntOperator.setup(ManagedStateIntOperator.java:138)
> at 
> com.examples.app.ManagedStateIntOperator.setup(ManagedStateIntOperator.java:26)
> at com.datatorrent.stram.engine.Node.setup(Node.java:187)
> at 
> com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1309)
> at 
> com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:130)
> at 
> com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388)





[jira] [Commented] (APEXMALHAR-2129) ManagedState: Add a disable purging option

2016-06-29 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355507#comment-15355507
 ] 

Chandni Singh commented on APEXMALHAR-2129:
---

[~bhupesh] I re-opened the ticket and will work on it. We had discussed this 
earlier, but I can't find the Jira that addresses it. 

> ManagedState: Add a disable purging option
> --
>
> Key: APEXMALHAR-2129
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Bhupesh Chawda
>    Assignee: Chandni Singh
>
> Have an option that can disable purging of data.





[jira] [Updated] (APEXMALHAR-2129) ManagedState: Add a disable purging option

2016-06-29 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2129:
--
Description: Have an option that can disable purging of data.  (was: 
TimeBucketAssigner advances the time boundaries of the buckets viz. start and 
end to the current system time every window.
The requirement is to add an option so that clients can disable this if needed. 
Tuple time based deduplication has such a requirement.)

> ManagedState: Add a disable purging option
> --
>
> Key: APEXMALHAR-2129
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Bhupesh Chawda
>    Assignee: Chandni Singh
>
> Have an option that can disable purging of data.





[jira] [Updated] (APEXMALHAR-2129) ManagedState: Add a disable purging option

2016-06-29 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2129:
--
Priority: Major  (was: Minor)

> ManagedState: Add a disable purging option
> --
>
> Key: APEXMALHAR-2129
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Bhupesh Chawda
>    Assignee: Chandni Singh
>
> TimeBucketAssigner advances the time boundaries of the buckets viz. start and 
> end to the current system time every window.
> The requirement is to add an option so that clients can disable this if 
> needed. Tuple time based deduplication has such a requirement.





[jira] [Updated] (APEXMALHAR-2129) ManagedState: Add a disable purging option

2016-06-29 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2129:
--
Summary: ManagedState: Add a disable purging option  (was: Introduce option 
to advance time through Expiry task in TimeBucketAssigner)

> ManagedState: Add a disable purging option
> --
>
> Key: APEXMALHAR-2129
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Bhupesh Chawda
>Assignee: Bhupesh Chawda
>Priority: Minor
>
> TimeBucketAssigner advances the time boundaries of the buckets viz. start and 
> end to the current system time every window.
> The requirement is to add an option so that clients can disable this if 
> needed. Tuple time based deduplication has such a requirement.





[jira] [Reopened] (APEXMALHAR-2129) ManagedState: Add a disable purging option

2016-06-29 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh reopened APEXMALHAR-2129:
---
  Assignee: Chandni Singh  (was: Bhupesh Chawda)

> ManagedState: Add a disable purging option
> --
>
> Key: APEXMALHAR-2129
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Bhupesh Chawda
>    Assignee: Chandni Singh
>Priority: Minor
>
> TimeBucketAssigner advances the time boundaries of the buckets viz. start and 
> end to the current system time every window.
> The requirement is to add an option so that clients can disable this if 
> needed. Tuple time based deduplication has such a requirement.





[jira] [Commented] (APEXMALHAR-2129) Introduce option to advance time through Expiry task in TimeBucketAssigner

2016-06-29 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355324#comment-15355324
 ] 

Chandni Singh commented on APEXMALHAR-2129:
---

I am not disagreeing with your use case. ManagedState is there for managing 
large amounts of data, and that data, when written to disk, is time-bucketed 
specifically so that it can be purged easily. 
What you need is to disable purging, which has already been discussed, and I 
believe there is a ticket for it. 

> Introduce option to advance time through Expiry task in TimeBucketAssigner
> --
>
> Key: APEXMALHAR-2129
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Bhupesh Chawda
>Assignee: Bhupesh Chawda
>Priority: Minor
>
> TimeBucketAssigner advances the time boundaries of the buckets viz. start and 
> end to the current system time every window.
> The requirement is to add an option so that clients can disable this if 
> needed. Tuple time based deduplication has such a requirement.





[jira] [Commented] (APEXMALHAR-2129) Introduce option to advance time through Expiry task in TimeBucketAssigner

2016-06-29 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15354658#comment-15354658
 ] 

Chandni Singh commented on APEXMALHAR-2129:
---

Please explain the requirement and why it is needed. This time advancement 
helps with purging of data. How will you purge data? 

> Introduce option to advance time through Expiry task in TimeBucketAssigner
> --
>
> Key: APEXMALHAR-2129
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Bhupesh Chawda
>Assignee: Bhupesh Chawda
>Priority: Minor
>
> TimeBucketAssigner advances the time boundaries of the buckets viz. start and 
> end to the current system time every window.
> The requirement is to add an option so that clients can disable this if 
> needed. Tuple time based deduplication has such a requirement.





Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

2016-06-21 Thread Chandni Singh
Hi,
IMO, scheduling a job can be independent of any operator, while
StatsListeners are not. I understand that in a lot of cases input/output
operators will decide when the job ends, but there can be cases where
scheduling is independent of them.
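
As a rough illustration of the kind of operator-independent scheduling hook
meant here (all names below are hypothetical, not actual Apex APIs):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: a scheduler the master consults directly, decoupled
// from any operator. JobScheduler and TimeWindowScheduler are illustrative
// names, not Apex classes.
interface JobScheduler {
  // The master asks which operators should be deployed at this moment.
  List<String> operatorsToRun(long currentTimeMillis);
}

class TimeWindowScheduler implements JobScheduler {
  @Override
  public List<String> operatorsToRun(long currentTimeMillis) {
    // Illustrative policy: deploy the reader only at the top of each hour.
    boolean topOfHour = (currentTimeMillis / 60_000) % 60 == 0;
    return topOfHour ? Arrays.asList("reader", "writer")
                     : Arrays.asList("writer");
  }
}
```

If no such scheduler is set, the master would simply deploy the whole DAG as
it does today.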

Thanks,
Chandni
On Jun 21, 2016 12:12 PM, "Thomas Weise"  wrote:

> This looks like something that coordination wise belongs into the master
> and can be done with a shared stats listener.
>
> The operator request/response protocol could be used to relay the data for
> the scheduling decisions.
>
> Thomas
>
>
> On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
> chandni.si...@capitalone.com> wrote:
>
> > Hi Tushar,
> >
> > I have some questions about the use case 2: Batch Support
> > I don't understand the advantages of providing batch support by having an
> > operator as a scheduler.
> >
> > An approach that seemed a little more straightforward to me was to expose
> > an API for a scheduler. If a scheduler is set, then the master uses it and
> > schedules operators. By default there isn't any scheduler and the job is
> > run as it is now.
> >
> > Maybe this is too simplistic but can you please let me know why having an
> > operator as a scheduler is a better way?
> >
> > Thanks,
> > Chandni
> >
> >
> > On 6/21/16, 11:09 AM, "Tushar Gosavi"  wrote:
> >
> > >Hi All,
> > >
> > >We have seen few use cases in field which require Apex application
> > >scheduling based on some condition. This has also come up as part of
> > >Batch Support in Apex previously
> > >(
> >
> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
> > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E)
> > >. I am proposing the following functionality in Apex to help scheduling
> > >and better resource utilization for batch jobs. Please provide your
> > >comments.
> > >
> > >Usecase 1 - Dynamic Dag modification.
> > >
> > >Each operator in the DAG consumes YARN resources. Sometimes it is
> > >desirable to return the resources to YARN when no data is available
> > >for processing, and deploy the whole DAG once data starts to appear. For
> > >this to happen automatically, we will need some data-monitoring
> > >operators running in the DAG to trigger restart and shutdown of the
> > >operators in the DAG.
> > >
> > >Apex already has such an API to dynamically change the running DAG
> > >through the CLI. We could make a similar API available to operators,
> > >which would trigger DAG modification at runtime. This information can be
> > >passed to the master using the heartbeat RPC, and the master will make
> > >the required changes to the DAG. Let me know what you think about it;
> > >something like below:
> > >Context.beginDagChange();
> > >context.addOperator("o1"); <== launch operator from previous
> > >checkpointed state.
> > >context.addOperator("o2", new Operator2()); <== create new operator
> > >context.addStream("s1", "reader.output", "o1.input");
> > >context.shutdown("o3"); <== delete this and downstream operators from
> > >the DAG.
> > >context.apply();  <== DAG changes will be sent to the master, and the
> > >master will apply these changes.
> > >
> > >Similarly API for other functionalities such as locality settings
> > >needs to be provided.
> > >
> > >
> > >Usecase 2 - Classic Batch Scheduling.
> > >
> > >Provide an API callable from an operator to launch a DAG. The operator
> > >will prepare a DAG object and submit it to YARN; the DAG will be
> > >scheduled as a new application. This way complex schedulers can be
> > >written as operators.
> > >
> > >public class SchedulerOperator implements Operator {
> > >   void handleIdleTime() {
> > >  // check conditions to start a job (for example, enough files are
> > >available, enough items are available in Kafka, or a deadline has been
> > >reached)
> > > DAG dag = context.createDAG();
> > > dag.addOperator();
> > > dag.addOperator();
> > > LaunchOptions lOptions = new LaunchOptions();
> > > lOptions.oldId = ""; // start from this checkpoint.
> > > DagHandler dagHandler = context.submit(dag, lOptions);
> > >   }
> > >}
> > >
> > >DagHandler will have methods to monitor the final state of the
> > >application or to kill the DAG:
> > >dagHandler.waitForCompletion() <== wait till the DAG terminates
> > >dagHandler.status()  <== get the status of application.
> > >dagHandler.kill() <== kill the running application.
> > >dagHandler.shutdown() <== shutdown the application.
> > >
> > >More complex scheduler operators could be written to manage
> > >workflows, i.e. DAGs of DAGs, using these APIs.
> > >
> > >Regards,
> > >-Tushar.
> >

Re: Parquet Writer Operator

2016-06-16 Thread Chandni Singh
Dev,

The FileSystemWALWriter closes a temporary file as soon as it gets rotated.
It renames (finalizes) the temporary file to the actual file once the
window is committed. The mapping of temporary files to actual files is
part of the checkpointed state.

The FileSystemWALReader reads from the temporary file, so maybe you can use
that to read the WAL.
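
A rough sketch of the temp-file bookkeeping described above (the class and
method names are illustrative, not the actual Malhar API):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: tracks which rotated temporary WAL file maps to which
// final name, so a reader can consume the temp file until the window commits.
class TempFileTracker {
  // temp file -> actual file; this mapping is part of the checkpointed state
  private final Map<String, String> tempToActual = new HashMap<>();

  // A rotated temp file is closed immediately; its rename is deferred.
  void onRotate(String tempFile, String actualFile) {
    tempToActual.put(tempFile, actualFile);
  }

  // On committed(windowId): return and forget every pending rename.
  Map<String, String> finalizePending() {
    Map<String, String> toRename = new HashMap<>(tempToActual);
    tempToActual.clear();
    return toRename;
  }

  // Before finalization, readers go straight to the temp file.
  boolean isPending(String tempFile) {
    return tempToActual.containsKey(tempFile);
  }
}
```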

Chandni



On Thu, Jun 16, 2016 at 9:55 PM, Devendra Tagare 
wrote:

> Hi,
>
> WAL based approach :
>
> The FileSystemWAL.FileSystemWALWriter closes a temporary file only after
> the window is committed. We cannot read any such files till this point.
> Once this file is committed, in the same committed callback the
> ParquetOutputOperator will have to read the committed files, convert the
> spooled records from the WAL to Parquet and then write the Parquet file.
> A file can only be deleted from the WAL after it has been successfully
> written as a Parquet file.
>
> Small files problem: to handle this with a WAL-based approach, we will
> have to read files from the WAL till the Parquet block size is reached. This
> means the WAL reader could end up polling files for windows before
> the highest committed window, since the block size may not have been reached
> in the committed callback for a given window.
>
> Fault tolerance: if the Parquet writes to the file system fail, then the
> operator will go down. In this case we will have to add retry logic to
> read the files from the WAL for the windows which failed.
>
> Please let me know if I am missing something in using the WAL and also if
> using a 2 operator solution would be better suited in this case.
>
> Thanks,
> Dev
>
>
> On Wed, Jun 15, 2016 at 5:02 PM, Thomas Weise 
> wrote:
>
> > Hi Dev,
> >
> > Can you not use the existing WAL implementation (via WindowDataManager or
> > directly)?
> >
> > Thomas
> >
> >
> > On Wed, Jun 15, 2016 at 3:47 PM, Devendra Tagare <
> > devend...@datatorrent.com>
> > wrote:
> >
> > > Hi,
> > >
> > > Initial thoughts were to go for a WAL based approach where the operator
> > > would first write POJO's to the WAL and then a separate thread would do
> > the
> > > task of reading from the WAL and writing the destination files based on
> > the
> > > block size.
> > >
> > > There is a ticket open for a pluggable spooling implementation with
> > output
> > > operators which can be leveraged for this,
> > > https://issues.apache.org/jira/browse/APEXMALHAR-2037
> > >
> > > Since work is already being done on that front, we can plug in the
> > spooler
> > > with the existing implementation of the ParquetFileWriter at that point
> > and
> > > remove the first operator - ParquetFileOutputOperator.
> > >
> > > Thanks,
> > > Dev
> > >
> > > On Tue, Jun 14, 2016 at 7:21 PM, Thomas Weise 
> > > wrote:
> > >
> > > > What's the reason for not considering the WAL based approach?
> > > >
> > > > What are the pros and cons?
> > > >
> > > >
> > > > On Tue, Jun 14, 2016 at 6:54 PM, Devendra Tagare <
> > > > devend...@datatorrent.com>
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > We can focus on the below 2 problems,
> > > > > 1. Avoid the small files problem, which could arise due to a flush at
> > every
> > > > > endWindow, since there wouldn't be significant data in a window.
> > > > > 2.Fault Tolerance.
> > > > >
> > > > > *Proposal* : Create a module in which there are 2 operators,
> > > > >
> > > > > *Operator 1 : ParquetFileOutputOperator*
> > > > > This operator will be an implementation of the
> > > > AbstractFileOutputOperator.
> > > > > It will write data to a HDFS location and leverage the
> > fault-tolerance
> > > > > semantics of the AbstractFileOutputOperator.
> > > > >
> > > > > This operator will implement the CheckpointNotificationListener and
> > > will
> > > > > emit the finalizedFiles from the beforeCheckpoint method.
> > > > > Map
> > > > >
> > > > > *Operator 2 : ParquetFileWriter*
> > > > > This operator will receive a Set from the
> > > > > ParquetFileOutputOperator on its input port.
> > > > > Once it receives this map, it will do the below things,
> > > > >
> > > > > 1. Save the input received to a Map inputFilesMap
> > > > >
> > > > > 2.Instantiate a new ParquetWriter
> > > > >   2.a. Get a unique file name.
> > > > >   2.b. Add a configurable writer that extends the ParquetWriter and
> > > > include
> > > > > a write support for writing various supported formats like
> > Avro,thrift
> > > > etc.
> > > > >
> > > > > 3. For each file from the inputFilesMap,
> > > > >   3.a Read the file and write the record using the writer created
> > > > > in (2).
> > > > >   3.b Check if the (configurable) block size is reached. If yes, then
> > > > > close the file and add its entry to a
> > > > > Map completedFilesMap. Remove the entry from
> > > > > inputFilesMap.
> > > > > If the writes fail then the files can be 

[jira] [Commented] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager

2016-06-09 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323383#comment-15323383
 ] 

Chandni Singh commented on APEXMALHAR-2063:
---

[~sandesh]
Window Data Manager purges data only once a window is committed; this is 
because the data of an uncommitted window may still be needed.

We can't delete any window until it is committed. 

I don't see in your pull request where this is required.
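
The purge-on-commit rule can be sketched roughly like this (WindowStore is an
illustrative name, not the actual WindowDataManager API):

```java
import java.util.TreeMap;

// Illustrative: per-window artifacts are deleted only once that window is
// committed, because an uncommitted window may still have to be replayed.
class WindowStore {
  private final TreeMap<Long, Object> perWindow = new TreeMap<>();

  void save(long windowId, Object data) {
    perWindow.put(windowId, data);
  }

  // Invoked from the committed(windowId) callback: purge windows <= committed.
  void committed(long windowId) {
    perWindow.headMap(windowId, true).clear();
  }

  boolean has(long windowId) {
    return perWindow.containsKey(windowId);
  }
}
```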

> Integrate WAL to FS WindowDataManager
> -
>
> Key: APEXMALHAR-2063
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>    Reporter: Chandni Singh
>    Assignee: Chandni Singh
>
> FS Window Data Manager is used to save meta-data that helps in replaying 
> tuples every completed application window after failure. For this it saves 
> meta-data in a file per window. Having multiple small size files on hdfs 
> cause issues as highlighted here:
> http://blog.cloudera.com/blog/2009/02/the-small-files-problem/
> Instead FS Window Data Manager can utilize the WAL to write data and maintain 
> a mapping of how much data was flushed to WAL each window. 
> In order to use FileSystemWAL for replaying data of a finished window, there 
> are few changes made to FileSystemWAL this is because of following:
> 1. WindowDataManager needs to replay data of every finished window. This 
> window may not be checkpointed. 
> FileSystemWAL truncates the WAL file to the checkpointed point after recovery 
> so this poses a problem. 
> WindowDataManager should be able to control recovery of FileSystemWAL.
> 2.  FileSystemWAL writes to temporary files. The mapping of temp files to 
> actual file is part of its state which is checkpointed. Since 
> WindowDataManager replays data of a window not yet checkpointed, it needs to 
> know the actual temporary file the data is being persisted to.





[jira] [Commented] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager

2016-06-08 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15321102#comment-15321102
 ] 

Chandni Singh commented on APEXMALHAR-2063:
---

[~sandesh]
WindowDataManager is used for idempotency. The data of a window gets deleted 
when the window gets committed. This is why we have the method deleteUpTo(...).

Can you tell me when you would need to delete windows strictly earlier than a 
particular window?

> Integrate WAL to FS WindowDataManager
> -
>
> Key: APEXMALHAR-2063
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>    Reporter: Chandni Singh
>    Assignee: Chandni Singh
>
> FS Window Data Manager is used to save meta-data that helps in replaying 
> tuples every completed application window after failure. For this it saves 
> meta-data in a file per window. Having multiple small size files on hdfs 
> cause issues as highlighted here:
> http://blog.cloudera.com/blog/2009/02/the-small-files-problem/
> Instead FS Window Data Manager can utilize the WAL to write data and maintain 
> a mapping of how much data was flushed to WAL each window. 
> In order to use FileSystemWAL for replaying data of a finished window, there 
> are few changes made to FileSystemWAL this is because of following:
> 1. WindowDataManager needs to replay data of every finished window. This 
> window may not be checkpointed. 
> FileSystemWAL truncates the WAL file to the checkpointed point after recovery 
> so this poses a problem. 
> WindowDataManager should be able to control recovery of FileSystemWAL.
> 2.  FileSystemWAL writes to temporary files. The mapping of temp files to 
> actual file is part of its state which is checkpointed. Since 
> WindowDataManager replays data of a window not yet checkpointed, it needs to 
> know the actual temporary file the data is being persisted to.





[jira] [Comment Edited] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager

2016-06-08 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15320164#comment-15320164
 ] 

Chandni Singh edited comment on APEXMALHAR-2063 at 6/8/16 3:39 PM:
---

Since we are deprecating these methods, I think it will be good to deprecate 
the following:
1. {code}save(Object object, int operatorId, long windowId){code}
operatorId is redundant for WindowDataManager. WindowDataManager extends 
{code}Component {code} and knows the operator id in 
setup() for which it is saving the data. Also an instance of WindowDataManager 
can write data for only the enclosing partition and not any other partition.
Add {code}save(Object object, long windowId){code} instead

2. {code} load(int operatorId, long windowId)  and load(long windowId) {code} 
In the use cases (operators), the data is loaded for the enclosing partition or 
all the partitions so this could be changed as :
{code}Object retrieve(long windowId) and Map<Integer, Object> 
retrieveAllPartitions(long windowId){code}

3.  {code} delete(int operatorId, long windowId) {code}
Same as the save(...), operatorId is redundant.
{code}delete(long windowId){code}

Even though WindowDataManager is @Evolving, we can mark these as deprecated and 
add the new ones. 
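
Collected together, the proposal above would give roughly the following
surface (a sketch only; ProposedWindowDataManager and the toy in-memory
implementation below are illustrative, not Malhar code):

```java
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the proposed API: no operatorId parameter, since each instance
// already knows its enclosing partition from setup().
interface ProposedWindowDataManager {
  void save(Object object, long windowId) throws IOException;
  Object retrieve(long windowId) throws IOException;
  Map<Integer, Object> retrieveAllPartitions(long windowId) throws IOException;
  void delete(long windowId) throws IOException;
}

// Toy in-memory implementation, for illustration only.
class InMemoryWindowDataManager implements ProposedWindowDataManager {
  private static final int PARTITION_ID = 0; // single-partition toy
  private final TreeMap<Long, Object> store = new TreeMap<>();

  @Override
  public void save(Object object, long windowId) {
    store.put(windowId, object);
  }

  @Override
  public Object retrieve(long windowId) {
    return store.get(windowId);
  }

  @Override
  public Map<Integer, Object> retrieveAllPartitions(long windowId) {
    return Collections.singletonMap(PARTITION_ID, store.get(windowId));
  }

  @Override
  public void delete(long windowId) {
    store.remove(windowId);
  }
}
```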


was (Author: csingh):
Since we are deprecating these methods, I think it will be good to deprecate 
the following as
1. {code}save(Object object, int operatorId, long windowId){code}
operatorId is redundant for WindowDataManager. WindowDataManager extends 
{code}Component {code} and knows the operator id in 
setup() for which it is saving the data. Also an instance of WindowDataManager 
can write data for only the enclosing partition and not any other partition.
Add {code}save(Object object, long windowId){code} instead

2. {code} load(int operatorId, long windowId)  and load(long windowId) {code} 
In the use cases (operators), the data is loaded for the enclosing partition or 
all the partitions so this could be changed as :
{code}Object fetch(long windowId) and Map<Integer, Object> 
fetchAllPartitions(long windowId){code}

3.  {code} delete(int operatorId, long windowId) {code}
Same as the save(...), operatorId is redundant.
{code}delete(long windowId){code}

Even though WindowDataManager is @Evolving, we can mark these as deprecated and 
add the new ones. 

> Integrate WAL to FS WindowDataManager
> -
>
> Key: APEXMALHAR-2063
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>
> FS Window Data Manager is used to save meta-data that helps in replaying 
> tuples every completed application window after failure. For this it saves 
> meta-data in a file per window. Having multiple small size files on hdfs 
> cause issues as highlighted here:
> http://blog.cloudera.com/blog/2009/02/the-small-files-problem/
> Instead FS Window Data Manager can utilize the WAL to write data and maintain 
> a mapping of how much data was flushed to WAL each window. 
> In order to use FileSystemWAL for replaying data of a finished window, there 
> are few changes made to FileSystemWAL this is because of following:
> 1. WindowDataManager needs to replay data of every finished window. This 
> window may not be checkpointed. 
> FileSystemWAL truncates the WAL file to the checkpointed point after recovery 
> so this poses a problem. 
> WindowDataManager should be able to control recovery of FileSystemWAL.
> 2.  FileSystemWAL writes to temporary files. The mapping of temp files to 
> actual file is part of its state which is checkpointed. Since 
> WindowDataManager replays data of a window not yet checkpointed, it needs to 
> know the actual temporary file the data is being persisted to.





[jira] [Comment Edited] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager

2016-06-08 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15320164#comment-15320164
 ] 

Chandni Singh edited comment on APEXMALHAR-2063 at 6/8/16 7:31 AM:
---

Since we are deprecating these methods, I think it will be good to deprecate 
the following as
1. {code}save(Object object, int operatorId, long windowId){code}
operatorId is redundant for WindowDataManager. WindowDataManager extends 
{code}Component {code} and knows the operator id in 
setup() for which it is saving the data. Also an instance of WindowDataManager 
can write data for only the enclosing partition and not any other partition.
Add {code}save(Object object, long windowId){code} instead

2. {code} load(int operatorId, long windowId)  and load(long windowId) {code} 
In the use cases (operators), the data is loaded for the enclosing partition or 
all the partitions so this could be changed as :
{code}Object fetch(long windowId) and Map<Integer, Object> 
fetchAllPartitions(long windowId){code}

3.  {code} delete(int operatorId, long windowId) {code}
Same as the save(...), operatorId is redundant.
{code}delete(long windowId){code}

Even though WindowDataManager is @Evolving, we can mark these as deprecated and 
add the new ones. 


was (Author: csingh):
Since we are deprecating these methods, I think it will be good to deprecate 
the following as
1. {code}save(Object object, int operatorId, long windowId){code}
operatorId is redundant for WindowDataManager. WindowDataManager extends 
{code}Component {code} and knows the operator id in 
setup() for which it is saving the data. Also an instance of WindowDataManager 
can write data for only the enclosing partition and not any other partition.
Add {code}save(Object object, long windowId){code} instead

2. {code} load(int operatorId, long windowId)  and load(long windowId) {code} 
In the use cases (operators), the data is loaded for the enclosing partition or 
all the partitions so this could be changed as :
{code}Object load(long windowId) and Map<Integer, Object> 
loadAllPartitions(long windowId){code}

3.  {code} delete(int operatorId, long windowId) {code}
Same as the save(...), operatorId is redundant.
{code}delete(long windowId){code}

Even though WindowDataManager is @Evolving, we can mark these as deprecated and 
add the new ones. 

> Integrate WAL to FS WindowDataManager
> -
>
> Key: APEXMALHAR-2063
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>
> FS Window Data Manager is used to save meta-data that helps in replaying 
> tuples every completed application window after failure. For this it saves 
> meta-data in a file per window. Having multiple small size files on hdfs 
> cause issues as highlighted here:
> http://blog.cloudera.com/blog/2009/02/the-small-files-problem/
> Instead FS Window Data Manager can utilize the WAL to write data and maintain 
> a mapping of how much data was flushed to WAL each window. 
> In order to use FileSystemWAL for replaying data of a finished window, there 
> are few changes made to FileSystemWAL this is because of following:
> 1. WindowDataManager needs to replay data of every finished window. This 
> window may not be checkpointed. 
> FileSystemWAL truncates the WAL file to the checkpointed point after recovery 
> so this poses a problem. 
> WindowDataManager should be able to control recovery of FileSystemWAL.
> 2.  FileSystemWAL writes to temporary files. The mapping of temp files to 
> actual file is part of its state which is checkpointed. Since 
> WindowDataManager replays data of a window not yet checkpointed, it needs to 
> know the actual temporary file the data is being persisted to.





[jira] [Comment Edited] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager

2016-06-08 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15320164#comment-15320164
 ] 

Chandni Singh edited comment on APEXMALHAR-2063 at 6/8/16 7:22 AM:
---

Since we are deprecating these methods, I think it will be good to deprecate 
the following as
1. {code}save(Object object, int operatorId, long windowId){code}
operatorId is redundant for WindowDataManager. WindowDataManager extends 
{code}Component {code} and knows the operator id in 
setup() for which it is saving the data. Also an instance of WindowDataManager 
can write data for only the enclosing partition and not any other partition.
Add {code}save(Object object, long windowId){code} instead

2. {code} load(int operatorId, long windowId)  and load(long windowId) {code} 
In the use cases (operators), the data is loaded for the enclosing partition or 
all the partitions so this could be changed as :
{code}Object load(long windowId) and Map<Integer, Object> 
loadAllPartitions(long windowId){code}

3.  {code} delete(int operatorId, long windowId) {code}
Same as the save(...), operatorId is redundant.
{code}delete(long windowId){code}

Even though WindowDataManager is @Evolving, we can mark these as deprecated and 
add the new ones. 


was (Author: csingh):
Since we are deprecating these methods, I think it will be good to deprecate 
the following as
1. {code}save(Object object, int operatorId, long windowId){code}
operatorId is redundant for WindowDataManager. WindowDataManager extends 
{code}Component {code} and knows the operator id in 
setup() for which it is saving the data. Also an instance of WindowDataManager 
can write data for only the enclosing partition and not any other partition.
Add {code}save(Object object, long windowId){code} instead

2. {code} load(int operatorId, long windowId)  and load(long windowId) {code} 
In the use cases (operators), the data is loaded for the enclosing partition or 
all the partitions so this could be changed as :
{code}load(long windowId) and loadAll(long windowId){code}

3.  {code} delete(int operatorId, long windowId) {code}
Same as the save(...), operatorId is redundant.
{code}delete(long windowId){code}

Even though WindowDataManager is @Evolving, we can mark these as deprecated and 
add the new ones. 

> Integrate WAL to FS WindowDataManager
> -
>
> Key: APEXMALHAR-2063
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>    Reporter: Chandni Singh
>    Assignee: Chandni Singh
>
> FS Window Data Manager is used to save meta-data that helps in replaying 
> tuples every completed application window after failure. For this it saves 
> meta-data in a file per window. Having multiple small size files on hdfs 
> cause issues as highlighted here:
> http://blog.cloudera.com/blog/2009/02/the-small-files-problem/
> Instead FS Window Data Manager can utilize the WAL to write data and maintain 
> a mapping of how much data was flushed to WAL each window. 
> In order to use FileSystemWAL for replaying data of a finished window, there 
> are few changes made to FileSystemWAL this is because of following:
> 1. WindowDataManager needs to replay data of every finished window. This 
> window may not be checkpointed. 
> FileSystemWAL truncates the WAL file to the checkpointed point after recovery 
> so this poses a problem. 
> WindowDataManager should be able to control recovery of FileSystemWAL.
> 2.  FileSystemWAL writes to temporary files. The mapping of temp files to 
> actual file is part of its state which is checkpointed. Since 
> WindowDataManager replays data of a window not yet checkpointed, it needs to 
> know the actual temporary file the data is being persisted to.





[jira] [Comment Edited] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager

2016-06-08 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15320164#comment-15320164
 ] 

Chandni Singh edited comment on APEXMALHAR-2063 at 6/8/16 7:21 AM:
---

Since we are deprecating these methods, I think it will be good to deprecate 
the following as
1. {code}save(Object object, int operatorId, long windowId){code}
operatorId is redundant for WindowDataManager. WindowDataManager extends 
{code}Component {code} and knows the operator id in 
setup() for which it is saving the data. Also an instance of WindowDataManager 
can write data for only the enclosing partition and not any other partition.
Add {code}save(Object object, long windowId){code} instead

2. {code} load(int operatorId, long windowId)  and load(long windowId) {code} 
In the use cases (operators), the data is loaded for the enclosing partition or 
all the partitions so this could be changed as :
{code}load(long windowId) and loadAll(long windowId){code}

3.  {code} delete(int operatorId, long windowId) {code}
Same as the save(...), operatorId is redundant.
{code}delete(long windowId){code}

Even though WindowDataManager is @Evolving, we can mark these as deprecated and 
add the new ones. 


was (Author: csingh):
Since we are deprecating these methods, I think it will be good to deprecate 
the following as
1. {code}save(Object object, int operatorId, long windowId){code}
operatorId is redundant for WindowDataManager. WindowDataManager extends 
{code}Component {code} and knows the operator id in 
setup() for which it is saving the data. Also an instance of WindowDataManager 
can write data for only the enclosing partition and not any other partition.
Add {code}save(Object object, long windowId){code} instead

2. {code} load(int operatorId, long windowId) {code}
In the use cases (operators), the data is loaded for the enclosing partition or 
all the partitions so this could be changed as :
{code}load(long windowId, boolean allPartitions){code}

3.  {code} delete(int operatorId, long windowId) {code}
Same as the save(...), operatorId is redundant.
{code}delete(long windowId){code}

Even though WindowDataManager is @Evolving, we can mark these as deprecated and 
add the new ones. 

> Integrate WAL to FS WindowDataManager
> -
>
> Key: APEXMALHAR-2063
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>    Reporter: Chandni Singh
>    Assignee: Chandni Singh
>
> FS Window Data Manager is used to save meta-data that helps in replaying 
> tuples every completed application window after failure. For this it saves 
> meta-data in a file per window. Having multiple small size files on hdfs 
> cause issues as highlighted here:
> http://blog.cloudera.com/blog/2009/02/the-small-files-problem/
> Instead FS Window Data Manager can utilize the WAL to write data and maintain 
> a mapping of how much data was flushed to WAL each window. 
> In order to use FileSystemWAL for replaying data of a finished window, there 
> are few changes made to FileSystemWAL this is because of following:
> 1. WindowDataManager needs to replay data of every finished window. This 
> window may not be checkpointed. 
> FileSystemWAL truncates the WAL file to the checkpointed point after recovery 
> so this poses a problem. 
> WindowDataManager should be able to control recovery of FileSystemWAL.
> 2.  FileSystemWAL writes to temporary files. The mapping of temp files to 
> actual file is part of its state which is checkpointed. Since 
> WindowDataManager replays data of a window not yet checkpointed, it needs to 
> know the actual temporary file the data is being persisted to.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager

2016-06-08 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15320164#comment-15320164
 ] 

Chandni Singh edited comment on APEXMALHAR-2063 at 6/8/16 7:12 AM:
---

Since we are deprecating these methods, I think it will be good to also 
deprecate the following:
1. {code}save(Object object, int operatorId, long windowId){code}
operatorId is redundant for WindowDataManager. WindowDataManager extends 
{code}Component{code} and learns in setup() the operator id for which it is 
saving data. Also, an instance of WindowDataManager can write data only for 
the enclosing partition and not any other partition.
Add {code}save(Object object, long windowId){code} instead

2. {code} load(int operatorId, long windowId) {code}
In the use cases (operators), the data is loaded for the enclosing partition or 
all the partitions, so this could be changed to:
{code}load(long windowId, boolean allPartitions){code}

3.  {code} delete(int operatorId, long windowId) {code}
As with save(...), operatorId is redundant.
{code}delete(long windowId){code}

Even though WindowDataManager is @Evolving, we can mark these as deprecated and 
add the new ones. 
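The proposal above can be sketched as an interface change. This is a hypothetical sketch, not the actual Malhar code: the interface and class names here are invented for illustration, and the toy in-memory implementation exists only to show the intended call pattern with the operatorId parameter dropped.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed WindowDataManager signatures: the
// operatorId parameter is dropped because the instance learns its operator id
// in setup() and only writes data for its own (enclosing) partition.
interface WindowDataManagerSketch {
  void save(Object object, long windowId) throws IOException;           // replaces save(Object, int, long)
  Object load(long windowId, boolean allPartitions) throws IOException; // replaces load(int, long)
  void delete(long windowId) throws IOException;                        // replaces delete(int, long)
}

// Toy in-memory implementation, only to demonstrate the call pattern.
class InMemoryWindowDataManager implements WindowDataManagerSketch {
  private final Map<Long, Object> perWindow = new HashMap<>();

  @Override
  public void save(Object object, long windowId) {
    perWindow.put(windowId, object);
  }

  @Override
  public Object load(long windowId, boolean allPartitions) {
    // allPartitions would gather the states of every partition; this toy
    // version has a single partition, so the flag makes no difference here.
    return perWindow.get(windowId);
  }

  @Override
  public void delete(long windowId) {
    perWindow.remove(windowId);
  }
}
```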


was (Author: csingh):
Since we are deprecating these methods, I think it will be good to deprecate 
the following as
1. save(Object object, int operatorId, long windowId) 
operatorId is redundant for WindowDataManager. WindowDataManager extends 
{code}Component {code} and knows the operator id in 
setup() for which it is saving the data. Also an instance of WindowDataManager 
can write data for only the enclosing partition and not any other partition.
Add {code}save(Object object, long windowId){code} instead

2. load(int operatorId, long windowId) 
In the use cases (operators), the data is loaded for the enclosing partition or 
all the partitions so this could be changed as :
{code}load(long windowId, boolean allPartitions){code}

3.  delete(int operatorId, long windowId) throws IOException;
Same as the save(...), operatorId is redundant.
{code}delete(long windowId){code}

Even though WindowDataManager is @Evolving, we can mark these as deprecated and 
add the new ones. 



[jira] [Commented] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager

2016-06-08 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15320103#comment-15320103
 ] 

Chandni Singh commented on APEXMALHAR-2063:
---

I am close to completing the implementation of all the methods supported in 
the WindowDataManager API.

The WindowDataManager API extends the StorageAgent API, and I am thinking of 
deprecating the following methods, which are not used by any operator that 
uses WindowDataManager:
1. {code}long[] getWindowIds(int operatorId){code}
2. {code}long[] getWindowIds(){code}

The reason to deprecate them is that an implementation providing just the 
window ids is not optimal: returning all the window ids without reading the 
data of those windows means re-reading the WAL files.
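The cost argument can be illustrated with a toy model. This is not the Malhar WAL format; the record layout below is an assumption made purely to show that when window ids are interleaved with payloads, listing just the ids still means walking every record.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical WAL layout: each record carries the window id it belongs to
// plus an opaque payload. There is no separate id index, so recovering the
// set of window ids requires a full scan.
class WalIdScan {
  static final class Record {
    final long windowId;
    final byte[] payload;
    Record(long windowId, byte[] payload) {
      this.windowId = windowId;
      this.payload = payload;
    }
  }

  // Every record must be visited (in a real file-based WAL, read from disk)
  // even though only the ids are wanted -- hence "not optimal".
  static long[] getWindowIds(List<Record> wal) {
    Set<Long> ids = new LinkedHashSet<>();
    for (Record r : wal) {
      ids.add(r.windowId);
    }
    long[] out = new long[ids.size()];
    int i = 0;
    for (long id : ids) {
      out[i++] = id;
    }
    return out;
  }
}
```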





[jira] [Updated] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager

2016-06-08 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated APEXMALHAR-2063:
--
Description: 
FS Window Data Manager is used to save meta-data that helps in replaying tuples 
of every completed application window after failure. For this it saves 
meta-data in a file per window. Having many small files on HDFS causes issues, 
as highlighted here:
http://blog.cloudera.com/blog/2009/02/the-small-files-problem/

Instead FS Window Data Manager can utilize the WAL to write data and maintain a 
mapping of how much data was flushed to WAL each window. 

In order to use FileSystemWAL for replaying data of a finished window, a few 
changes need to be made to FileSystemWAL, because of the following:

1. WindowDataManager needs to replay data of every finished window. This window 
may not be checkpointed. 
FileSystemWAL truncates the WAL file to the checkpointed point after recovery, 
so this poses a problem. 
WindowDataManager should be able to control the recovery of FileSystemWAL.

2.  FileSystemWAL writes to temporary files. The mapping of temp files to 
actual file is part of its state which is checkpointed. Since WindowDataManager 
replays data of a window not yet checkpointed, it needs to know the actual 
temporary file the data is being persisted to.
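The per-window bookkeeping described above ("a mapping of how much data was flushed to WAL each window") can be sketched as follows. This is a minimal illustration, not the Malhar implementation; the class and field names are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: after each end-window, record how far the WAL had been
// flushed, so a finished (but not yet checkpointed) window can later be
// replayed by seeking to its start offset and reading up to its end offset.
class WindowWalIndex {
  // Offsets into the WAL covering exactly one window's data.
  static final class Span {
    final long start;
    final long end;
    Span(long start, long end) { this.start = start; this.end = end; }
  }

  private final Map<Long, Span> flushedUpTo = new HashMap<>();
  private long walOffset;  // total bytes written to the WAL so far

  // Called at end-window with the number of bytes written during the window;
  // records the [start, end) span that a replay of this window must read.
  void endWindow(long windowId, long bytesWrittenThisWindow) {
    long start = walOffset;
    walOffset += bytesWrittenThisWindow;
    flushedUpTo.put(windowId, new Span(start, walOffset));
  }

  Span spanFor(long windowId) {
    return flushedUpTo.get(windowId);
  }
}
```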









  was:
FS Window Data Manager is used to save meta-data that helps in replaying tuples 
every completed application window after failure. For this it saves meta-data 
in a file per window. Having multiple small size files on hdfs cause issues as 
highlighted here:
http://blog.cloudera.com/blog/2009/02/the-small-files-problem/

Instead FS Window Data Manager can utilize the WAL to write data and maintain a 
mapping of how much data was flushed to WAL each window. 

In order to use FileSystemWAL for replaying data of a finished window, there 
are few changes made to FileSystemWAL this is because of following:

1. WindowDataManager needs to replay data of every finished window. This window 
may not be checkpointed. 
FileSystemWAL truncates the WAL file to the checkpointed point after recovery 
so this poses a problem. 
WindowDataManager should be able to control recovery of FileSystemWAL.

2.  FileSystemWAL writes to temporary files. The mapping of temp files to 
actual file is part of its state which is checkpointed. Since WindowDataManager 
replays data of a window not yet checkpointed, it needs to know the actual 
temporary file the data is being persisted to.

At a high level, WindowDataManager will persist meta information on file system 
which includes following details for every window 
- start wal pointer
- end wal pointer
- wal file path

This is a single file which is updated every end-window along with the actual 
data in WAL file.










[jira] [Commented] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager

2016-06-03 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15314820#comment-15314820
 ] 

Chandni Singh commented on APEXMALHAR-2063:
---

Embedding the window id into the WAL was not necessary with the Meta file 
solution. The meta file contained offsets from/to in WAL for each window.
Here there will be just one entry per window and the entry is the incremental 
state. 

Yes, the naming convention plus finalizing part files just after rotation will 
help. Currently finalization is delayed until a window is committed. For replay 
of a window, finalization should be done as soon as the part file gets rotated. 
This, again, is because we are using it for idempotency on the input side and 
not on the output side.
 
I think we can get by without a meta file. However, this still requires some 
changes to FileSystemWAL, mainly refactoring.



[jira] [Commented] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager

2016-06-03 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15314680#comment-15314680
 ] 

Chandni Singh commented on APEXMALHAR-2063:
---

I am looking into the possibility of writing window ids with the WAL entries. 
This requires creating extensions of FSWALWriter and FSWALReader:
- Meta info consists of more than the entry size; it includes the window id as 
well. 
- Temp files are finalized as soon as they are rotated, which ensures that 
there is only one most recent valid temporary file.
- The WAL content is not truncated to the last checkpoint; it is truncated to 
the point after the last finished window.
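The last point can be sketched concretely. This is a hypothetical illustration, assuming each WAL entry carries its window id and that entry end-offsets are known; it shows the recovery rule of truncating to the end of the last finished window rather than to the last checkpoint.

```java
// Hypothetical sketch of the recovery rule: instead of truncating the WAL
// back to the last checkpoint, truncate it to the end of the last entry that
// belongs to a finished window, so finished-but-uncheckpointed windows stay
// replayable. The entry layout (window id + end offset) is an assumption.
class RecoveryPoint {
  // entryWindowIds[i] is the window id of entry i; entryEndOffsets[i] is the
  // WAL offset immediately after entry i. Returns the offset to truncate to:
  // the end of the last entry whose window id <= lastFinishedWindow.
  static long truncationOffset(long[] entryWindowIds, long[] entryEndOffsets,
                               long lastFinishedWindow) {
    long offset = 0;
    for (int i = 0; i < entryWindowIds.length; i++) {
      if (entryWindowIds[i] <= lastFinishedWindow) {
        offset = entryEndOffsets[i];
      }
    }
    return offset;
  }
}
```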




[jira] [Resolved] (APEXMALHAR-2096) Add blockThreshold parameter to FSInputModule

2016-05-31 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh resolved APEXMALHAR-2096.
---
   Resolution: Fixed
Fix Version/s: 3.5.0

> Add blockThreshold parameter to FSInputModule
> -
>
> Key: APEXMALHAR-2096
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2096
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
> Fix For: 3.5.0
>
>
> FileSplitter is very fast and the downstream operators can't work at that 
> speed, so we need to limit the rate at which FileSplitter works.
> One way to do this is to limit the number of blocks emitted per window.
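The per-window cap described in the issue can be sketched as follows. This is not the actual FSInputModule blockThreshold code; the class, field, and method names are invented for illustration, and block metadata is modeled as a plain string.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of a blockThreshold-style cap: emit at most N block
// metadata items per streaming window and queue the rest, so a fast splitter
// doesn't overwhelm downstream operators.
class ThrottledEmitter {
  private final int blockThreshold;               // max blocks per window
  private final Deque<String> pending = new ArrayDeque<>();

  ThrottledEmitter(int blockThreshold) {
    this.blockThreshold = blockThreshold;
  }

  void enqueue(String blockMetadata) {
    pending.add(blockMetadata);
  }

  // Called once per window: drains at most blockThreshold queued blocks;
  // anything left over waits for a later window.
  List<String> emitWindow() {
    List<String> emitted = new ArrayList<>();
    while (!pending.isEmpty() && emitted.size() < blockThreshold) {
      emitted.add(pending.poll());
    }
    return emitted;
  }
}
```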





[jira] [Commented] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager

2016-05-26 Thread Chandni Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301685#comment-15301685
 ] 

Chandni Singh commented on APEXMALHAR-2063:
---

WindowDataManager supports dynamic partitioning in operators by allowing an 
instance to read the data saved for a particular window id by all operator 
instances. The following method provides that support:
{code}
  /**
   * When an operator can partition itself dynamically then there is no
   * guarantee that an input state which was being handled by one instance
   * previously will be handled by the same instance after partitioning.
   * For eg. an {@link AbstractFileInputOperator} instance which reads a file X
   * till offset l (not check-pointed) may no longer be the instance that
   * handles file X after repartitioning, as the no. of instances may have
   * changed and file X is re-hashed to another instance.
   * The new instance wouldn't know from what point to read file X unless it
   * reads the idempotent storage of all the operators for the window being
   * replayed and fixes its state.
   *
   * @param windowId window id.
   * @return mapping of operator id to the corresponding state
   * @throws IOException
   */
  Map<Integer, Object> load(long windowId) throws IOException;
{code}
Providing this support with FileSystemWAL is complicated. Currently the 
FileSystemWAL reader and writer are assumed to be in the same physical 
partition. However, supporting the above requires multiple readers that are in 
different physical partitions than the writer.

So FileSystemWAL needs to be changed in order to be usable in read-only 
mode.
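The multi-partition load above can be sketched with a toy model. This is not the Malhar code: the nested map stands in for the per-operator WAL files, and the names are assumptions; it only shows the read-only aggregation pattern the javadoc describes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of load(windowId): after a repartition, the new
// instance reads the saved state of *all* previous operator instances for
// that window (read-only access), then fixes its own state from the combined
// view -- e.g. to learn the offset at which file X was left.
class MultiPartitionLoad {
  // perOperator: operatorId -> (windowId -> saved state)
  static Map<Integer, Object> load(Map<Integer, Map<Long, Object>> perOperator,
                                   long windowId) {
    Map<Integer, Object> result = new HashMap<>();
    for (Map.Entry<Integer, Map<Long, Object>> e : perOperator.entrySet()) {
      Object state = e.getValue().get(windowId);
      if (state != null) {
        result.put(e.getKey(), state);  // read-only: no writer is involved
      }
    }
    return result;
  }
}
```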



[jira] [Assigned] (APEXMALHAR-2101) RuntimeException from Default Bucket which is under managed state.

2016-05-24 Thread Chandni Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh reassigned APEXMALHAR-2101:
-

Assignee: Chandni Singh

> RuntimeException from Default Bucket which is under managed state.
> --
>
> Key: APEXMALHAR-2101
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2101
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>    Assignee: Chandni Singh
>
> Getting the following exception, while using the ManagedTimeStateImpl 
> operator:
> 2016-05-24 15:42:48,813 ERROR 
> com.datatorrent.stram.engine.StreamingContainer: Operator set 
> [OperatorDeployInfo[id=4,name=join,type=GENERIC,checkpoint={, 
> 0, 
> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=GenToJoin,sourceNodeId=2,sourcePortName=outputPort,locality=,partitionMask=1,partitionKeys=[1]],
>  
> OperatorDeployInfo.InputDeployInfo[portName=product,streamId=ProductToJoin,sourceNodeId=1,sourcePortName=outputPort,locality=,partitionMask=0,partitionKeys=]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=JoinToConsole,bufferServer=chaitanya-HP-ENVY-15-Notebook-PC
>  stopped running due to an exception.
> java.lang.RuntimeException: while loading 0, 12
> at 
> org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getValueFromTimeBucketReader(Bucket.java:346)
> at 
> org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getFromReaders(Bucket.java:291)
> at 
> org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.get(Bucket.java:323)
> at 
> org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl.getValueFromBucketSync(AbstractManagedStateImpl.java:288)
> at 
> org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl.getSync(ManagedTimeStateImpl.java:56)
> at 
> com.examples.app.ManagedStateIntOperator.get(ManagedStateIntOperator.java:91)
> at 
> com.examples.app.ManagedStateIntOperator.processTuple(ManagedStateIntOperator.java:80)
> at 
> com.examples.app.ManagedStateIntOperator$2.process(ManagedStateIntOperator.java:73)
> at 
> com.examples.app.ManagedStateIntOperator$2.process(ManagedStateIntOperator.java:68)
> at com.datatorrent.api.DefaultInputPort.put(DefaultInputPort.java:79)
> at 
> com.datatorrent.stram.stream.BufferServerSubscriber$BufferReservoir.sweep(BufferServerSubscriber.java:280)
> at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:259)
> at 
> com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1393)
> Caused by: java.io.EOFException: Cannot seek after EOF
> at 
> org.apache.hadoop.hdfs.DFSInputStream.seek(DFSInputStream.java:1378)
> at 
> org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:62)
> at 
> org.apache.hadoop.io.file.tfile.DTBCFile$Reader.<init>(DTBCFile.java:674)
> at 
> org.apache.hadoop.io.file.tfile.DTFile$Reader.<init>(DTFile.java:827)
> at 
> com.datatorrent.lib.fileaccess.DTFileReader.<init>(DTFileReader.java:55)
> at 
> com.datatorrent.lib.fileaccess.TFileImpl$DTFileImpl.getReader(TFileImpl.java:173)
> at 
> org.apache.apex.malhar.lib.state.managed.BucketsFileSystem.getReader(BucketsFileSystem.java:96)
> at 
> org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.loadFileReader(Bucket.java:371)
> at 
> org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getValueFromTimeBucketReader(Bucket.java:341)
> ... 12 more




