Copilot commented on code in PR #2710:
URL: https://github.com/apache/fluss/pull/2710#discussion_r3051157863
##
fluss-server/src/test/java/org/apache/fluss/server/zk/CuratorCacheRaceConditionTest.java:
##
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk;
+
+import
org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework;
+import
org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFrameworkFactory;
+import
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCache;
+import
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import
org.apache.fluss.shaded.curator5.org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.AsyncCallback;
+import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.CreateMode;
+import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.ZooDefs;
+import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper;
+import org.apache.fluss.testutils.common.AllCallbackWrapper;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test to demonstrate the CuratorCache race condition where NODE_CHANGED
events can be lost.
+ *
+ * <p>Background: CuratorCache internally uses async {@code getData()} calls
to read the current ZK
+ * state when processing watcher events. It then compares the fetched data's
version with the cached
+ * version to decide whether to fire NODE_CHANGED. When two ZK mutations
happen in quick succession
+ * (e.g., node created empty via {@code creatingParentsIfNeeded}, then
immediately {@code setData}
+ * with real data), the async {@code getData()} for the first event
(NodeCreated) may be processed
+ * by ZK after the second mutation (setData) has already completed. In this
case:
+ *
+ * <ol>
+ *   <li>The getData callback for NodeCreated reads version=1 (final data) and
fires NODE_CREATED
+ *       with full data
+ *   <li>The getData callback for NodeDataChanged also reads version=1, same
version as cache, so no
+ *       NODE_CHANGED event is fired
+ * </ol>
+ *
+ * <p>To reliably reproduce this race, we use two separate CuratorFramework
clients (two ZK
+ * sessions) and the raw ZK async API to pipeline create + setData
requests. The CuratorCache
+ * uses one session for its async {@code getData()} calls, while mutations go
through a different
+ * session. By pipelining create and setData on the mutation session (sending
them back-to-back
+ * without waiting for responses), the ZK server processes both before the
cache session can even
+ * send its async getData. This guarantees that the cache's getData reads the
post-setData state.
+ */
+public class CuratorCacheRaceConditionTest {
+
+// ZooKeeper test extension registered with JUnit 5 through @RegisterExtension.
+// The field is declared with its type argument (not as a raw type) so that
+// accessors on the wrapper stay type-safe without casts.
+@RegisterExtension
+public static final AllCallbackWrapper<ZooKeeperExtension> ZOO_KEEPER_EXTENSION_WRAPPER =
+new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+/**
+ * Demonstrates that CuratorCache can fire NODE_CREATED with full data
(version > 0) for a node
+ * that was initially created as an empty node, and consequently miss the
NODE_CHANGED event
+ * entirely.
+ *
+ * <p>This simulates the exact pattern used in {@code
MetadataManager.createTable}:
+ *
+ * <ol>
+ *   <li>{@code registerFirstSchema} creates a child node with {@code
creatingParentsIfNeeded},
+ *       which implicitly creates the parent (table) node as empty
(version 0)
+ *   <li>{@code registerTable} calls {@code setData} on the parent (table)
node with real data
+ *       (version becomes 1)
+ * </ol>
+ *
+ * <p>The test uses the raw ZK async API to pipeline create + setData on
the mutation session,
+ * ensuring they are sent back-to-back on the TCP connection. The ZK
server pr