ribaraka commented on code in PR #1868:
URL: 
https://github.com/apache/cassandra-gocql-driver/pull/1868#discussion_r2044123216


##########
session.go:
##########
@@ -743,33 +736,47 @@ func (s *Session) routingKeyInfo(ctx context.Context, 
stmt string, keyspace stri
        return routingKeyInfo, nil
 }
 
-func (b *Batch) execute(ctx context.Context, conn *Conn) *Iter {
-       return conn.executeBatch(ctx, b)
-}
-
 // Exec executes a batch operation and returns nil if successful
 // otherwise an error is returned describing the failure.
 func (b *Batch) Exec() error {
-       iter := b.session.executeBatch(b)
+       iter := b.session.executeBatch(b, nil)
+       return iter.Close()
+}
+
+// ExecContext executes a batch operation with the provided context and 
returns nil if successful
+// otherwise an error is returned describing the failure.
+func (b *Batch) ExecContext(ctx context.Context) error {
+       iter := b.session.executeBatch(b, ctx)
        return iter.Close()
 }
 
-func (s *Session) executeBatch(batch *Batch) *Iter {
+// Iter executes a batch operation and returns an Iter object
+// that can be used to access properties related to the execution like 
Iter.Attempts and Iter.Latency
+func (b *Batch) Iter() *Iter { return b.session.executeBatch(b, nil) }
+
+// Iter executes a batch operation with the provided context and returns an 
Iter object
+// that can be used to access properties related to the execution like 
Iter.Attempts and Iter.Latency
+func (b *Batch) IterContext(ctx context.Context) *Iter {
+       return b.session.executeBatch(b, ctx)
+}

Review Comment:
   nitpicks: 
   1. The `Iter` method should call `IterContext`.
   2. The doc comment for `IterContext` should start with the method name.



##########
query_executor.go:
##########
@@ -222,16 +238,452 @@ func (q *queryExecutor) do(ctx context.Context, qry 
ExecutableQuery, hostIter Ne
        }
 
        if lastErr != nil {
-               return &Iter{err: lastErr}
+               return newErrIter(lastErr, qry.getQueryMetrics(), 
qry.Keyspace(), qry.getRoutingInfo(), qry.getKeyspaceFunc())
        }
 
-       return &Iter{err: ErrNoConnections}
+       return newErrIter(ErrNoConnections, qry.getQueryMetrics(), 
qry.Keyspace(), qry.getRoutingInfo(), qry.getKeyspaceFunc())
 }
 
