This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
     new b684151  Fix panic when scale down partitions (#601)
b684151 is described below

commit b6841513379ea9ca503d1e350c5f93198fc2b03f
Author: xiaolong ran <r...@apache.org>
AuthorDate: Thu Aug 26 16:51:20 2021 +0800

    Fix panic when scale down partitions (#601)

    Signed-off-by: xiaolongran <xiaolong...@tencent.com>

    ### Motivation

    If some sub-partitions of a topic are forcibly deleted while the program is
    running, old_partitions becomes greater than new_partitions, which is
    effectively a partition scale-down. The current code only handles the
    scale-up scenario, so after such a deletion the client logs the message
    below and then panics:

    ```
    level=info msg="[Changed number of partitions in topic]" new_partitions=1 old_partitions=20 topic="persistent://pulsar-xxxxxxx/xxxx/gxxxxxxxx"
    ```

    ```
    panic: runtime error: index out of range [1] with length 1

    goroutine 166288 [running]:
    github.com/apache/pulsar-client-go/pulsar.(*producer).internalCreatePartitionsProducers(0xc0070aa6e0, 0x0, 0x0)
            github.com/apache/pulsar-client-go/pulsar/producer_impl.go:194 +0x785
    github.com/apache/pulsar-client-go/pulsar.(*producer).runBackgroundPartitionDiscovery.func1(0xc004167cd0, 0xc00559f5c0, 0xc006af6dc0, 0xc0070aa6e0)
            github.com/apache/pulsar-client-go/pulsar/producer_impl.go:152 +0xce
    created by github.com/apache/pulsar-client-go/pulsar.(*producer).runBackgroundPartitionDiscovery
            github.com/apache/pulsar-client-go/pulsar/producer_impl.go:144 +0xcd
    ```

    ### Modifications

    Add handling for the partition scale-down case.
---
 pulsar/consumer_impl.go | 30 ++++++++++++++++++------------
 pulsar/producer_impl.go | 27 +++++++++++++++------------
 2 files changed, 33 insertions(+), 24 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index ec7ad7d..78ae0d7 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -28,7 +28,6 @@ import (
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
 	"github.com/apache/pulsar-client-go/pulsar/log"
-	"github.com/pkg/errors"
 )
 
 const defaultNackRedeliveryDelay = 1 * time.Minute
@@ -258,22 +257,16 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 	c.Lock()
 	defer c.Unlock()
 
+	oldConsumers := c.consumers
+	oldNumPartitions = len(oldConsumers)
 	if oldConsumers != nil {
-		oldNumPartitions = len(oldConsumers)
 		if oldNumPartitions == newNumPartitions {
 			c.log.Debug("Number of partitions in topic has not changed")
 			return nil
 		}
 
-		if oldNumPartitions > newNumPartitions {
-			c.log.WithField("old_partitions", oldNumPartitions).
-				WithField("new_partitions", newNumPartitions).
-				Error("Does not support scaling down operations on topic partitions")
-			return errors.New("Does not support scaling down operations on topic partitions")
-		}
-
 		c.log.WithField("old_partitions", oldNumPartitions).
 			WithField("new_partitions", newNumPartitions).
 			Info("Changed number of partitions in topic")
@@ -281,7 +274,9 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 
 	c.consumers = make([]*partitionConsumer, newNumPartitions)
 
-	if oldConsumers != nil {
+	// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions > newNumPartitions,
+	// we need to rebuild the cache of new consumers, otherwise the array will be out of bounds.
+	if oldConsumers != nil && oldNumPartitions < newNumPartitions {
 		// Copy over the existing consumer instances
 		for i := 0; i < oldNumPartitions; i++ {
 			c.consumers[i] = oldConsumers[i]
@@ -297,12 +292,19 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 	receiverQueueSize := c.options.ReceiverQueueSize
 	metadata := c.options.Properties
 
+	startPartition := oldNumPartitions
 	partitionsToAdd := newNumPartitions - oldNumPartitions
+
+	if partitionsToAdd < 0 {
+		partitionsToAdd = newNumPartitions
+		startPartition = 0
+	}
+
 	var wg sync.WaitGroup
 	ch := make(chan ConsumerError, partitionsToAdd)
 	wg.Add(partitionsToAdd)
 
-	for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ {
+	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
 		partitionTopic := partitions[partitionIdx]
 
 		go func(idx int, pt string) {
@@ -366,7 +368,11 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 		return err
 	}
 
-	c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
+	if newNumPartitions < oldNumPartitions {
+		c.metrics.ConsumersPartitions.Set(float64(newNumPartitions))
+	} else {
+		c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
+	}
 	return nil
 }
 
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index adf9b14..20e8d3d 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -26,7 +26,6 @@ import (
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 	"github.com/apache/pulsar-client-go/pulsar/log"
-	"github.com/pkg/errors"
 )
 
 const (
@@ -175,21 +174,14 @@ func (p *producer) internalCreatePartitionsProducers() error {
 	defer p.Unlock()
 
 	oldProducers := p.producers
+	oldNumPartitions = len(oldProducers)
 	if oldProducers != nil {
-		oldNumPartitions = len(oldProducers)
 		if oldNumPartitions == newNumPartitions {
 			p.log.Debug("Number of partitions in topic has not changed")
 			return nil
 		}
 
-		if oldNumPartitions > newNumPartitions {
-			p.log.WithField("old_partitions", oldNumPartitions).
-				WithField("new_partitions", newNumPartitions).
-				Error("Does not support scaling down operations on topic partitions")
-			return errors.New("Does not support scaling down operations on topic partitions")
-		}
-
 		p.log.WithField("old_partitions", oldNumPartitions).
 			WithField("new_partitions", newNumPartitions).
 			Info("Changed number of partitions in topic")
@@ -198,7 +190,9 @@ func (p *producer) internalCreatePartitionsProducers() error {
 
 	p.producers = make([]Producer, newNumPartitions)
 
-	if oldProducers != nil {
+	// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions > newNumPartitions,
+	// we need to rebuild the cache of new producers, otherwise the array will be out of bounds.
+	if oldProducers != nil && oldNumPartitions < newNumPartitions {
 		// Copy over the existing consumer instances
 		for i := 0; i < oldNumPartitions; i++ {
 			p.producers[i] = oldProducers[i]
@@ -211,10 +205,15 @@ func (p *producer) internalCreatePartitionsProducers() error {
 		err       error
 	}
 
+	startPartition := oldNumPartitions
 	partitionsToAdd := newNumPartitions - oldNumPartitions
+	if partitionsToAdd < 0 {
+		partitionsToAdd = newNumPartitions
+		startPartition = 0
+	}
 	c := make(chan ProducerError, partitionsToAdd)
 
-	for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ {
+	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
 		partition := partitions[partitionIdx]
 
 		go func(partitionIdx int, partition string) {
@@ -248,7 +247,11 @@ func (p *producer) internalCreatePartitionsProducers() error {
 		return err
 	}
 
-	p.metrics.ProducersPartitions.Add(float64(partitionsToAdd))
+	if newNumPartitions < oldNumPartitions {
+		p.metrics.ProducersPartitions.Set(float64(newNumPartitions))
+	} else {
+		p.metrics.ProducersPartitions.Add(float64(partitionsToAdd))
+	}
 	atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
 	atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
 	return nil
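Both files apply the same arithmetic: compute the first partition index to (re)create and how many to create, falling back to rebuilding everything from partition 0 when the topic shrinks. The standalone sketch below illustrates that arithmetic only; the `resizePartitions` helper and the example values are hypothetical and not part of pulsar-client-go.

```go
package main

import "fmt"

// resizePartitions mirrors the calculation introduced by the patch: given the
// number of partitions currently tracked (oldNum) and the number the broker
// now reports (newNum), it returns the first partition index to (re)create
// and how many partitions to create.
func resizePartitions(oldNum, newNum int) (startPartition, partitionsToAdd int) {
	startPartition = oldNum
	partitionsToAdd = newNum - oldNum
	// Scale-down (e.g. sub-partitions were force-deleted): rebuild all
	// partitions from index 0 instead of indexing past the end of the
	// newly allocated slice, which is what caused the panic.
	if partitionsToAdd < 0 {
		partitionsToAdd = newNum
		startPartition = 0
	}
	return startPartition, partitionsToAdd
}

func main() {
	// Scale up 1 -> 4: keep partition 0, create partitions 1..3.
	fmt.Println(resizePartitions(1, 4)) // 1 3

	// Scale down 20 -> 1: recreate from partition 0; the old code iterated
	// up to index 19 over a slice of length 1 and panicked.
	fmt.Println(resizePartitions(20, 1)) // 0 1
}
```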