lucasbru commented on code in PR #14779:
URL: https://github.com/apache/kafka/pull/14779#discussion_r1397139312


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1102,46 +1207,94 @@ public void subscribe(Pattern pattern, 
ConsumerRebalanceListener listener) {
         subscribeInternal(pattern, Optional.of(listener));
     }
 
+    /**
+     * Acquire the light lock and ensure that the consumer hasn't been closed.
+     *
+     * @throws IllegalStateException If the consumer has been closed
+     */
+    private void acquireAndEnsureOpen() {
+        acquire();
+        if (this.closed) {
+            release();
+            throw new IllegalStateException("This consumer has already been 
closed.");
+        }
+    }
+
+    /**
+     * Acquire the light lock protecting this consumer from multithreaded 
access. Instead of blocking
+     * when the lock is not available, however, we just throw an exception 
(since multithreaded usage is not
+     * supported).
+     *
+     * @throws ConcurrentModificationException if another thread already has 
the lock
+     */
+    private void acquire() {
+        final Thread thread = Thread.currentThread();
+        final long threadId = thread.getId();
+        if (threadId != currentThread.get() && 
!currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
+            throw new ConcurrentModificationException("KafkaConsumer is not 
safe for multi-threaded access. " +
+                "currentThread(name: " + thread.getName() + ", id: " + 
threadId + ")" +
+                " otherThread(id: " + currentThread.get() + ")"
+            );
+        refCount.incrementAndGet();
+    }
+
+    /**
+     * Release the light lock protecting the consumer from multithreaded 
access.
+     */
+    private void release() {
+        if (refCount.decrementAndGet() == 0)
+            currentThread.set(NO_CURRENT_THREAD);
+    }
+
     private void subscribeInternal(Pattern pattern, 
Optional<ConsumerRebalanceListener> listener) {
-        maybeThrowInvalidGroupIdException();
-        if (pattern == null || pattern.toString().isEmpty())
-            throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be " + (pattern == null ?
+        acquireAndEnsureOpen();

Review Comment:
   I took a tiny bit of artistic freedom here to make the lock handling more 
consistent. I think this is still at the level where we can diverge from the 
original consumer, but if you feel strongly about it, we can also check for the 
lock after checking the `groupId`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to