This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new b684151  Fix panic when scale down partitions (#601)
b684151 is described below

commit b6841513379ea9ca503d1e350c5f93198fc2b03f
Author: xiaolong ran <r...@apache.org>
AuthorDate: Thu Aug 26 16:51:20 2021 +0800

    Fix panic when scale down partitions (#601)
    
    Signed-off-by: xiaolongran <xiaolong...@tencent.com>
    
    
    
    ### Motivation
    
    When the program is running, if some sub-partitions of a topic are
    forcibly deleted, old_partitions becomes greater than new_partitions,
    which is effectively a partition scale-down. The current code only
    handles the scale-up scenario, so forcibly deleting sub-partitions
    produces the log message and panic shown below:
    
    ```
    level=info msg="[Changed number of partitions in topic]" new_partitions=1 old_partitions=20 topic="persistent://pulsar-xxxxxxx/xxxx/gxxxxxxxx"
    ```
    
    ```
    panic: runtime error: index out of range [1] with length 1
    
    goroutine 166288 [running]:
    github.com/apache/pulsar-client-go/pulsar.(*producer).internalCreatePartitionsProducers(0xc0070aa6e0, 0x0, 0x0)
            github.com/apache/pulsar-client-go/pulsar/producer_impl.go:194 +0x785
    github.com/apache/pulsar-client-go/pulsar.(*producer).runBackgroundPartitionDiscovery.func1(0xc004167cd0, 0xc00559f5c0, 0xc006af6dc0, 0xc0070aa6e0)
            github.com/apache/pulsar-client-go/pulsar/producer_impl.go:152 +0xce
    created by github.com/apache/pulsar-client-go/pulsar.(*producer).runBackgroundPartitionDiscovery
            github.com/apache/pulsar-client-go/pulsar/producer_impl.go:144 +0xcd
    ```
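
    For reference, here is a minimal standalone sketch (not part of this commit;
    the names are illustrative) that reproduces the pre-fix failure mode: the
    cache slice is resized to the new, smaller partition count, but the copy
    loop still walks the old count of entries:

    ```go
    package main

    import "fmt"

    func main() {
        oldProducers := make([]string, 20) // 20 existing partition producers
        newNumPartitions := 1              // broker now reports a single partition

        // Pre-fix behaviour: the new cache is sized to newNumPartitions, but the
        // copy loop still iterates over every old entry.
        producers := make([]string, newNumPartitions)
        for i := 0; i < len(oldProducers); i++ {
            producers[i] = oldProducers[i] // panics: index out of range [1] with length 1
        }
        fmt.Println(producers)
    }
    ```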
    
    ### Modifications
    
    Add handling logic for scaling down partitions
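
    In outline, when the reported partition count shrinks, the cache of
    partition producers/consumers is now rebuilt from partition 0, and the
    partition metric is set rather than incremented. A simplified,
    self-contained sketch of that approach (rebuildPartitions and the string
    slice are illustrative placeholders, not the real client types):

    ```go
    package main

    import "fmt"

    // rebuildPartitions mirrors, in simplified form, the handling added here:
    // when newNum is smaller than the old count, every partition entry is
    // recreated from 0 instead of copying past the end of the shorter slice.
    func rebuildPartitions(old []string, newNum int) []string {
        rebuilt := make([]string, newNum)

        // Reuse existing entries only when the topic scaled up.
        if old != nil && len(old) < newNum {
            copy(rebuilt, old)
        }

        startPartition := len(old)
        if newNum < len(old) { // scale down: recreate every partition
            startPartition = 0
        }
        for i := startPartition; i < newNum; i++ {
            rebuilt[i] = fmt.Sprintf("partition-%d", i)
        }
        return rebuilt
    }

    func main() {
        old := rebuildPartitions(nil, 20)           // initial creation: 20 partitions
        fmt.Println(len(rebuildPartitions(old, 1))) // forced scale-down to 1: no panic
    }
    ```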
---
 pulsar/consumer_impl.go | 30 ++++++++++++++++++------------
 pulsar/producer_impl.go | 27 +++++++++++++++------------
 2 files changed, 33 insertions(+), 24 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index ec7ad7d..78ae0d7 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -28,7 +28,6 @@ import (
        "github.com/apache/pulsar-client-go/pulsar/internal"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/apache/pulsar-client-go/pulsar/log"
-       "github.com/pkg/errors"
 )
 
 const defaultNackRedeliveryDelay = 1 * time.Minute
@@ -258,22 +257,16 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 
        c.Lock()
        defer c.Unlock()
+
        oldConsumers := c.consumers
+       oldNumPartitions = len(oldConsumers)
 
        if oldConsumers != nil {
-               oldNumPartitions = len(oldConsumers)
                if oldNumPartitions == newNumPartitions {
                        c.log.Debug("Number of partitions in topic has not changed")
                        return nil
                }
 
-               if oldNumPartitions > newNumPartitions {
-                       c.log.WithField("old_partitions", oldNumPartitions).
-                               WithField("new_partitions", newNumPartitions).
-                               Error("Does not support scaling down operations on topic partitions")
-                       return errors.New("Does not support scaling down operations on topic partitions")
-               }
-
                c.log.WithField("old_partitions", oldNumPartitions).
                        WithField("new_partitions", newNumPartitions).
                        Info("Changed number of partitions in topic")
@@ -281,7 +274,9 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 
        c.consumers = make([]*partitionConsumer, newNumPartitions)
 
-       if oldConsumers != nil {
+       // When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
+       // we need to rebuild the cache of new consumers, otherwise the array will be out of bounds.
+       if oldConsumers != nil && oldNumPartitions < newNumPartitions {
                // Copy over the existing consumer instances
                for i := 0; i < oldNumPartitions; i++ {
                        c.consumers[i] = oldConsumers[i]
@@ -297,12 +292,19 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
        receiverQueueSize := c.options.ReceiverQueueSize
        metadata := c.options.Properties
 
+       startPartition := oldNumPartitions
        partitionsToAdd := newNumPartitions - oldNumPartitions
+
+       if partitionsToAdd < 0 {
+               partitionsToAdd = newNumPartitions
+               startPartition = 0
+       }
+
        var wg sync.WaitGroup
        ch := make(chan ConsumerError, partitionsToAdd)
        wg.Add(partitionsToAdd)
 
-       for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ {
+       for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
                partitionTopic := partitions[partitionIdx]
 
                go func(idx int, pt string) {
@@ -366,7 +368,11 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
                return err
        }
 
-       c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
+       if newNumPartitions < oldNumPartitions {
+               c.metrics.ConsumersPartitions.Set(float64(newNumPartitions))
+       } else {
+               c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
+       }
        return nil
 }
 
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index adf9b14..20e8d3d 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -26,7 +26,6 @@ import (
 
        "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/apache/pulsar-client-go/pulsar/log"
-       "github.com/pkg/errors"
 )
 
 const (
@@ -175,21 +174,14 @@ func (p *producer) internalCreatePartitionsProducers() error {
        defer p.Unlock()
 
        oldProducers := p.producers
+       oldNumPartitions = len(oldProducers)
 
        if oldProducers != nil {
-               oldNumPartitions = len(oldProducers)
                if oldNumPartitions == newNumPartitions {
                        p.log.Debug("Number of partitions in topic has not changed")
                        return nil
                }
 
-               if oldNumPartitions > newNumPartitions {
-                       p.log.WithField("old_partitions", oldNumPartitions).
-                               WithField("new_partitions", newNumPartitions).
-                               Error("Does not support scaling down operations on topic partitions")
-                       return errors.New("Does not support scaling down operations on topic partitions")
-               }
-
                p.log.WithField("old_partitions", oldNumPartitions).
                        WithField("new_partitions", newNumPartitions).
                        Info("Changed number of partitions in topic")
@@ -198,7 +190,9 @@ func (p *producer) internalCreatePartitionsProducers() error {
 
        p.producers = make([]Producer, newNumPartitions)
 
-       if oldProducers != nil {
+       // When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
+       // we need to rebuild the cache of new producers, otherwise the array will be out of bounds.
+       if oldProducers != nil && oldNumPartitions < newNumPartitions {
                // Copy over the existing consumer instances
                for i := 0; i < oldNumPartitions; i++ {
                        p.producers[i] = oldProducers[i]
@@ -211,10 +205,15 @@ func (p *producer) internalCreatePartitionsProducers() error {
                err       error
        }
 
+       startPartition := oldNumPartitions
        partitionsToAdd := newNumPartitions - oldNumPartitions
+       if partitionsToAdd < 0 {
+               partitionsToAdd = newNumPartitions
+               startPartition = 0
+       }
        c := make(chan ProducerError, partitionsToAdd)
 
-       for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ {
+       for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
                partition := partitions[partitionIdx]
 
                go func(partitionIdx int, partition string) {
@@ -248,7 +247,11 @@ func (p *producer) internalCreatePartitionsProducers() error {
                return err
        }
 
-       p.metrics.ProducersPartitions.Add(float64(partitionsToAdd))
+       if newNumPartitions < oldNumPartitions {
+               p.metrics.ProducersPartitions.Set(float64(newNumPartitions))
+       } else {
+               p.metrics.ProducersPartitions.Add(float64(partitionsToAdd))
+       }
        atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
        atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
        return nil
