tolbertam commented on code in PR #2003:
URL:
https://github.com/apache/cassandra-java-driver/pull/2003#discussion_r1938353958
##########
core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareAsyncProcessor.java:
##########
@@ -141,25 +195,23 @@ public CompletionStage<PreparedStatement> process(
String sessionLogPrefix) {
try {
- CompletableFuture<PreparedStatement> result = cache.getIfPresent(request);
- if (result == null) {
- CompletableFuture<PreparedStatement> mine = new CompletableFuture<>();
- result = cache.get(request, () -> mine);
- if (result == mine) {
- new CqlPrepareHandler(request, session, context, sessionLogPrefix)
- .handle()
- .whenComplete(
- (preparedStatement, error) -> {
- if (error != null) {
- mine.completeExceptionally(error);
- cache.invalidate(request); // Make sure failure isn't cached indefinitely
- } else {
- mine.complete(preparedStatement);
- }
- });
- }
- }
- return result;
+ CompletableFuture<PreparedStatement> rv = new CompletableFuture<>();
Review Comment:
I wrote a test that reproduces this, so it does seem to be a problem. The future returned for an already-prepared statement is not complete when the test checks it:
```java
@Test
public void should_complete_if_already_prepared() throws Exception {
  CqlSession session = SessionUtils.newSession(ccmRule, sessionRule.keyspace());
  CqlPrepareAsyncProcessor processor = findProcessor(session);
  Cache<?, ?> cache = processor.getCache();
  assertThat(cache.size()).isEqualTo(0);

  // Prepare a statement and then wait for it to complete.
  String cql = "select v from test_table_1 where k = ?";
  CompletableFuture<PreparedStatement> cf1 = toCompletableFuture(session, cql);
  assertThat(cache.size()).isEqualTo(1);
  CqlPrepareAsyncProcessor.CacheEntry entry =
      (CqlPrepareAsyncProcessor.CacheEntry) Iterables.get(cache.asMap().values(), 0);
  PreparedStatement stmt = entry.waitForResult();
  assertThat(cf1.isDone()).isTrue();
  assertThat(cf1.join()).isEqualTo(stmt);

  // Prepare the same statement again; it should complete immediately
  // since it was previously prepared.
  CompletableFuture<PreparedStatement> cf2 = toCompletableFuture(session, cql);
  // The cache should not grow.
  assertThat(cache.size()).isEqualTo(1);
  CqlPrepareAsyncProcessor.CacheEntry newEntry =
      (CqlPrepareAsyncProcessor.CacheEntry) Iterables.get(cache.asMap().values(), 0);
  // Strictly the same entry in the cache.
  assertThat(entry).isSameAs(newEntry);
  // Note: I made `futures` public just for this test. That is not necessary;
  // it only demonstrates that the CacheEntry holds this future.
  assertThat(newEntry.futures).contains(cf2);
  // The future should be complete (this is where the test fails).
  assertThat(cf2.isDone()).isTrue();
  assertThat(cf2.join()).isEqualTo(stmt);
}
```
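To illustrate the kind of behavior the test points at, here is a minimal, self-contained sketch. It assumes (this is not confirmed by the PR code) that the entry only completes the futures that were registered before the result arrived, so a future added afterwards is stored but never completed. `LateJoinerSketch`, `NaiveEntry`, and `FixedEntry` are hypothetical names for illustration only; they are not the driver's `CacheEntry`, and `FixedEntry` is just one possible shape for a fix.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical illustration only; not the driver's CacheEntry implementation.
public class LateJoinerSketch {

  /** Assumed buggy shape: the result is pushed only to futures registered so far. */
  public static final class NaiveEntry<T> {
    public final List<CompletableFuture<T>> futures = new CopyOnWriteArrayList<>();

    public CompletableFuture<T> register() {
      CompletableFuture<T> f = new CompletableFuture<>();
      futures.add(f); // a late joiner is stored here but nobody completes it
      return f;
    }

    public void complete(T value) {
      for (CompletableFuture<T> f : futures) {
        f.complete(value);
      }
    }
  }

  /** One possible fix: chain every per-caller future off a single shared result. */
  public static final class FixedEntry<T> {
    private final CompletableFuture<T> result = new CompletableFuture<>();

    public CompletableFuture<T> register() {
      CompletableFuture<T> f = new CompletableFuture<>();
      // If `result` is already complete, this callback runs immediately
      // on the calling thread, so late joiners are completed before return.
      result.whenComplete(
          (value, error) -> {
            if (error != null) {
              f.completeExceptionally(error);
            } else {
              f.complete(value);
            }
          });
      return f;
    }

    public void complete(T value) {
      result.complete(value);
    }
  }

  public static void main(String[] args) {
    NaiveEntry<String> naive = new NaiveEntry<>();
    naive.register();
    naive.complete("stmt");
    System.out.println("naive late future done: " + naive.register().isDone());

    FixedEntry<String> fixed = new FixedEntry<>();
    fixed.register();
    fixed.complete("stmt");
    System.out.println("fixed late future done: " + fixed.register().isDone());
  }
}
```

In this sketch the naive entry leaves the late future pending, which mirrors the `cf2.isDone()` assertion failing above, while the chained version completes it before `register()` returns. Whether the PR's `CacheEntry` fails for exactly this reason would need to be checked against its actual code.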