-func (q *queryExecutor) run(ctx context.Context, qry ExecutableQuery, hostIter 
NextHost, results chan<- *Iter) {
+func (q *queryExecutor) run(ctx context.Context, qry internalRequest, hostIter 
NextHost, results chan<- *Iter) {
        select {
        case results <- q.do(ctx, qry, hostIter):
        case <-ctx.Done():
        }
-       qry.releaseAfterExecution()
+}
+
+type queryOptions struct {
+       stmt string
+
+       // Paging
+       pageSize        int
+       disableAutoPage bool
+
+       // Monitoring
+       trace    Tracer
+       observer QueryObserver
+
+       // Parameters
+       values  []interface{}
+       binding func(q *QueryInfo) ([]interface{}, error)
+
+       // Timestamp
+       defaultTimestamp      bool
+       defaultTimestampValue int64
+
+       // Consistency
+       serialCons SerialConsistency
+
+       // Protocol flag
+       disableSkipMetadata bool
+
+       customPayload     map[string][]byte
+       prefetch          float64
+       rt                RetryPolicy
+       spec              SpeculativeExecutionPolicy
+       context           context.Context
+       idempotent        bool
+       keyspace          string
+       skipPrepare       bool
+       routingKey        []byte
+       nowInSecondsValue *int
+       hostID            string
+
+       // getKeyspace is field so that it can be overriden in tests
+       getKeyspace func() string
+}
+
+func newQueryOptions(q *Query, ctx context.Context) *queryOptions {
+       var newPageState, newRoutingKey []byte
+       if q.initialPageState != nil {
+               pageState := q.initialPageState
+               newPageState = make([]byte, len(pageState))
+               copy(newPageState, pageState)
+       }

Review Comment:
   `newPageState` seems redundant here.



##########
query_executor.go:
##########
@@ -201,16 +210,375 @@ func (q *queryExecutor) do(ctx context.Context, qry 
ExecutableQuery, hostIter Ne
        }
 
        if lastErr != nil {
-               return &Iter{err: lastErr}
+               return newErrIter(lastErr, qry.getQueryMetrics())
        }
 
-       return &Iter{err: ErrNoConnections}
+       return newErrIter(ErrNoConnections, qry.getQueryMetrics())
 }
 
-func (q *queryExecutor) run(ctx context.Context, qry ExecutableQuery, hostIter 
NextHost, results chan<- *Iter) {
+func (q *queryExecutor) run(ctx context.Context, qry internalRequest, hostIter 
NextHost, results chan<- *Iter) {
        select {
        case results <- q.do(ctx, qry, hostIter):
        case <-ctx.Done():
        }
-       qry.releaseAfterExecution()
+}
+
+type queryOptions struct {
+       stmt                  string
+       values                []interface{}
+       initialConsistency    Consistency
+       pageSize              int
+       initialPageState      []byte
+       prefetch              float64
+       trace                 Tracer
+       observer              QueryObserver
+       rt                    RetryPolicy
+       spec                  SpeculativeExecutionPolicy
+       binding               func(q *QueryInfo) ([]interface{}, error)
+       serialCons            SerialConsistency
+       defaultTimestamp      bool
+       defaultTimestampValue int64
+       disableSkipMetadata   bool
+       context               context.Context
+       idempotent            bool
+       customPayload         map[string][]byte
+       keyspace              string
+       disableAutoPage       bool
+       skipPrepare           bool
+       routingKey            []byte
+
+       // getKeyspace is field so that it can be overriden in tests
+       getKeyspace func() string
+}
+
+func newQueryOptions(q *Query) *queryOptions {
+       return &queryOptions{
+               stmt:                  q.stmt,
+               values:                q.values,
+               initialConsistency:    q.initialConsistency,
+               pageSize:              q.pageSize,
+               initialPageState:      q.initialPageState,
+               prefetch:              q.prefetch,
+               trace:                 q.trace,
+               observer:              q.observer,
+               rt:                    q.rt,
+               spec:                  q.spec,
+               binding:               q.binding,
+               serialCons:            q.serialCons,
+               defaultTimestamp:      q.defaultTimestamp,
+               defaultTimestampValue: q.defaultTimestampValue,
+               disableSkipMetadata:   q.disableSkipMetadata,
+               context:               q.Context(),
+               idempotent:            q.idempotent,
+               customPayload:         q.customPayload,
+               disableAutoPage:       q.disableAutoPage,
+               skipPrepare:           q.skipPrepare,
+               routingKey:            q.routingKey,
+               getKeyspace:           q.getKeyspace,
+       }
+}
+
+type internalQuery struct {
+       originalQuery *Query
+       qryOpts       *queryOptions
+       pageState     []byte
+       metrics       *queryMetrics
+       refCount      uint32

Review Comment:
   This still persists, but it has changed position :sweat_smile:



##########
query_executor.go:
##########
@@ -222,16 +238,452 @@ func (q *queryExecutor) do(ctx context.Context, qry 
ExecutableQuery, hostIter Ne
        }
 
        if lastErr != nil {
-               return &Iter{err: lastErr}
+               return newErrIter(lastErr, qry.getQueryMetrics(), 
qry.Keyspace(), qry.getRoutingInfo(), qry.getKeyspaceFunc())
        }
 
-       return &Iter{err: ErrNoConnections}
+       return newErrIter(ErrNoConnections, qry.getQueryMetrics(), 
qry.Keyspace(), qry.getRoutingInfo(), qry.getKeyspaceFunc())
 }
 
-func (q *queryExecutor) run(ctx context.Context, qry ExecutableQuery, hostIter 
NextHost, results chan<- *Iter) {
+func (q *queryExecutor) run(ctx context.Context, qry internalRequest, hostIter 
NextHost, results chan<- *Iter) {
        select {
        case results <- q.do(ctx, qry, hostIter):
        case <-ctx.Done():
        }
-       qry.releaseAfterExecution()
+}
+
+type queryOptions struct {
+       stmt string
+
+       // Paging
+       pageSize        int
+       disableAutoPage bool
+
+       // Monitoring
+       trace    Tracer
+       observer QueryObserver
+
+       // Parameters
+       values  []interface{}
+       binding func(q *QueryInfo) ([]interface{}, error)
+
+       // Timestamp
+       defaultTimestamp      bool
+       defaultTimestampValue int64
+
+       // Consistency
+       serialCons SerialConsistency
+
+       // Protocol flag
+       disableSkipMetadata bool
+
+       customPayload     map[string][]byte
+       prefetch          float64
+       rt                RetryPolicy
+       spec              SpeculativeExecutionPolicy
+       context           context.Context
+       idempotent        bool
+       keyspace          string
+       skipPrepare       bool
+       routingKey        []byte
+       nowInSecondsValue *int
+       hostID            string
+
+       // getKeyspace is field so that it can be overriden in tests
+       getKeyspace func() string
+}
+
+func newQueryOptions(q *Query, ctx context.Context) *queryOptions {
+       var newPageState, newRoutingKey []byte
+       if q.initialPageState != nil {
+               pageState := q.initialPageState
+               newPageState = make([]byte, len(pageState))
+               copy(newPageState, pageState)
+       }
+       if q.routingKey != nil {
+               routingKey := q.routingKey
+               newRoutingKey = make([]byte, len(routingKey))
+               copy(newRoutingKey, routingKey)
+       }
+       if ctx == nil {
+               ctx = q.Context()
+       }
+       return &queryOptions{
+               stmt:                  q.stmt,
+               values:                q.values,
+               pageSize:              q.pageSize,
+               prefetch:              q.prefetch,
+               trace:                 q.trace,
+               observer:              q.observer,
+               rt:                    q.rt,
+               spec:                  q.spec,
+               binding:               q.binding,
+               serialCons:            q.serialCons,
+               defaultTimestamp:      q.defaultTimestamp,
+               defaultTimestampValue: q.defaultTimestampValue,
+               disableSkipMetadata:   q.disableSkipMetadata,
+               context:               ctx,
+               idempotent:            q.idempotent,
+               customPayload:         q.customPayload,
+               disableAutoPage:       q.disableAutoPage,
+               skipPrepare:           q.skipPrepare,
+               routingKey:            newRoutingKey,
+               getKeyspace:           q.getKeyspace,
+               nowInSecondsValue:     q.nowInSecondsValue,
+               keyspace:              q.keyspace,
+               hostID:                q.hostID,
+       }
+}
+
+type internalQuery struct {
+       originalQuery *Query
+       qryOpts       *queryOptions
+       pageState     []byte
+       metrics       *queryMetrics
+       refCount      uint32
+       conn          *Conn
+       consistency   uint32
+       session       *Session
+       routingInfo   *queryRoutingInfo
+}
+
+func newInternalQuery(q *Query, ctx context.Context) *internalQuery {
+       var newPageState []byte
+       if q.initialPageState != nil {
+               pageState := q.initialPageState
+               newPageState = make([]byte, len(pageState))
+               copy(newPageState, pageState)
+       }
+       return &internalQuery{
+               originalQuery: q,
+               qryOpts:       newQueryOptions(q, ctx),
+               metrics:       &queryMetrics{m: make(map[string]*hostMetrics)},
+               consistency:   uint32(q.initialConsistency),
+               pageState:     newPageState,
+               conn:          nil,
+               session:       q.session,
+               routingInfo:   &queryRoutingInfo{},
+       }
+}
+
+// Attempts returns the number of times the query was executed.
+func (q *internalQuery) Attempts() int {
+       return q.metrics.attempts()
+}
+
+func (q *internalQuery) attempt(keyspace string, end, start time.Time, iter 
*Iter, host *HostInfo) {
+       latency := end.Sub(start)
+       attempt, metricsForHost := q.metrics.attempt(1, latency, host, 
q.qryOpts.observer != nil)
+
+       if q.qryOpts.observer != nil {
+               q.qryOpts.observer.ObserveQuery(q.qryOpts.context, 
ObservedQuery{
+                       Keyspace:  keyspace,
+                       Statement: q.qryOpts.stmt,
+                       Values:    q.qryOpts.values,
+                       Start:     start,
+                       End:       end,
+                       Rows:      iter.numRows,
+                       Host:      host,
+                       Metrics:   metricsForHost,
+                       Err:       iter.err,
+                       Attempt:   attempt,
+                       Query:     q.originalQuery,
+               })
+       }
+}
+
+func (q *internalQuery) execute(ctx context.Context, conn *Conn) *Iter {
+       return conn.executeQuery(ctx, q)
+}
+
+func (q *internalQuery) retryPolicy() RetryPolicy {
+       return q.qryOpts.rt
+}
+
+func (q *internalQuery) speculativeExecutionPolicy() 
SpeculativeExecutionPolicy {
+       return q.qryOpts.spec
+}
+
+func (q *internalQuery) GetRoutingKey() ([]byte, error) {
+       if q.qryOpts.routingKey != nil {
+               return q.qryOpts.routingKey, nil
+       }
+
+       if q.qryOpts.binding != nil && len(q.qryOpts.values) == 0 {
+               // If this query was created using session.Bind we wont have 
the query
+               // values yet, so we have to pass down to the next policy.
+               // TODO: Remove this and handle this case
+               return nil, nil
+       }
+
+       // try to determine the routing key
+       routingKeyInfo, err := q.session.routingKeyInfo(q.Context(), 
q.qryOpts.stmt, q.qryOpts.keyspace)
+       if err != nil {
+               return nil, err
+       }
+
+       if routingKeyInfo != nil {
+               q.routingInfo.mu.Lock()
+               q.routingInfo.keyspace = routingKeyInfo.keyspace
+               q.routingInfo.table = routingKeyInfo.table
+               q.routingInfo.mu.Unlock()
+       }
+       return createRoutingKey(routingKeyInfo, q.qryOpts.values)
+}
+
+func (q *internalQuery) Keyspace() string {
+       if q.qryOpts.getKeyspace != nil {
+               return q.qryOpts.getKeyspace()
+       }
+
+       qrKs := q.routingInfo.getKeyspace()
+       if qrKs != "" {
+               return qrKs
+       }
+       if q.qryOpts.keyspace != "" {
+               return q.qryOpts.keyspace
+       }
+
+       if q.session == nil {
+               return ""
+       }
+       // TODO(chbannis): this should be parsed from the query or we should let
+       // this be set by users.
+       return q.session.cfg.Keyspace
+}
+
+func (q *internalQuery) Table() string {
+       return q.routingInfo.getTable()
+}
+
+func (q *internalQuery) IsIdempotent() bool {
+       return q.qryOpts.idempotent
+}
+
+func (q *internalQuery) getQueryMetrics() *queryMetrics {
+       return q.metrics
+}
+
+func (q *internalQuery) SetConsistency(c Consistency) {
+       atomic.StoreUint32(&q.consistency, uint32(c))
+}
+
+func (q *internalQuery) GetConsistency() Consistency {
+       return Consistency(atomic.LoadUint32(&q.consistency))
+}
+
+func (q *internalQuery) Context() context.Context {
+       return q.qryOpts.context
+}
+
+func (q *internalQuery) Statement() Statement {
+       return q.originalQuery
+}
+
+func (q *internalQuery) GetHostID() string {
+       return q.qryOpts.hostID
+}
+
+func (q *internalQuery) getRoutingInfo() *queryRoutingInfo {
+       return q.routingInfo
+}
+
+func (q *internalQuery) getKeyspaceFunc() func() string {
+       return q.qryOpts.getKeyspace
+}
+
+type batchOptions struct {
+       trace    Tracer
+       observer BatchObserver
+
+       bType   BatchType
+       entries []BatchEntry
+
+       defaultTimestamp      bool
+       defaultTimestampValue int64
+
+       serialCons SerialConsistency
+
+       customPayload map[string][]byte
+       rt            RetryPolicy
+       spec          SpeculativeExecutionPolicy
+       context       context.Context
+       keyspace      string
+       idempotent    bool
+       routingKey    []byte
+       nowInSeconds  *int
+}
+
+func newBatchOptions(b *Batch, ctx context.Context) *batchOptions {
+       // make a new array so if user keeps appending entries on the Batch 
object it doesn't affect this execution
+       newEntries := make([]BatchEntry, len(b.Entries))
+       for i, e := range b.Entries {
+               newEntries[i] = e
+       }
+       var newRoutingKey []byte
+       if b.routingKey != nil {
+               routingKey := b.routingKey
+               newRoutingKey = make([]byte, len(routingKey))
+               copy(newRoutingKey, routingKey)
+       }
+       if ctx == nil {
+               ctx = b.Context()
+       }
+       return &batchOptions{
+               bType:                 b.Type,
+               entries:               newEntries,
+               customPayload:         b.CustomPayload,
+               rt:                    b.rt,
+               spec:                  b.spec,
+               trace:                 b.trace,
+               observer:              b.observer,
+               serialCons:            b.serialCons,
+               defaultTimestamp:      b.defaultTimestamp,
+               defaultTimestampValue: b.defaultTimestampValue,
+               context:               ctx,
+               keyspace:              b.Keyspace(),
+               idempotent:            b.IsIdempotent(),
+               routingKey:            newRoutingKey,
+               nowInSeconds:          b.nowInSeconds,
+       }
+}
+
+type internalBatch struct {
+       originalBatch *Batch
+       batchOpts     *batchOptions
+       metrics       *queryMetrics
+       consistency   uint32
+       routingInfo   *queryRoutingInfo
+       session       *Session
+}
+
+func newInternalBatch(batch *Batch, ctx context.Context) *internalBatch {
+       return &internalBatch{
+               originalBatch: batch,
+               batchOpts:     newBatchOptions(batch, ctx),
+               metrics:       &queryMetrics{m: make(map[string]*hostMetrics)},
+               routingInfo:   &queryRoutingInfo{},
+               session:       batch.session,
+       }
+}

Review Comment:
   `consistency` is missing



##########
query_executor.go:
##########
@@ -27,33 +27,51 @@ package gocql
 import (
        "context"
        "sync"
+       "sync/atomic"
        "time"
 )
 
-type ExecutableQuery interface {
-       borrowForExecution()    // Used to ensure that the query stays alive 
for lifetime of a particular execution goroutine.
-       releaseAfterExecution() // Used when a goroutine finishes its execution 
attempts, either with ok result or an error.
-       execute(ctx context.Context, conn *Conn) *Iter
-       attempt(keyspace string, end, start time.Time, iter *Iter, host 
*HostInfo)
-       retryPolicy() RetryPolicy
-       speculativeExecutionPolicy() SpeculativeExecutionPolicy
+// Deprecated: Will be removed in a future major release. Also Query and Batch 
no longer implement this interface.
+//
+// Please use Statement (for Query / Batch objects) or ExecutableStatement (in 
HostSelectionPolicy implementations) instead.
+type ExecutableQuery = ExecutableStatement
+
+// ExecutableStatement is an interface that represents a query or batch 
statement that
+// exposes the correct functions for the HostSelectionPolicy to operate 
correctly.
+type ExecutableStatement interface {
        GetRoutingKey() ([]byte, error)
        Keyspace() string
        Table() string
        IsIdempotent() bool
        GetHostID() string
+       Statement() Statement
+}
 
-       withContext(context.Context) ExecutableQuery
+// Statement is an interface that represents a CQL statement that the driver 
can execute
+// (currently Query and Batch via Session.Query and Session.Batch)
+type Statement interface {
+       Iter() *Iter
+       Exec() error
+}

Review Comment:
   Should `IterContext` and `ExecContext` be added here as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to