[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy

2016-08-10 Thread Chaitanya (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15414802#comment-15414802
 ] 

Chaitanya commented on APEXMALHAR-2169:
---

Based on comments, Changing the title of this JIRA to "Remove the stuff related 
to Partition Based on throughput from Kafka Input Operator"

> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY 
> partition strategy
> --
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce: 
> (1) Created a topic with 3 partitions
> (2) Created an application as  KAFKA -> Console with below configuration:
>strategy : one_to_many
>initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partition the topic to 5
> Observations:
> (1) Operator re-partitioning is not happened.
> (2) Operator is not emitting the messages.
> (3) Following warning messages in log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: 
> Repartition the operator(s) under 9223372036854775807 msgs/s and 
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list 
> after repartition: OperatorMeta{name=Input, 
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
>  attributes={Attribute{defaultValue=1024, 
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy

2016-08-08 Thread Siyuan Hua (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411388#comment-15411388
 ] 

Siyuan Hua commented on APEXMALHAR-2169:


[~chaithu]  
Then I think the problem is in softConstraint and hardConstraint code, it 
should never return true because default limit is Long.MAX_VALUE.  

There is something in backlog that I didn't track in Jira(my bad). But since 
you have issue here, can you please do some refactor here. 
We want to actually simplify the operator code instead of making it more and 
more complicate. And kafka input operator is there for awhile and I don't see 
any requirement/asking for dynamic partition based on throughput.
Can we take away the hardConstraint and softConstraint condition check and make 
the 2 upperbound property deprecated.  So dynamic partition by default should 
only happen when kafka partition changes. 
And for ONE_TO_MANY partition strategy, the number of operator partitions 
should stay unchanged for the whole application with the specified 
initialPartitionCount. I think there is still bug there that if new kafka 
partition is added, we always start a new partition. That is not correct.

And you can create another ticket to move all repartition based on throughput 
to a separate Partitioner so the operator code would be simple and easy to 
understand/debug



> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY 
> partition strategy
> --
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce: 
> (1) Created a topic with 3 partitions
> (2) Created an application as  KAFKA -> Console with below configuration:
>strategy : one_to_many
>initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partition the topic to 5
> Observations:
> (1) Operator re-partitioning is not happened.
> (2) Operator is not emitting the messages.
> (3) Following warning messages in log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: 
> Repartition the operator(s) under 9223372036854775807 msgs/s and 
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list 
> after repartition: OperatorMeta{name=Input, 
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
>  attributes={Attribute{defaultValue=1024, 
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy

2016-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411341#comment-15411341
 ] 

ASF GitHub Bot commented on APEXMALHAR-2169:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/351#discussion_r73824988
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
 ---
@@ -188,6 +188,8 @@
   @Min(1)
   private int initialPartitionCount = 1;
 
+  private boolean isPartitionBasedOnLoad = false;
--- End diff --

Is this can be achieved as follows. Please correct it, if I am wrong.
if (msgRateUpperBound != Long.MAX_VALUE) {
/// Dynamic partition based on load is enabled
} else {
/// Dynamic partition based on load is disabled
}



> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY 
> partition strategy
> --
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce: 
> (1) Created a topic with 3 partitions
> (2) Created an application as  KAFKA -> Console with below configuration:
>strategy : one_to_many
>initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partition the topic to 5
> Observations:
> (1) Operator re-partitioning is not happened.
> (2) Operator is not emitting the messages.
> (3) Following warning messages in log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: 
> Repartition the operator(s) under 9223372036854775807 msgs/s and 
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list 
> after repartition: OperatorMeta{name=Input, 
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
>  attributes={Attribute{defaultValue=1024, 
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy

2016-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411342#comment-15411342
 ] 

ASF GitHub Bot commented on APEXMALHAR-2169:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/351#discussion_r73824992
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
 ---
@@ -834,17 +840,16 @@ private boolean isPartitionRequired(int opid, 
List KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY 
> partition strategy
> --
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce: 
> (1) Created a topic with 3 partitions
> (2) Created an application as  KAFKA -> Console with below configuration:
>strategy : one_to_many
>initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partition the topic to 5
> Observations:
> (1) Operator re-partitioning is not happened.
> (2) Operator is not emitting the messages.
> (3) Following warning messages in log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: 
> Repartition the operator(s) under 9223372036854775807 msgs/s and 
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list 
> after repartition: OperatorMeta{name=Input, 
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
>  attributes={Attribute{defaultValue=1024, 
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy

2016-08-07 Thread Siyuan Hua (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1546#comment-1546
 ] 

Siyuan Hua commented on APEXMALHAR-2169:


[~chaithu]
I'm still not convinced. In your setup both msgRateUpperBound and 
byteRateUpperBound are unlimited. The isPartitionRequired method should not 
return true based on throughput. If isPartitionRequired return true because of 
the new kafka partition, then it will go to line 599 in definePartition method. 
 

> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY 
> partition strategy
> --
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce: 
> (1) Created a topic with 3 partitions
> (2) Created an application as  KAFKA -> Console with below configuration:
>strategy : one_to_many
>initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partition the topic to 5
> Observations:
> (1) Operator re-partitioning is not happened.
> (2) Operator is not emitting the messages.
> (3) Following warning messages in log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: 
> Repartition the operator(s) under 9223372036854775807 msgs/s and 
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list 
> after repartition: OperatorMeta{name=Input, 
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
>  attributes={Attribute{defaultValue=1024, 
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy

2016-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1540#comment-1540
 ] 

ASF GitHub Bot commented on APEXMALHAR-2169:


Github user siyuanh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/351#discussion_r73811202
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
 ---
@@ -834,17 +840,16 @@ private boolean isPartitionRequired(int opid, 
List KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY 
> partition strategy
> --
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce: 
> (1) Created a topic with 3 partitions
> (2) Created an application as  KAFKA -> Console with below configuration:
>strategy : one_to_many
>initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partition the topic to 5
> Observations:
> (1) Operator re-partitioning is not happened.
> (2) Operator is not emitting the messages.
> (3) Following warning messages in log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: 
> Repartition the operator(s) under 9223372036854775807 msgs/s and 
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list 
> after repartition: OperatorMeta{name=Input, 
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
>  attributes={Attribute{defaultValue=1024, 
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy

2016-08-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15408973#comment-15408973
 ] 

ASF GitHub Bot commented on APEXMALHAR-2169:


Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/351#discussion_r73647526
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
 ---
@@ -188,6 +188,8 @@
   @Min(1)
   private int initialPartitionCount = 1;
 
+  private boolean isPartitionBasedOnLoad = false;
--- End diff --

Add a comment about this flag.


> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY 
> partition strategy
> --
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce: 
> (1) Created a topic with 3 partitions
> (2) Created an application as  KAFKA -> Console with below configuration:
>strategy : one_to_many
>initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partition the topic to 5
> Observations:
> (1) Operator re-partitioning is not happened.
> (2) Operator is not emitting the messages.
> (3) Following warning messages in log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: 
> Repartition the operator(s) under 9223372036854775807 msgs/s and 
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list 
> after repartition: OperatorMeta{name=Input, 
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
>  attributes={Attribute{defaultValue=1024, 
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy

2016-08-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15408949#comment-15408949
 ] 

ASF GitHub Bot commented on APEXMALHAR-2169:


Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/351#discussion_r73646280
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
 ---
@@ -628,6 +630,11 @@ else if (newWaitingPartition.size() != 0) {
   }
 }
 
+if (kPIntakeRate.size() == 0) {
--- End diff --

add a comment for this condition.


> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY 
> partition strategy
> --
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce: 
> (1) Created a topic with 3 partitions
> (2) Created an application as  KAFKA -> Console with below configuration:
>strategy : one_to_many
>initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partition the topic to 5
> Observations:
> (1) Operator re-partitioning is not happened.
> (2) Operator is not emitting the messages.
> (3) Following warning messages in log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: 
> Repartition the operator(s) under 9223372036854775807 msgs/s and 
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list 
> after repartition: OperatorMeta{name=Input, 
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
>  attributes={Attribute{defaultValue=1024, 
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy

2016-07-27 Thread Chaitanya (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15396882#comment-15396882
 ] 

Chaitanya commented on APEXMALHAR-2169:
---

Siyuan,

   I am looking for dynamic partition based on metadata change. Before metadata 
change, operator partitioned happened based on throughput. Issue is in 
partition based on throughput. 

Issue is at line 625, latest stats may not contain counters. If the latest 
stats does not contain counters then definePartitions() return empty partition 
list. (Refer the warning messages under JIRA description).





> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY 
> partition strategy
> --
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce: 
> (1) Created a topic with 3 partitions
> (2) Created an application as  KAFKA -> Console with below configuration:
>strategy : one_to_many
>initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partition the topic to 5
> Observations:
> (1) Operator re-partitioning is not happened.
> (2) Operator is not emitting the messages.
> (3) Following warning messages in log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: 
> Repartition the operator(s) under 9223372036854775807 msgs/s and 
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list 
> after repartition: OperatorMeta{name=Input, 
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
>  attributes={Attribute{defaultValue=1024, 
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy

2016-07-27 Thread Siyuan Hua (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15396353#comment-15396353
 ] 

Siyuan Hua commented on APEXMALHAR-2169:


[~chaithu] 

Can you elaborate more on this ticket? Is it dynamic partition based on 
throughput or metadata change or both?
If you want dynamic partition happen because more kafka partitions are added, I 
think the code should jump into line 599 where newWaitingPartition should be 
non-empty, right?



> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY 
> partition strategy
> --
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce: 
> (1) Created a topic with 3 partitions
> (2) Created an application as  KAFKA -> Console with below configuration:
>strategy : one_to_many
>initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partition the topic to 5
> Observations:
> (1) Operator re-partitioning is not happened.
> (2) Operator is not emitting the messages.
> (3) Following warning messages in log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: 
> Repartition the operator(s) under 9223372036854775807 msgs/s and 
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list 
> after repartition: OperatorMeta{name=Input, 
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
>  attributes={Attribute{defaultValue=1024, 
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy

2016-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15395513#comment-15395513
 ] 

ASF GitHub Bot commented on APEXMALHAR-2169:


GitHub user chaithu14 opened a pull request:

https://github.com/apache/apex-malhar/pull/351

APEXMALHAR-2169 Fixed the issue of dynamic partitioning in case of 
ONE_TO_MANY partition strategy



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar 
APEXMALHAR-2169-Kafka-DP-many

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/351.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #351






> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY 
> partition strategy
> --
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce: 
> (1) Created a topic with 3 partitions
> (2) Created an application as  KAFKA -> Console with below configuration:
>strategy : one_to_many
>initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partition the topic to 5
> Observations:
> (1) Operator re-partitioning is not happened.
> (2) Operator is not emitting the messages.
> (3) Following warning messages in log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: 
> Repartition the operator(s) under 9223372036854775807 msgs/s and 
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list 
> after repartition: OperatorMeta{name=Input, 
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
>  attributes={Attribute{defaultValue=1024, 
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)