[jira] [Commented] (APEXCORE-777) Application Master may not shutdown due to incorrect numRequestedContainers counting

2017-08-15 Thread Sanjay M Pujare (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127605#comment-16127605
 ] 

Sanjay M Pujare commented on APEXCORE-777:
--

I changed my "should be" to "can be". 

In any case, consider the following: we have no unit or automated tests to 
verify that the behavior hasn't changed after refactoring. During refactoring 
we will obviously consider the outstanding and fixed defects to see what new 
data structures and functions need to be introduced. Also, if you notice an 
obvious flaw in the old logic while refactoring, you would want to fix it in 
the refactored code, and I suspect this bug could be one of those things.

> Application Master may not shutdown due to incorrect numRequestedContainers 
> counting
> 
>
> Key: APEXCORE-777
> URL: https://issues.apache.org/jira/browse/APEXCORE-777
> Project: Apache Apex Core
>  Issue Type: Bug
>Reporter: Vlad Rozov
>Priority: Minor
>
> Consider a scenario where the App Master requests a container from YARN 
> (numRequestedContainers = 1). There are not enough resources and the request 
> times out. My understanding is that the App Master will re-request it, but 
> the number of requested containers will not change (one newly requested, one 
> removed). Let's assume that by the time YARN responds back, the App Master 
> decides that it does not need any containers. If YARN responds with one 
> allocated container, numRequestedContainers will go to 0 (correct), but YARN 
> may respond with 2 allocated containers if, by the time the App Master sends 
> the second request, YARN has already allocated a container for the original 
> request (the one that timed out), as YARN does not guarantee that a removed 
> request will not be fulfilled (see the YARN docs). Won't 
> numRequestedContainers then be -1 due to the bulk decrement?
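The counter arithmetic in the scenario above can be sketched in a few lines. This is a minimal simulation of the described accounting, not the actual Apex App Master code; the class and method names are illustrative only:

```java
// Minimal sketch of the accounting described in the report: one outstanding
// request, a timed-out re-request, and a bulk decrement by however many
// containers YARN eventually allocates.
public class BulkDecrementSketch {

    static int simulate(int allocatedByYarn) {
        int numRequestedContainers = 1; // original request sent to YARN
        // Request times out: one request removed, one re-requested.
        numRequestedContainers += 1 - 1; // net change is zero
        // App Master decides it no longer needs containers, but YARN may
        // still fulfill the removed request and respond with 2 allocations.
        numRequestedContainers -= allocatedByYarn; // bulk decrement
        return numRequestedContainers;
    }

    public static void main(String[] args) {
        System.out.println(simulate(1)); // 0: the expected case
        System.out.println(simulate(2)); // -1: the underflow described above
    }
}
```

With a single allocation the counter returns to 0, but a second, late allocation drives it to -1, which is what can keep the App Master from shutting down.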



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (APEXCORE-777) Application Master may not shutdown due to incorrect numRequestedContainers counting

2017-08-15 Thread Vlad Rozov (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127666#comment-16127666
 ] 

Vlad Rozov commented on APEXCORE-777:
-

-1 means "must not". While I am +1 on code refactoring and agree that it needs 
to be done to help with unit testing and the readability/maintainability of 
the code, this particular issue must be fixed as part of this JIRA and in a 
separate PR.






[jira] [Comment Edited] (APEXCORE-777) Application Master may not shutdown due to incorrect numRequestedContainers counting

2017-08-15 Thread Sanjay M Pujare (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126034#comment-16126034
 ] 

Sanjay M Pujare edited comment on APEXCORE-777 at 8/15/17 6:06 PM:
---

This can be addressed as part of the refactoring JIRA APEXCORE-771. 

When YARN returns 2 containers, the code processes the first allocated 
container and, because it is obviously not an "already allocated" case, does 
not touch the counters; it then removes the outstanding request from the 
requestedResources map. Later it determines that the container is no longer 
needed, so it creates a release request for that container. While processing 
the second allocated container it cannot recognize it as an "already 
allocated" case, because the request was removed from the requestedResources 
map, so numRequestedContainers does not get incremented. It again determines 
that this container is not needed and creates a release request for it. But 
numRequestedContainers then ends up at -1, and that is the problem.

Basically, a request should NEVER be removed from the requestedResources map, 
so that the code can recognize "already allocated" cases even in scenarios 
such as this. We should add flags/states in the map to mark a request as 
"removed" (with the reason for the removal) so that later allocations can be 
matched against these "removed" requests.
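One possible shape of that idea, sketched with illustrative names (this is not the actual Apex data structure; the state enum and method names are assumptions for illustration): keep withdrawn requests in the map in a REMOVED state so a late allocation still matches and the counter never underflows.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: entries are never deleted on withdrawal, only marked
// REMOVED, so a late fulfillment of a withdrawn request can be recognized.
public class RequestedResourcesSketch {

    enum State { OUTSTANDING, REMOVED }

    final Map<String, State> requestedResources = new HashMap<>();
    int numRequestedContainers;

    void request(String id) {
        requestedResources.put(id, State.OUTSTANDING);
        numRequestedContainers++;
    }

    // Withdraw a request without forgetting it ever existed.
    void withdraw(String id) {
        requestedResources.put(id, State.REMOVED);
        numRequestedContainers--;
    }

    // Returns true if the allocation matched a request we know about.
    boolean onAllocated(String id) {
        State s = requestedResources.remove(id);
        if (s == State.OUTSTANDING) {
            numRequestedContainers--;
        }
        // A REMOVED match is a late fulfillment: the container should be
        // released, but the counter must not be decremented again.
        return s != null;
    }
}
```

In the scenario from this JIRA (request, timeout/withdraw, re-request, then two allocations), per-request matching leaves the counter at 0 instead of -1.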


was (Author: sanjaypujare):
This should be addressed as part of refactor JIRA APEXCORE-771. 

When Yarn returns 2 containers, the code processes the first allocated 
container and because it is obviously not "already allocated" it does not touch 
the counters and then removes the outstanding request from requestedResources 
Map. But later it determines the container is not needed anymore so it creates 
a release-request for that container. While processing the second 
allocated-container it cannot determine this to be "already allocated" case 
because the request was removed from the requestedResources map and so 
numRequestedContainers doesn't get incremented. It again figures out this 
container is not needed so creates a release-request for this container. But 
then numRequestedContainers stays at -1 and that is the problem.

Basically a request should NEVER be removed from the requestedResources map, so 
that the code can ascertain "already allocated" cases even in cases such as 
this. We should have additional flags/states in the map to denote a request as 
"removed" (with reason for the removal) so it is possible to match later 
allocations against these "removed" requests.






[jira] [Commented] (APEXCORE-777) Application Master may not shutdown due to incorrect numRequestedContainers counting

2017-08-15 Thread Pramod Immaneni (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127863#comment-16127863
 ] 

Pramod Immaneni commented on APEXCORE-777:
--

This seems like an important issue to fix. [~sanjaypujare], are you planning 
to look into it, as you are probably familiar with this part of the code?






[jira] [Updated] (APEXMALHAR-2034) Avro File To POJO Module

2017-08-15 Thread Saumya Mohan (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Saumya Mohan updated APEXMALHAR-2034:
-
Summary: Avro File To POJO Module  (was: Avro File Input Operator)

> Avro File To POJO Module
> 
>
> Key: APEXMALHAR-2034
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2034
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: devendra tagare
>Assignee: Saumya Mohan
>
> This JIRA is used to create a Module on top of the AvroFileInputOperator and 
> AvroToPojo operators. The stream between the two operators will be set to 
> CONTAINER_LOCAL, which is required because Avro objects are not serialized by 
> Kryo. This lets users directly use the module, which has the locality preset 
> to CONTAINER_LOCAL.





[jira] [Updated] (APEXMALHAR-2034) Avro File Input Operator

2017-08-15 Thread Saumya Mohan (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Saumya Mohan updated APEXMALHAR-2034:
-
Description: 

This JIRA is used to create a Module on top of the AvroFileInputOperator and 
AvroToPojo operators. The stream between the two operators will be set to 
CONTAINER_LOCAL, which is required because Avro objects are not serialized by 
Kryo. This lets users directly use the module, which has the locality preset 
to CONTAINER_LOCAL.

  was:
This operator would extend the AbstractFileInputOperator to read Avro Container 
files.

Input would be an Avro Container File.
Output would be a GenericRecord.

There would be 2 additional optional ports,
1.FilePort - for completed files.
2.FailedRecordsPort - this will capture fileName,Offset & error message as a 
string for handling by a downstream operator.

This operator can be used in isolation or with the AvroToPojo operator to read 
an Avro record and convert it to a POJO.

-
This JIRA is used to create a Module on top of AvroFileInputOperator and 
AvroToPojo operator. The stream between the two operators will be set to 
CONTAINER_LOCAL which is required as Avro objects are not serialized by Kryo. 
This will help users to directly use the module which has the locality set to 
CONTAINER_LOCAL.







[jira] [Updated] (APEXMALHAR-2034) Avro File To POJO Module

2017-08-15 Thread Saumya Mohan (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Saumya Mohan updated APEXMALHAR-2034:
-
Description: 
Issue:
Avro objects are not serialized by Kryo, causing the Avro GenericRecord not to 
be available to downstream operators if users don't explicitly mark the stream 
locality as CONTAINER_LOCAL or THREAD_LOCAL. 

Solution:
This JIRA is used to create a Module on top of the AvroFileInputOperator and 
AvroToPojo operators, such that downstream operators receive a POJO instead of 
an Avro GenericRecord.

In this Module, the stream between the two encapsulated operators 
(AvroFileInputOperator and AvroToPojo) is set to CONTAINER_LOCAL. 

This new module removes the exposure of GenericRecord to downstream operators 
and instead exposes the created POJO.




  was:

This JIRA is used to create a Module on top of AvroFileInputOperator and 
AvroToPojo operator. The stream between the two operators will be set to 
CONTAINER_LOCAL which is required as Avro objects are not serialized by Kryo. 
This will help users to directly use the module which has the locality set to 
CONTAINER_LOCAL.







[jira] [Commented] (APEXCORE-743) Killed container is shown as running

2017-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127955#comment-16127955
 ] 

ASF GitHub Bot commented on APEXCORE-743:
-

sandeshh closed pull request #543: APEXCORE-743 Killed container shown as 
running
URL: https://github.com/apache/apex-core/pull/543
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Killed container is shown as running
> 
>
> Key: APEXCORE-743
> URL: https://issues.apache.org/jira/browse/APEXCORE-743
> Project: Apache Apex Core
>  Issue Type: Bug
>Reporter: Sandesh
>Assignee: Sandesh
>
> Here is the behavior:
> 1. Container heartbeat timeout happens
> 2. App Master sends the request to kill the container
> 3. Container is killed
> 4. App Master state is not updated and no new container is allocated
> After analyzing the code, here is the possible reason:
> 1. The kill request is sent to the NM
> 2. The container is killed by the NM, but the NM callback doesn't happen. 
> RecoverContainer is called in the NM callback, which in this case is never 
> invoked.
> 3. App Master state is not updated
> Possible fix:
> Add a timeout for the NM callback, so that if the NM doesn't confirm in time 
> that the container was killed, RecoverContainer is called.
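The proposed fix can be sketched with a scheduled fallback. This is a hedged illustration using only the JDK, not the actual Apex code; the method names are hypothetical:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Sketch: after asking the NM to kill a container, schedule a fallback that
// forces recovery if the NM "container stopped" callback never arrives.
public class KillTimeoutSketch {

    // Daemon thread so the timer never keeps the JVM alive on its own.
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
    private ScheduledFuture<?> fallback;
    volatile boolean recovered;

    void killContainer(long timeoutMillis) {
        // The kill request to the NM would be sent here.
        fallback = timer.schedule(this::recoverContainer,
            timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Invoked by the NM callback in the normal case.
    void onContainerStopped() {
        if (fallback != null) {
            fallback.cancel(false);
        }
        recoverContainer();
    }

    synchronized void recoverContainer() {
        if (!recovered) {
            recovered = true; // update App Master state, request replacement
        }
    }
}
```

Whether the NM responds or the timeout fires, recovery runs exactly once; the synchronized flag guards against the race between the callback and the timer.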





[jira] [Resolved] (APEXCORE-743) Killed container is shown as running

2017-08-15 Thread Sandesh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXCORE-743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sandesh resolved APEXCORE-743.
--
   Resolution: Fixed
Fix Version/s: 3.7.0






[Design discussion] - Kudu Input operator

2017-08-15 Thread Ananth G
Hello All,

The implementation for the Apex Kudu input operator is ready for a pull 
request. Before raising it in the next couple of days, I would like to get 
any inputs regarding the design and incorporate feedback for the following 
JIRA.

https://issues.apache.org/jira/browse/APEXMALHAR-2472 


The following are the main features that would be supported by the Input 
operator:

- The input operator would be used to scan all or some rows of a single Kudu 
table.
- Each Kudu row is translated to a POJO for downstream operators. 
- The input operator would accept a SQL expression (described in detail 
below) that is parsed to generate the equivalent scanner code for the Kudu 
table. This is because the Kudu Table API does not support SQL expressions.
- The SQL expression would have additional options that help with Apache 
Apex design patterns (e.g., sending a control tuple message after a query is 
successfully processed).
- The input operator works on a continuous basis, i.e., it accepts the next 
query once the current query is complete.
- The operator works in a distributed fashion for the input query. This 
essentially means that for a single input query, the scan work is distributed 
among all of the physical instances of the input operator.
- Kudu splits a table into chunks of data called tablets. The tablets are 
replicated and partitioned (range and hash partitions are supported) in Kudu 
according to the Kudu table definition. The operator allows partitioning of 
the input operator to be done in 2 ways:
  - Map many Kudu tablets to one partition of the Apex Kudu input operator
  - Map one Kudu tablet to one partition of the Apex Kudu input operator
- The partitioning does not change on a per-query basis, because of the 
complex use cases that would arise. For example, if a query touches only a 
few rows before the next query is accepted, it would result in a lot of churn 
in terms of operator serialization/deserialization, YARN allocation requests, 
etc. Also, supporting per-query partition planning leads to a possibly very 
complex implementation and poor resource usage, as all physical instances of 
the operator have to wait for their peers to complete their scans and then 
wait for the next checkpoint to get repartitioned.
- The partitioner splits the workload of a single query in a round-robin 
fashion: after a query plan is generated, each scan token range is 
distributed equally among the physical operator instances.
- The operator allows two modes of scanning for an application (this cannot 
be changed on a per-query basis):
  - Consistent-order scanner: only one tablet scan thread is active at any 
given instant for a given query
  - Random-order scanner: many threads are active to scan Kudu tablets in 
parallel
- As can be seen, the consistent-order scanner would be slower but would help 
with better "exactly once" implementations if the correct method is 
overridden in the operator.
- The operator introduces the DisruptorBlockingQueue for low-latency buffer 
management. The LMAX Disruptor library was considered, but based on 
discussion threads on other Apache projects we settled on the ConversantMedia 
implementation of the disruptor blocking queue. This blocking queue is used 
when the Kudu scanner thread sends a scanned row to the input operator's 
main-thread emitTuples() call.
- The operator allows exactly-once semantics if the user specifies the logic 
for reconciling a possible duplicate row when the operator is resuming from a 
checkpoint. This is done by overriding a method that returns a boolean (true 
to emit the tuple, false to suppress it) while the operator is working in the 
reconciling window phase. As can be seen, this reconciling phase is active 
for at most one window.
- The operator uses the FSWindowManager to manage metadata at the end of 
every window. On resumption from a checkpoint, the operator still scans the 
Kudu tablets but suppresses rows that were already streamed downstream. 
Subsequently, when the operator is in the reconciling window, the method 
described above is invoked to allow duplicates to be filtered. After this 
reconciling window, the operator works in its normal mode of operation.
- The following are additional configurable aspects of the operator:
  - Max tuples per window
  - Spin policy and buffer size for the disruptor blocking queue
  - Mechanism to provide custom control tuples if required
  - Setting the number of physical operator instances via the API if required
  - Setting fault tolerance: if fault tolerant, an alternative replica of a 
Kudu tablet is picked for scanning when the initial tablet fails for whatever 
reason. However, this slows down scan throughput, hence it is configurable by 
the end user.

[jira] [Updated] (APEXMALHAR-2034) Avro File To POJO Module

2017-08-15 Thread Saumya Mohan (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Saumya Mohan updated APEXMALHAR-2034:
-
Description: 
Issue:
Avro objects are not serialized by Kryo, causing the Avro GenericRecord not to 
be available to downstream operators if users don't explicitly mark the stream 
locality as CONTAINER_LOCAL or THREAD_LOCAL.

Solution:
This JIRA is used to create a Module on top of the AvroFileInputOperator and 
AvroToPojo operators, such that downstream operators receive a POJO instead of 
an Avro GenericRecord. It therefore removes the exposure of GenericRecord to 
downstream operators and instead exposes the created POJO.

In this Module, the stream between the two encapsulated operators 
(AvroFileInputOperator and AvroToPojo) is set to CONTAINER_LOCAL.

Along with this new module, existing Avro support files are moved from the 
contrib module to a new 'avro' module.






  was:
Issue:
Avro objects are not serialized by Kryo causing the Avro GenericRecord to not 
be available to downstream operators if users don't explicitly mark the stream 
locality at container_local or thread_local. 

Solution:
This JIRA is used to create a Module on top of AvroFileInputOperator and 
AvroToPojo operators such that downstream operators will access POJO instead of 
Avro GenericRecord.

In this Module, the stream between the two encapsulated operators 
(AvroFileInputOperator and AvroToPojo) is set to CONTAINER_LOCAL. 

This new module removes the exposure of GenericRecord to downstream operators 
and instead exposes the created POJO to downstream operators.










[jira] [Commented] (APEXMALHAR-2034) Avro File Input Operator

2017-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127681#comment-16127681
 ] 

ASF GitHub Bot commented on APEXMALHAR-2034:


SaumyaMohan closed pull request #663: APEXMALHAR-2034 Create Avro Module to 
encapsulate Container File to Generic Record to POJO conversion
URL: https://github.com/apache/apex-malhar/pull/663
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Avro File Input Operator
> 
>
> Key: APEXMALHAR-2034
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2034
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: devendra tagare
>Assignee: Saumya Mohan
>
> This operator would extend the AbstractFileInputOperator to read Avro 
> Container files.
> Input would be an Avro Container File.
> Output would be a GenericRecord.
> There would be 2 additional optional ports,
> 1.FilePort - for completed files.
> 2.FailedRecordsPort - this will capture fileName,Offset & error message as a 
> string for handling by a downstream operator.
> This operator can be used in isolation or with the AvroToPojo operator to 
> read an Avro record and convert it to a POJO.
> -
> This JIRA is used to create a Module on top of AvroFileInputOperator and 
> AvroToPojo operator. The stream between the two operators will be set to 
> CONTAINER_LOCAL which is required as Avro objects are not serialized by Kryo. 
> This will help users to directly use the module which has the locality set to 
> CONTAINER_LOCAL.





[jira] [Commented] (APEXMALHAR-2034) Avro File Input Operator

2017-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16127684#comment-16127684
 ] 

ASF GitHub Bot commented on APEXMALHAR-2034:


SaumyaMohan opened a new pull request #665: APEXMALHAR-2034 Adding new Avro 
Module to encapsulate Container File to GenericRecord to POJO transformation
URL: https://github.com/apache/apex-malhar/pull/665
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org







[jira] [Commented] (APEXMALHAR-2472) Implement Kudu Input Operator

2017-08-15 Thread Ananth (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16128171#comment-16128171
 ] 

Ananth commented on APEXMALHAR-2472:


The following are the main features that would be supported by the Input 
operator:

- The input operator would be used to scan all or some rows of a single Kudu 
table.
- Each Kudu row is translated to a POJO for downstream operators. 
- The input operator would accept a SQL expression (described in detail 
below) that is parsed to generate the equivalent scanner code for the Kudu 
table. This is because the Kudu Table API does not support SQL expressions.
- The SQL expression would have additional options that help with Apache 
Apex design patterns (e.g., sending a control tuple message after a query is 
successfully processed).
- The input operator works on a continuous basis, i.e., it accepts the next 
query once the current query is complete.
- The operator works in a distributed fashion for the input query. This 
essentially means that for a single input query, the scan work is distributed 
among all of the physical instances of the input operator.
- Kudu splits a table into chunks of data called tablets. The tablets are 
replicated and partitioned (range and hash partitions are supported) in Kudu 
according to the Kudu table definition. The operator allows partitioning of 
the input operator to be done in 2 ways:
  - Map many Kudu tablets to one partition of the Apex Kudu input operator
  - Map one Kudu tablet to one partition of the Apex Kudu input operator
- The partitioning does not change on a per-query basis, because of the 
complex use cases that would arise. For example, if a query touches only a 
few rows before the next query is accepted, it would result in a lot of churn 
in terms of operator serialization/deserialization, YARN allocation requests, 
etc. Also, supporting per-query partition planning leads to a possibly very 
complex implementation and poor resource usage, as all physical instances of 
the operator have to wait for their peers to complete their scans and then 
wait for the next checkpoint to get repartitioned.
- The partitioner splits the workload of a single query in a round-robin 
fashion: after a query plan is generated, each scan token range is 
distributed equally among the physical operator instances.
- The operator allows two modes of scanning for an application (this cannot 
be changed on a per-query basis):
  - Consistent-order scanner: only one tablet scan thread is active at any 
given instant for a given query
  - Random-order scanner: many threads are active to scan Kudu tablets in 
parallel
- As can be seen, the consistent-order scanner would be slower but would help 
with better "exactly once" implementations if the correct method is 
overridden in the operator.
- The operator introduces the DisruptorBlockingQueue for low-latency buffer 
management. The LMAX Disruptor library was considered, but based on 
discussion threads on other Apache projects we settled on the ConversantMedia 
implementation of the disruptor blocking queue. This blocking queue is used 
when the Kudu scanner thread sends a scanned row to the input operator's 
main-thread emitTuples() call.
- The operator allows exactly-once semantics if the user specifies the logic 
for reconciling a possible duplicate row when the operator is resuming from a 
checkpoint. This is done by overriding a method that returns a boolean (true 
to emit the tuple, false to suppress it) while the operator is working in the 
reconciling window phase. As can be seen, this reconciling phase is active 
for at most one window.
- The operator uses the FSWindowManager to manage metadata at the end of 
every window. On resumption from a checkpoint, the operator still scans the 
Kudu tablets but suppresses rows that were already streamed downstream. 
Subsequently, when the operator is in the reconciling window, the method 
described above is invoked to allow duplicates to be filtered. After this 
reconciling window, the operator works in its normal mode of operation.
- The following are additional configurable aspects of the operator:
  - Max tuples per window
  - Spin policy and buffer size for the disruptor blocking queue
  - Mechanism to provide custom control tuples if required
  - Setting the number of physical operator instances via the API if required
  - Setting fault tolerance: if fault tolerant, an alternative replica of a 
Kudu tablet is picked for scanning when the initial tablet fails for whatever 
reason. However, this slows down scan throughput, hence it is configurable by 
the end user.
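The round-robin distribution of scan token ranges described above can be sketched as follows (an illustrative sketch; the real operator distributes Kudu scan tokens, represented here as plain strings):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the round-robin split of a query plan: each scan token range is
// assigned to a physical operator instance in turn, so the work is spread
// evenly across instances.
public class RoundRobinPlanner {

    /** Assigns each scan token to a physical operator instance in turn. */
    static List<List<String>> assign(List<String> scanTokens, int numInstances) {
        List<List<String>> plan = new ArrayList<>();
        for (int i = 0; i < numInstances; i++) {
            plan.add(new ArrayList<>());
        }
        for (int i = 0; i < scanTokens.size(); i++) {
            plan.get(i % numInstances).add(scanTokens.get(i));
        }
        return plan;
    }
}
```

For 5 token ranges and 2 physical instances, one instance receives 3 ranges and the other 2, so no instance is idle while a peer holds most of the scan.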

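The checkpoint-replay and reconciling-window behavior described in the list above can be sketched as follows (names are hypothetical, not the operator's actual API):

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch: rows emitted before the checkpoint are suppressed on
// replay, and during the reconciling window a user-overridable hook decides
// whether a possibly-duplicate row is emitted.
public class ReconcileSketch {

    private final Set<String> emittedBeforeCheckpoint = new HashSet<>();
    boolean inReconcilingWindow;

    ReconcileSketch(Set<String> alreadyEmitted) {
        emittedBeforeCheckpoint.addAll(alreadyEmitted);
    }

    // Override to reconcile duplicates (e.g., check the sink); default emits.
    protected boolean emitDuringReconcile(String row) {
        return true;
    }

    /** Returns true if the scanned row should be emitted downstream. */
    boolean shouldEmit(String row) {
        if (emittedBeforeCheckpoint.contains(row)) {
            return false; // replayed row, already streamed before checkpoint
        }
        if (inReconcilingWindow) {
            return emitDuringReconcile(row);
        }
        return true; // normal mode of operation
    }
}
```

After the reconciling window closes, no per-row bookkeeping is consulted, which is why the reconciliation cost is bounded to at most one window.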

Some notes regarding the SQL expression for the operator:

- The operator uses ANTLR4 to parse the SQL expression.
- The parser is