[jira] [Commented] (APEXMALHAR-2086) Kafka Output Operator with Kafka 0.9 API

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358430#comment-15358430
 ] 

ASF GitHub Bot commented on APEXMALHAR-2086:


Github user tweise commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/298#discussion_r69250081
  
--- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java ---
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+/**
+ * Kafka output operator with exactly once processing semantics under certain conditions.
+ *
+ *  This operator uses *Key* to distinguish the messages written by a particular instance of the Output operator.
--- End diff --

I think this needs more explanation. What makes it exactly-once? Are there 
other restrictions or assumptions?


> Kafka Output Operator with Kafka 0.9 API
> 
>
> Key: APEXMALHAR-2086
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2086
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Sandesh
>Assignee: Sandesh
>
> Goal: 2 operators for Kafka output
>   1. Simple Kafka Output Operator
>      - Supports at-least-once
>      - Exposes the most used producer properties as class properties
>   2. Exactly Once Kafka Output (not possible in all cases; will be
>      documented later)
>
> Design for Exactly Once
> Window Data Manager - stores the Kafka partition offsets.
> Kafka Key - used by the operator = AppID#OperatorId
> During recovery, the partially written window is re-created using the
> following approach:
> Tuples between the largest recovery offsets and the current offset are
> checked. Based on the key, tuples written by other entities are
> discarded.
> Only tuples which are not in the recovered set are emitted.
> Tuples need to be unique within the window.
>   
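
The recovery approach in the issue description could be sketched roughly as below. This is only an illustration under assumptions, not the operator's actual code: `WrittenRecord`, `ExactlyOnceRecoverySketch` and its method names are hypothetical, and the list passed to `recoverWritten` stands in for the records that would be read back from Kafka between the stored offsets and the current offset. Only the key format (AppID#OperatorId) and the filtering rules come from the description.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a record read back from Kafka; not the Kafka client API.
final class WrittenRecord
{
  final String key;
  final String value;

  WrittenRecord(String key, String value)
  {
    this.key = key;
    this.value = value;
  }
}

final class ExactlyOnceRecoverySketch
{
  // Key format taken from the issue description: AppID#OperatorId.
  static String operatorKey(String appId, int operatorId)
  {
    return appId + "#" + operatorId;
  }

  // Collect the values this operator instance already wrote in the partial window.
  // Records carrying a different key were written by other entities and are discarded.
  static Set<String> recoverWritten(List<WrittenRecord> tail, String myKey)
  {
    Set<String> recovered = new HashSet<>();
    for (WrittenRecord r : tail) {
      if (myKey.equals(r.key)) {
        recovered.add(r.value);
      }
    }
    return recovered;
  }

  // While the window is replayed, emit only tuples that are not in the recovered set.
  // A set is sufficient only because tuples are assumed unique within the window.
  static List<String> tuplesToEmit(List<String> replayed, Set<String> recovered)
  {
    List<String> out = new ArrayList<>();
    for (String tuple : replayed) {
      if (!recovered.contains(tuple)) {
        out.add(tuple);
      }
    }
    return out;
  }
}
```

The uniqueness requirement matters here: if the same tuple could legitimately appear twice in a window, a set of recovered values could not tell a duplicate from a tuple that was already written.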



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2086) Kafka Output Operator with Kafka 0.9 API

2016-06-30 Thread Sandesh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358427#comment-15358427
 ] 

Sandesh commented on APEXMALHAR-2086:
-

Most of the work went into writing the exactly-once output operator, which uses 
the new Consumer API during recovery.



[jira] [Commented] (APEXMALHAR-2086) Kafka Output Operator with Kafka 0.9 API

2016-06-30 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358398#comment-15358398
 ] 

Thomas Weise commented on APEXMALHAR-2086:
--

Isn't the producer API in 0.9 the same as in 0.8.x? If so, why do we need this 
operator?

What is different between the existing operator and this? What problems are 
being addressed?




[jira] [Commented] (APEXMALHAR-2045) Add bandwidth control feature to Apex

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358369#comment-15358369
 ] 

ASF GitHub Bot commented on APEXMALHAR-2045:


Github user asfgit closed the pull request at:

https://github.com/apache/apex-malhar/pull/279


> Add bandwidth control feature to Apex
> -
>
> Key: APEXMALHAR-2045
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2045
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
> Fix For: 3.5.0
>
>
>  Bandwidth restrictions on the input operator for the number of bytes to be 
> consumed per second.





[jira] [Resolved] (APEXMALHAR-2045) Add bandwidth control feature to Apex

2016-06-30 Thread Vlad Rozov (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vlad Rozov resolved APEXMALHAR-2045.

   Resolution: Done
Fix Version/s: 3.5.0



[GitHub] apex-malhar pull request #279: APEXMALHAR-2045: Adding bandwidth control fea...

2016-06-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/apex-malhar/pull/279


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXMALHAR-2069) FileSplitterInput and TimeBasedDirectoryScanner - move operational fields initialization from constructor to setup

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358192#comment-15358192
 ] 

ASF GitHub Bot commented on APEXMALHAR-2069:


Github user sanjaypujare closed the pull request at:

https://github.com/apache/apex-malhar/pull/329


> FileSplitterInput and TimeBasedDirectoryScanner - move operational fields 
> initialization from constructor to setup
> --
>
> Key: APEXMALHAR-2069
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2069
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Vlad Rozov
>
> For example, there is no need for scanService to be initialized in the 
> constructor. It should be done during operator setup().





[jira] [Commented] (APEXMALHAR-2069) FileSplitterInput and TimeBasedDirectoryScanner - move operational fields initialization from constructor to setup

2016-06-30 Thread Sanjay M Pujare (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358074#comment-15358074
 ] 

Sanjay M Pujare commented on APEXMALHAR-2069:
-

[~csingh] Can you comment on how we can redesign the tests after this change? 
The failing tests are:

  
  FileSplitterInputTest.testFirstWindowAfterRecovery:353->testIdempotencyWithBlocksThreshold:336
  FileSplitterInputTest.testIdempotency:231 » Runtime multiple calls to setup() ...
  FileSplitterInputTest.testIdempotencyWithBlocksThreshold:336 » Runtime multipl...
  FileSplitterInputTest.testRecoveryOfBlockMetadataIterator:517 » Runtime multip...



[jira] [Commented] (APEXMALHAR-2069) FileSplitterInput and TimeBasedDirectoryScanner - move operational fields initialization from constructor to setup

2016-06-30 Thread Sanjay M Pujare (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358061#comment-15358061
 ] 

Sanjay M Pujare commented on APEXMALHAR-2069:
-

> IMO, it is safer to create a new execution service each time setup is called 
> and possibly assert or throw a RuntimeException if scanService is not null to 
> avoid misuse in unit tests.

That's a simple fix, but then we'll have to redesign or recreate the 
idempotency-related unit tests in FileSplitterInputTest, and I would like to 
weigh the trade-offs of making this change.
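
For reference, the guard suggested in the quoted comment might look roughly like the sketch below. It is a minimal illustration under assumptions, not the FileSplitterInput code: `ScannerSketch`, `isSetUp()` and the exception message are hypothetical, and only the `scanService` name and the idea of failing on a second `setup()` come from the discussion.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical scanner class; only the scanService field name comes from the thread.
class ScannerSketch
{
  // Operational field: created in setup() rather than in the constructor.
  private transient ExecutorService scanService;

  void setup()
  {
    // Fail fast when setup() is called twice without an intervening teardown().
    if (scanService != null) {
      throw new RuntimeException("multiple calls to setup() detected");
    }
    scanService = Executors.newSingleThreadExecutor();
  }

  void teardown()
  {
    if (scanService != null) {
      scanService.shutdownNow();
      scanService = null;
    }
  }

  boolean isSetUp()
  {
    return scanService != null;
  }
}
```

With a guard like this, a test that re-runs `setup()` on the same instance has to call `teardown()` first or create a fresh instance, which is the kind of test redesign being weighed above.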



[jira] [Updated] (APEXMALHAR-2130) implement scalable windowed storage

2016-06-30 Thread David Yan (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Yan updated APEXMALHAR-2130:
--
Description: 
This feature is used for supporting windowing.

The storage needs to have the following features:
1. Spillable key value storage (integrate with APEXMALHAR-2026)
2. Upon checkpoint, it saves a snapshot for the entire data set with the 
checkpointing window id.  This should be done incrementally (ManagedState) to 
avoid wasting space with unchanged data
3. When recovering, it takes the recovery window id and restores to that 
snapshot
4. When a window is committed, all windows with a lower ID should be purged 
from the store.
5. It should implement the WindowedStorage and WindowedKeyedStorage interfaces, 
and because of 2 and 3, we may want to add methods to the WindowedStorage 
interface so that the implementation of WindowedOperator can notify the storage 
of checkpointing, recovering and committing of a window.


  was:
This feature is used for supporting windowing.

The storage needs to have the following features:
1. Spillable key value storage (integrate with APEXMALHAR-2026)
2. Upon checkpoint, it saves a snapshot for the entire data set with the 
checkpointing window id
3. When recovering, it takes the recovery window id and restores to that 
snapshot
4. It should implement the WindowedStorage and WindowedKeyedStorage interfaces, 
and because of 2 and 3, we may want to add methods to the WindowedStorage 
interface so that the implementation of WindowedOperator can notify the storage 
of checkpointing, recovering and committing of a window.
5. Item 2 should be done incrementally (ManagedState) to avoid wasting space 
with unchanged data
6. When a window is committed, all windows with a lower ID should be purged from 
the store.


> implement scalable windowed storage
> ---
>
> Key: APEXMALHAR-2130
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2130
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
>
> This feature is used for supporting windowing.
> The storage needs to have the following features:
> 1. Spillable key value storage (integrate with APEXMALHAR-2026)
> 2. Upon checkpoint, it saves a snapshot for the entire data set with the 
> checkpointing window id.  This should be done incrementally (ManagedState) to 
> avoid wasting space with unchanged data
> 3. When recovering, it takes the recovery window id and restores to that 
> snapshot
> 4. When a window is committed, all windows with a lower ID should be purged 
> from the store.
> 5. It should implement the WindowedStorage and WindowedKeyedStorage 
> interfaces, and because of 2 and 3, we may want to add methods to the 
> WindowedStorage interface so that the implementation of WindowedOperator can 
> notify the storage of checkpointing, recovering and committing of a window.
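
The checkpoint, recovery and commit requirements listed above can be pictured with a small in-memory sketch. Everything in it is an assumption for illustration: `SnapshotStoreSketch` and its methods are hypothetical and do not reproduce the WindowedStorage interface, entries are keyed directly by window ID, and each checkpoint takes a full copy, whereas the description asks for incremental snapshots via ManagedState.

```java
import java.util.TreeMap;

// Hypothetical in-memory stand-in; not the WindowedStorage API.
class SnapshotStoreSketch
{
  // Live data: window id -> value.
  private TreeMap<Long, String> data = new TreeMap<>();
  // Snapshots: checkpoint window id -> copy of the live data at that point.
  private final TreeMap<Long, TreeMap<Long, String>> snapshots = new TreeMap<>();

  void put(long windowId, String value)
  {
    data.put(windowId, value);
  }

  String get(long windowId)
  {
    return data.get(windowId);
  }

  int size()
  {
    return data.size();
  }

  // Item 2: save a snapshot tagged with the checkpointing window id (full copy here).
  void checkpoint(long windowId)
  {
    snapshots.put(windowId, new TreeMap<>(data));
  }

  // Item 3: restore the live data to the snapshot of the recovery window id.
  void recover(long recoveryWindowId)
  {
    TreeMap<Long, String> snapshot = snapshots.get(recoveryWindowId);
    if (snapshot == null) {
      throw new IllegalStateException("no snapshot for window " + recoveryWindowId);
    }
    data = new TreeMap<>(snapshot);
  }

  // Item 4: purge everything with an id lower than the committed window.
  void committed(long windowId)
  {
    data.headMap(windowId, false).clear();
    snapshots.headMap(windowId, false).clear();
    for (TreeMap<Long, String> snapshot : snapshots.values()) {
      snapshot.headMap(windowId, false).clear();
    }
  }
}
```

The three methods `checkpoint`, `recover` and `committed` correspond to the notifications that item 5 suggests the WindowedOperator implementation would pass to the storage.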





[jira] [Updated] (APEXMALHAR-2130) implement scalable windowed storage

2016-06-30 Thread David Yan (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Yan updated APEXMALHAR-2130:
--
Description: 
This feature is used for supporting windowing.

The storage needs to have the following features:
1. Spillable key value storage (integrate with APEXMALHAR-2026)
2. Upon checkpoint, it saves a snapshot for the entire data set with the 
checkpointing window id
3. When recovering, it takes the recovery window id and restores to that 
snapshot
4. It should implement the WindowedStorage and WindowedKeyedStorage interfaces, 
and because of 2 and 3, we may want to add methods to the WindowedStorage 
interface so that the implementation of WindowedOperator can notify the storage 
of checkpointing and recovering.
5. Item 2 should be done incrementally (ManagedState) to avoid wasting space 
with unchanged data
6. When a window is committed, all windows with a lower ID should be purged from 
the store.

  was:
This feature is used for supporting windowing.

The storage needs to have the following features:
1. Spillable key value storage (integrate with APEXMALHAR-2026)
2. Upon checkpoint, it saves a snapshot for the entire data set with the 
checkpointing window id
3. When recovering, it takes the recovery window id and restores to that 
snapshot
4. It should implement the WindowedStorage and WindowedKeyedStorage interfaces, 
and because of 2 and 3, we may want to add methods to the WindowedStorage 
interface so that the implementation of WindowedOperator can notify the storage 
of checkpointing and recovering.
5. Item 2 should be done incrementally (ManagedState) to avoid wasting space 
with unchanged data





[jira] [Updated] (APEXMALHAR-2130) implement scalable windowed storage

2016-06-30 Thread David Yan (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Yan updated APEXMALHAR-2130:
--
Description: 
This feature is used for supporting windowing.

The storage needs to have the following features:
1. Spillable key value storage (integrate with APEXMALHAR-2026)
2. Upon checkpoint, it saves a snapshot for the entire data set with the 
checkpointing window id
3. When recovering, it takes the recovery window id and restores to that 
snapshot
4. It should implement the WindowedStorage and WindowedKeyedStorage interfaces, 
and because of 2 and 3, we may want to add methods to the WindowedStorage 
interface so that the implementation of WindowedOperator can notify the storage 
of checkpointing and recovering.
5. Item 2 should be done incrementally (ManagedState) to avoid wasting space 
with unchanged data


  was:
This feature is used for supporting windowing.

The storage needs to have the following features:
1. Spillable key value storage (integrate with APEXMALHAR-2026)
2. Upon checkpoint, it saves a snapshot for the entire data set with the 
checkpointing window id
3. When recovering, it takes the recovery window id and restores to that 
snapshot
4. It should implement the WindowedStorage and WindowedKeyedStorage interfaces, 
and because of 2 and 3, we may want to add methods to the WindowedStorage 
interface so that the implementation of WindowedOperator can notify the storage 
of checkpointing and recovering.
5. Item 2 should be done incrementally (ManagedState





[jira] [Updated] (APEXMALHAR-2130) implement scalable windowed storage

2016-06-30 Thread David Yan (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Yan updated APEXMALHAR-2130:
--
Description: 
This feature is used for supporting windowing.

The storage needs to have the following features:
1. Spillable key value storage (integrate with APEXMALHAR-2026)
2. Upon checkpoint, it saves a snapshot for the entire data set with the 
checkpointing window id
3. When recovering, it takes the recovery window id and restores to that 
snapshot
4. It should implement the WindowedStorage and WindowedKeyedStorage interfaces, 
and because of 2 and 3, we may want to add methods to the WindowedStorage 
interface so that the implementation of WindowedOperator can notify the storage 
of checkpointing and recovering.
5. Item 2 should be done incrementally (ManagedState


  was:
This feature is used for supporting windowing.

The storage needs to have the following features:
1. Spillable key value storage (integrate with APEXMALHAR-2026)
2. Upon checkpoint, it saves a snapshot for the entire data set with the 
checkpointing window id
3. When recovering, it takes the recovery window id and restores to that 
snapshot
4. It should implement the WindowedStorage and WindowedKeyedStorage interfaces, 
and because of 2 and 3, we may want to add methods to the WindowedStorage 
interface so that the implementation of WindowedOperator can notify the storage 
of checkpointing and recovering.





[jira] [Updated] (APEXMALHAR-2130) implement scalable windowed storage

2016-06-30 Thread David Yan (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Yan updated APEXMALHAR-2130:
--
Description: 
This feature is used for supporting windowing.

The storage needs to have the following features:
1. Spillable key value storage (integrate with APEXMALHAR-2026)
2. Upon checkpoint, it saves a snapshot for the entire data set with the 
checkpointing window id
3. When recovering, it takes the recovery window id and restores to that 
snapshot
4. It should implement the WindowedStorage and WindowedKeyedStorage interfaces, 
and because of 2 and 3, we may want to add methods to the WindowedStorage 
interface so that the implementation of WindowedOperator can notify the storage 
of checkpointing and recovering.


  was:This feature is used for supporting windowing.




[jira] [Commented] (APEXMALHAR-2069) FileSplitterInput and TimeBasedDirectoryScanner - move operational fields initialization from constructor to setup

2016-06-30 Thread Sanjay M Pujare (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15357592#comment-15357592
 ] 

Sanjay M Pujare commented on APEXMALHAR-2069:
-

Yes, I noticed that but the actual test that is doing this is 

testIdempotencyWithBlocksThreshold()

And testFirstWindowAfterRecovery() just calls this test (which concerns me a 
bit). 

In any case my observation is that the test behavior is not consistent and even 
in isolation I can get it to fail. I think the test (or tests) should be 
cleaned up to make them repeatable and consistent. 



[jira] [Updated] (APEXMALHAR-2130) implement scalable windowed storage

2016-06-30 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2130:

Description: This feature is used for supporting windowing.



[jira] [Commented] (APEXMALHAR-2069) FileSplitterInput and TimeBasedDirectoryScanner - move operational fields initialization from constructor to setup

2016-06-30 Thread Sanjay M Pujare (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15357517#comment-15357517
 ] 

Sanjay M Pujare commented on APEXMALHAR-2069:
-

Without the null check I get the following failure:

  FileSplitterInputTest.testFirstWindowAfterRecovery:369 Files expected:<1> but 
was:<2>

I implicitly assumed the unit test is trying to enforce a contract that was 
violated by my earlier change so I made this change. I'll track this down and 
see if the unit test is in fact "misusing" anything and needs to be fixed. Also 
the unit test has some dependency issues since running it in isolation makes it 
pass.



[jira] [Commented] (APEXMALHAR-2129) ManagedState: Add a disable purging option

2016-06-30 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15357484#comment-15357484
 ] 

Thomas Weise commented on APEXMALHAR-2129:
--

It still isn't clear to me why this would be needed. Please first clarify the 
use case. A batch can be seen as a global window and we are already 
implementing these semantics: APEXMALHAR-2085


> ManagedState: Add a disable purging option
> --
>
> Key: APEXMALHAR-2129
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2129
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Bhupesh Chawda
>Assignee: Chandni Singh
>
> Have an option that can disable purging of data.





[GitHub] apex-malhar pull request #326: APEXMALHAR-2116 Added FS record reader operat...

2016-06-30 Thread yogidevendra
Github user yogidevendra commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r69111489
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java ---
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import javax.validation.constraints.Min;
+
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.fs.FileSplitterInput;
+
+/**
+ * This module is used for reading records/tuples from FileSystem. Records can
+ * be read in parallel using multiple partitions of record reader operator.
+ * (Ordering is not guaranteed when records are read in parallel)
+ *
+ * Input directory is scanned at specified interval to poll for new data.
+ * 
+ * The module reads data in parallel; the following parameters can be configured:
+ * 
+ * 1. files: list of file(s)/directories to read
+ * 2. filePatternRegularExp: file names matching the given regex will be read
+ * 3. scanIntervalMillis: interval between two scans to discover new files in
+ * the input directory
+ * 4. recursive: whether to scan input directories recursively
+ * 5. blockSize: block size used to read input blocks of file
+ * 6. readersCount: count of readers to read input file
+ * 7. sequentialFileRead: whether to emit file blocks in sequence
+ * 8. blocksThreshold: number of blocks emitted per window
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordReaderModule implements Module
+{
+  @NotNull
+  @Size(min = 1)
+  private String files;
+  private String filePatternRegularExp;
+  @Min(0)
+  private long scanIntervalMillis;
+  private boolean recursive = true;
+  private boolean sequentialFileRead = false;
+  private int readersCount;
+  @Min(1)
+  protected int blocksThreshold;
+
+  public final transient ProxyOutputPort records = new ProxyOutputPort();
+
+  /**
+   * Criteria for record split
+   */
+  private RECORD_READER_MODE mode;
+
+  /**
+   * Length for fixed width record
+   */
+  private int recordLength;
+
+  public FileSplitterInput createFileSplitter()
+  {
+    return new FileSplitterInput();
+  }
+
+  public FSRecordReader createBlockReader()
+  {
+    FSRecordReader recordReader = new FSRecordReader();
+    recordReader.setMode(mode);
+    recordReader.setRecordLength(recordLength);
+
+    return recordReader;
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    FileSplitterInput fileSplitter = dag.addOperator("FileSplitter", createFileSplitter());
+    FSRecordReader recordReader = dag.addOperator("BlockReader", createBlockReader());
+
+    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, recordReader.blocksMetadataInput);
+
+    if (sequentialFileRead) {
+      dag.setInputPortAttribute(recordReader.blocksMetadataInput, Context.PortContext.STREAM_CODEC,
+          new SequentialFileBlockMetadataCodec());
+    }
+
+    FileSplitterInput.TimeBasedDirectoryScanner fileScanner = fileSplitter.getScanner();
+    fileScanner.setFiles(files);
+    if (scanIntervalMillis != 0) {
+      fileScanner.setScanIntervalMillis(scanIntervalMillis);
+    }
+

[jira] [Commented] (APEXMALHAR-2129) ManagedState: Add a disable purging option

2016-06-30 Thread Bhupesh Chawda (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356775#comment-15356775
 ] 

Bhupesh Chawda commented on APEXMALHAR-2129:


Okay, sure.
Just another point: Disabling purging may retain data in managed state. We must 
also make sure the data never expires. Something like setting a very large 
expiry perhaps?
