tengu-alt commented on code in PR #1307:
URL:
https://github.com/apache/cassandra-gocql-driver/pull/1307#discussion_r1986986302
##########
session.go:
##########
@@ -1990,3 +1990,82 @@ func NewErrProtocol(format string, args ...interface{})
error {
// BatchSizeMaximum is the maximum number of statements a batch operation can
have.
// This limit is set by cassandra and could change in the future.
const BatchSizeMaximum = 65535
+
+// PartitionedBatch helps to partition batches by host using
tokenAwareHostPolicy
+type PartitionedBatch struct {
+ session *Session
+ statement string
+ tokenRing *tokenRing
+ batchTyp BatchType
+ batches map[string]*Batch
+ idempotent bool
+ consistency Consistency
+}
+
+func (s *Session) NewPartitionedBatch(statement string, typ BatchType,
idempotent bool, consistency Consistency) (*PartitionedBatch, error) {
+
+ // partitioned batch useless for other host policies
+ p, ok := s.policy.(*tokenAwareHostPolicy)
+ if !ok {
+ return nil, errors.New("tokenAwareHostPolicy should be used as
HostSelectionPolicy for current session")
+ }
+
+ return &PartitionedBatch{
+ session: s,
+ statement: statement,
+ tokenRing: p.tokenRing.Load().(*tokenRing),
+ batchTyp: typ,
+ batches: make(map[string]*Batch),
+ idempotent: idempotent,
+ consistency: consistency,
+ }, nil
+}
+
+const emptyHostID = "emptyHostID"
+
+func (b *PartitionedBatch) Query(args ...interface{}) error {
+ routingKey, err := b.getRoutingKey(context.Background(), b.statement,
args...)
+ if err != nil {
+ return err
+ }
+
+ hostID := emptyHostID
+ if host, _ := b.tokenRing.GetHostForPartitionKey(routingKey); host !=
nil {
+ hostID = host.HostID()
+ }
+
+ batch, ok := b.batches[hostID]
+ if !ok {
+ batch = b.session.NewBatch(b.batchTyp)
Review Comment:
`session.NewBatch()` has been deprecated.
##########
session.go:
##########
@@ -1990,3 +1990,82 @@ func NewErrProtocol(format string, args ...interface{})
error {
// BatchSizeMaximum is the maximum number of statements a batch operation can
have.
// This limit is set by cassandra and could change in the future.
const BatchSizeMaximum = 65535
+
+// PartitionedBatch helps to partition batches by host using
tokenAwareHostPolicy
+type PartitionedBatch struct {
+ session *Session
+ statement string
+ tokenRing *tokenRing
+ batchTyp BatchType
+ batches map[string]*Batch
+ idempotent bool
+ consistency Consistency
+}
+
+func (s *Session) NewPartitionedBatch(statement string, typ BatchType,
idempotent bool, consistency Consistency) (*PartitionedBatch, error) {
+
+ // partitioned batch useless for other host policies
+ p, ok := s.policy.(*tokenAwareHostPolicy)
+ if !ok {
+ return nil, errors.New("tokenAwareHostPolicy should be used as
HostSelectionPolicy for current session")
+ }
+
+ return &PartitionedBatch{
+ session: s,
+ statement: statement,
+ tokenRing: p.tokenRing.Load().(*tokenRing),
+ batchTyp: typ,
+ batches: make(map[string]*Batch),
+ idempotent: idempotent,
+ consistency: consistency,
+ }, nil
+}
+
+const emptyHostID = "emptyHostID"
+
+func (b *PartitionedBatch) Query(args ...interface{}) error {
+ routingKey, err := b.getRoutingKey(context.Background(), b.statement,
args...)
+ if err != nil {
+ return err
+ }
+
+ hostID := emptyHostID
+ if host, _ := b.tokenRing.GetHostForPartitionKey(routingKey); host !=
nil {
+ hostID = host.HostID()
Review Comment:
We can no longer look up the host with `GetHostForPartitionKey()`, since it was
removed in #1404. I think this can be changed to use `GetHostForToken()` instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]