joao-r-reis commented on code in PR #1822:
URL:
https://github.com/apache/cassandra-gocql-driver/pull/1822#discussion_r1801423859
##########
batch_test.go:
##########
@@ -84,3 +85,84 @@ func TestBatch_WithTimestamp(t *testing.T) {
t.Errorf("got ts %d, expected %d", storedTs, micros)
}
}
+
+func TestBatch_WithNowInSeconds(t *testing.T) {
+ session := createSession(t)
+ defer session.Close()
+
+ if session.cfg.ProtoVersion < protoVersion5 {
+ t.Skip("Batch now in seconds are only available on protocol >=
5")
+ }
+
+ if err := createTable(session, `CREATE TABLE batch_now_in_seconds (id
int primary key, val text)`); err != nil {
Review Comment:
Should we add `IF NOT EXISTS` to make it possible for these tests to run
multiple times on the same cluster? Normally it wouldn't matter since a test
cluster is created for each run on the github action but locally it could be
useful.
##########
frame.go:
##########
@@ -1503,18 +1523,21 @@ func (f *framer) writeQueryParams(opts *queryParams) {
}
}
- if opts.keyspace != "" {
- if f.proto > protoVersion4 {
+ // protoV5 specific things
+ if f.proto > protoVersion4 {
+ if opts.keyspace != "" {
flags |= flagWithKeyspace
- } else {
- panic(fmt.Errorf("the keyspace can only be set with
protocol 5 or higher"))
Review Comment:
We should keep this panic (or return an error) if the user tries to set the
keyspace but protocol version is not > 4
##########
cassandra_test.go:
##########
@@ -3288,3 +3289,150 @@ func TestQuery_NamedValues(t *testing.T) {
t.Fatal(err)
}
}
+
+func TestQuery_WithNowInSeconds(t *testing.T) {
+ session := createSession(t)
+ defer session.Close()
+
+ if session.cfg.ProtoVersion < protoVersion5 {
+ t.Skip("Query now in seconds are only available on protocol >=
5")
+ }
+
+ if err := createTable(session, `CREATE TABLE query_now_in_seconds (id
int primary key, val text)`); err != nil {
+ t.Fatal(err)
+ }
+
+ err := session.Query("INSERT INTO query_now_in_seconds (id, val) VALUES
(?, ?) USING TTL 20", 1, "val").
+ WithNowInSeconds(int(0)).
+ Exec()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var remainingTTL int
+ err = session.Query(`SELECT TTL(val) FROM query_now_in_seconds WHERE id
= ?`, 1).
+ WithNowInSeconds(10).
+ Scan(&remainingTTL)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ require.Equal(t, remainingTTL, 10)
+}
+
+func TestQuery_SetKeyspace(t *testing.T) {
+ session := createSession(t)
+ defer session.Close()
+
+ if session.cfg.ProtoVersion < protoVersion5 {
+ t.Skip("keyspace for QUERY message is not supported in protocol
< 5")
+ }
+
+ const keyspaceStmt = `
+ CREATE KEYSPACE IF NOT EXISTS
gocql_query_keyspace_override_test
+ WITH replication = {
+ 'class': 'SimpleStrategy',
+ 'replication_factor': '1'
+ };
+`
+
+ err := session.Query(keyspaceStmt).Exec()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = createTable(session, "CREATE TABLE IF NOT EXISTS
gocql_query_keyspace_override_test.query_keyspace(id int, value text, PRIMARY
KEY (id))")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ expectedID := 1
+ expectedText := "text"
+
+ // Testing PREPARE message
+ err = session.Query("INSERT INTO
gocql_query_keyspace_override_test.query_keyspace (id, value) VALUES (?, ?)",
expectedID, expectedText).Exec()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var (
+ id int
+ text string
+ )
+
+ q := session.Query("SELECT * FROM
gocql_query_keyspace_override_test.query_keyspace").
+ SetKeyspace("gocql_query_keyspace_override_test")
+ err = q.Scan(&id, &text)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ require.Equal(t, expectedID, id)
+ require.Equal(t, expectedText, text)
+
+ // Testing QUERY message
+ id = 0
+ text = ""
+
+ q = session.Query("SELECT * FROM
gocql_query_keyspace_override_test.query_keyspace").
+ SetKeyspace("gocql_query_keyspace_override_test")
+ q.skipPrepare = true
+ err = q.Scan(&id, &text)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ require.Equal(t, expectedID, id)
+ require.Equal(t, expectedText, text)
+}
+
+func TestLargeSizeQuery(t *testing.T) {
+ session := createSession(t)
+ defer session.Close()
+
+ if err := createTable(session, "CREATE TABLE
gocql_test.large_size_query(id int, text_col text, PRIMARY KEY (id))"); err !=
nil {
+ t.Fatal(err)
+ }
+
+ defer session.Close()
+
+ longString := strings.Repeat("a", 500_000)
+
+ err := session.Query("INSERT INTO gocql_test.large_size_query (id,
text_col) VALUES (?, ?)", "1", longString).Exec()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var result string
+ err = session.Query("SELECT text_col FROM
gocql_test.large_size_query").Scan(&result)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ require.Equal(t, longString, result)
+}
+
+func TestQueryCompressionNotWorthIt(t *testing.T) {
+ session := createSession(t)
+ defer session.Close()
+
+ if err := createTable(session, "CREATE TABLE
gocql_test.compression_now_worth_it(id int, text_col text, PRIMARY KEY (id))");
err != nil {
+ t.Fatal(err)
+ }
+
+ defer session.Close()
+
+ str :=
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()_+"
+ err := session.Query("INSERT INTO gocql_test.large_size_query (id,
text_col) VALUES (?, ?)", "1", str).Exec()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var result string
+ err = session.Query("SELECT text_col FROM
gocql_test.large_size_query").Scan(&result)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ require.Equal(t, str, result)
Review Comment:
What is being tested on these last two tests?
##########
compressor.go:
##########
@@ -32,6 +32,7 @@ type Compressor interface {
Name() string
Encode(data []byte) ([]byte, error)
Decode(data []byte) ([]byte, error)
+ DecodeSized(data []byte, size uint32) ([]byte, error)
Review Comment:
Hmm this would technically be a breaking change but I don't think there's
much we can do. Good thing this is going into a 2.0.0 release so we don't have
to worry about this being a breaking change 👍 .
##########
session.go:
##########
@@ -1423,6 +1427,25 @@ func (q *Query) releaseAfterExecution() {
q.decRefCount()
}
+// SetKeyspace will enable keyspace flag on the query.
+// It allows to specify the keyspace that the query should be executed in
+//
+// Only available on protocol >= 5.
+func (q *Query) SetKeyspace(keyspace string) *Query {
+ q.keyspace = keyspace
+ return q
+}
+
+// WithNowInSeconds will enable the with now_in_seconds flag on the query.
+// Also, it allows to define now_in_seconds value.
+//
+// Only available on protocol >= 5.
+func (q *Query) WithNowInSeconds(now int) *Query {
+ q.useNowInSeconds = true
+ q.nowInSecondsValue = now
Review Comment:
I wonder if we should just use `*int` on the internal field so we can check
if it's `nil` and therefore remove the need of having a separate
`useNowInSeconds` flag
##########
frame.go:
##########
@@ -1717,25 +1759,46 @@ func (f *framer) writeBatchFrame(streamID int, w
*writeBatchFrame, customPayload
if w.defaultTimestamp {
flags |= flagDefaultTimestamp
}
+ }
- if f.proto > protoVersion4 {
- f.writeUint(uint32(flags))
+ if f.proto > protoVersion4 {
+ if w.keyspace != "" {
+ flags |= flagWithKeyspace
Review Comment:
same as above, panic or return error if user tries to set keyspace but
protocol is not > 4
##########
conn.go:
##########
@@ -1430,6 +1525,24 @@ func (c *Conn) executeQuery(ctx context.Context, qry
*Query) *Iter {
case *resultVoidFrame:
return &Iter{framer: framer}
case *resultRowsFrame:
+ if x.meta.newMetadataID != nil {
+ stmtCacheKey :=
c.session.stmtsLRU.keyFor(c.host.HostID(), c.currentKeyspace, qry.stmt)
+ inflight, ok := c.session.stmtsLRU.get(stmtCacheKey)
+ if !ok {
+ // We didn't find the stmt in the cache, so we
just re-prepare it
+ return c.executeQuery(ctx, qry)
+ }
+
+ // Updating the result metadata id in prepared stmt
+ //
+ // If a RESULT/Rows message reports
+ // changed resultset metadata with the
Metadata_changed flag, the reported new
+ // resultset metadata must be used in subsequent
executions
+ inflight.preparedStatment.resultMetadataID =
x.meta.newMetadataID
+ inflight.preparedStatment.response = x.meta
Review Comment:
Hmm I think there might be a synchronization issue here as well, other
goroutines can be accessing this preparedstatement object at the same time.
Ideally the prepared statement object should be immutable so we could just
replace the entire object in the cache instead of trying to modify it. This
would get rid of the synchronization issue.
##########
conn.go:
##########
@@ -474,8 +483,12 @@ func (s *startupCoordinator) startup(ctx context.Context,
supported map[string][
case error:
return v
case *readyFrame:
+ // Connection is successfully set up and ready to use Native
Protocol v5
+ s.conn.startupCompleted = true
Review Comment:
Don't we need some kind of sychronization around this field? Or atomic
access?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]