Re: [PR] [kv] Introduce TabletState to support persistence of auto-inc buffer and row count [fluss]
wuchong merged PR #2651:
URL: https://github.com/apache/fluss/pull/2651
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
wuchong commented on code in PR #2651:
URL: https://github.com/apache/fluss/pull/2651#discussion_r2797198818
##
fluss-rpc/src/main/proto/FlussApi.proto:
##
@@ -1184,4 +1199,34 @@ message PbKvSnapshotLeaseForBucket {
optional int64 partition_id = 1;
required int32 bucket_id = 2;
required int64 snapshot_id = 3;
+}
+
+message PbTableStatsReqForBucket {
+ optional int64 partition_id = 1;
+ required int32 bucket_id = 2;
+}
+
+message PbTableStatsRespForBucket {
+ optional int32 error_code = 1;
+ optional string error_message = 2;
+ optional int64 partition_id = 3;
+ required int32 bucket_id = 4;
+
+ // --- Table-level stats ---
+ // The number of rows in this bucket.
+ // For KV tables: the number of unique keys (live rows).
+ // For Log tables: total number of log records (highWatermark - logStartOffset).
+ // Absent if row count is not available (e.g., WAL changelog mode or legacy tables).
+ optional int64 row_count = 5;
+
+ // The data size in bytes of this bucket.
+ // For KV tables: the size of the KV store.
+ // For Log tables: the size of the log segments.
+ // Reserved for future use.
+ // optional int64 data_size_bytes = 6;
+
+ // --- Column-level stats (future) ---
+ // Per-column statistics, keyed by column index.
+ // Only populated when target_columns is specified in the request.
+ // repeated PbColumnStats column_stats = 10;
Review Comment:
No, these are just hints for the future.
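The `row_count` comment in the hunk above says the field is absent when a bucket cannot report it. A minimal sketch of how a caller might fold per-bucket responses into a table total, assuming a hypothetical `BucketRowCount` holder in place of the generated `PbTableStatsRespForBucket` class: if any bucket returns an error or omits the count, the total is reported as unknown rather than silently undercounted.

```java
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical stand-in for one PbTableStatsRespForBucket; not the generated protobuf class. */
final class BucketRowCount {
    final int bucketId;
    final Integer errorCode; // null when the bucket answered successfully
    final Long rowCount; // null when row_count is absent (e.g. WAL changelog mode)

    BucketRowCount(int bucketId, Integer errorCode, Long rowCount) {
        this.bucketId = bucketId;
        this.errorCode = errorCode;
        this.rowCount = rowCount;
    }
}

final class RowCountAggregator {
    /**
     * Sums row counts across buckets. Returns empty if any bucket failed or did not report a
     * count, because a partial sum would undercount the table.
     */
    static OptionalLong totalRowCount(List<BucketRowCount> responses) {
        long total = 0;
        for (BucketRowCount r : responses) {
            if (r.errorCode != null || r.rowCount == null) {
                return OptionalLong.empty();
            }
            total += r.rowCount;
        }
        return OptionalLong.of(total);
    }
}
```

Returning `OptionalLong.empty()` keeps "unknown" distinct from a real count of 0, which matters if a connector uses the value for something like `COUNT(*)` pushdown.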
loserwang1024 commented on code in PR #2651:
URL: https://github.com/apache/fluss/pull/2651#discussion_r2796617518
##
fluss-rpc/src/main/proto/FlussApi.proto:
##
@@ -1184,4 +1199,34 @@ message PbKvSnapshotLeaseForBucket {
optional int64 partition_id = 1;
required int32 bucket_id = 2;
required int64 snapshot_id = 3;
+}
+
+message PbTableStatsReqForBucket {
+ optional int64 partition_id = 1;
+ required int32 bucket_id = 2;
+}
+
+message PbTableStatsRespForBucket {
+ optional int32 error_code = 1;
+ optional string error_message = 2;
+ optional int64 partition_id = 3;
+ required int32 bucket_id = 4;
+
+ // --- Table-level stats ---
+ // The number of rows in this bucket.
+ // For KV tables: the number of unique keys (live rows).
+ // For Log tables: total number of log records (highWatermark - logStartOffset).
+ // Absent if row count is not available (e.g., WAL changelog mode or legacy tables).
+ optional int64 row_count = 5;
+
+ // The data size in bytes of this bucket.
+ // For KV tables: the size of the KV store.
+ // For Log tables: the size of the log segments.
+ // Reserved for future use.
+ // optional int64 data_size_bytes = 6;
+
+ // --- Column-level stats (future) ---
+ // Per-column statistics, keyed by column index.
+ // Only populated when target_columns is specified in the request.
+ // repeated PbColumnStats column_stats = 10;
Review Comment:
is this code in use?
##
fluss-rpc/src/main/proto/FlussApi.proto:
##
@@ -1184,4 +1199,34 @@ message PbKvSnapshotLeaseForBucket {
optional int64 partition_id = 1;
required int32 bucket_id = 2;
required int64 snapshot_id = 3;
+}
+
+message PbTableStatsReqForBucket {
+ optional int64 partition_id = 1;
+ required int32 bucket_id = 2;
+}
+
+message PbTableStatsRespForBucket {
+ optional int32 error_code = 1;
+ optional string error_message = 2;
+ optional int64 partition_id = 3;
+ required int32 bucket_id = 4;
+
+ // --- Table-level stats ---
+ // The number of rows in this bucket.
+ // For KV tables: the number of unique keys (live rows).
+ // For Log tables: total number of log records (highWatermark - logStartOffset).
+ // Absent if row count is not available (e.g., WAL changelog mode or legacy tables).
+ optional int64 row_count = 5;
+
+ // The data size in bytes of this bucket.
+ // For KV tables: the size of the KV store.
+ // For Log tables: the size of the log segments.
+ // Reserved for future use.
+ // optional int64 data_size_bytes = 6;
Review Comment:
is this code in use?
##
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java:
##
@@ -365,76 +360,16 @@ public static Collection limitScan(
}
}
-public static long countLogTable(TablePath tablePath, Configuration flussConfig) {
+public static long countTable(TablePath tablePath, Configuration flussConfig) {
try (Connection connection = ConnectionFactory.createConnection(flussConfig);
Admin flussAdmin = connection.getAdmin()) {
-TableInfo tableInfo = flussAdmin.getTableInfo(tablePath).get();
-int bucketCount = tableInfo.getNumBuckets();
-Collection<Integer> buckets =
-IntStream.range(0, bucketCount).boxed().collect(Collectors.toList());
-List<PartitionInfo> partitionInfos;
-if (tableInfo.isPartitioned()) {
-partitionInfos = flussAdmin.listPartitionInfos(tablePath).get();
-} else {
-partitionInfos = Collections.singletonList(null);
-}
-
-List<CompletableFuture<Long>> countFutureList =
-offsetLengthes(flussAdmin, tablePath, partitionInfos, buckets);
-// wait for all the response
-CompletableFuture.allOf(countFutureList.toArray(new CompletableFuture[0])).join();
-long count = 0;
-for (CompletableFuture<Long> countFuture : countFutureList) {
-count += countFuture.get();
-}
-return count;
+TableStats tableStats = flussAdmin.getTableStats(tablePath).get();
Review Comment:
What if the Fluss server does not support getTableStats? Can the Flink job still work here?
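One way the concern above could be handled, sketched under assumptions: call the new stats RPC first and fall back to the legacy per-bucket offset counting (the code this diff removes) only when the server rejects the call as unsupported. `statsCount`, `legacyCount`, and the use of `UnsupportedOperationException` as the "server too old" signal are all hypothetical; the real Fluss client may surface a different exception type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

final class CountWithFallback {
    /**
     * Returns the count from statsCount, or from legacyCount if statsCount fails with
     * UnsupportedOperationException (a stand-in for "server does not support the RPC").
     * Any other failure is rethrown unchanged.
     */
    static long count(
            Supplier<CompletableFuture<Long>> statsCount,
            Supplier<CompletableFuture<Long>> legacyCount)
            throws InterruptedException, ExecutionException {
        try {
            return statsCount.get().get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnsupportedOperationException) {
                // Older server: fall back to summing per-bucket offset ranges.
                return legacyCount.get().get();
            }
            throw e;
        }
    }
}
```

Passing the legacy path as a `Supplier` means the more expensive offset lookups are only issued when the fallback is actually needed.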
luoyuxia commented on code in PR #2651:
URL: https://github.com/apache/fluss/pull/2651#discussion_r2796658302
##
fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java:
##
@@ -332,6 +373,8 @@ public int hashCode() {
@Override
public String toString() {
return "KvEntry{"
++ "changeType="
++ changeType
+ "key="
Review Comment:
nit:
```suggestion
+ ", key="
```
##
fluss-common/src/main/java/org/apache/fluss/metadata/TableStats.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+import java.util.Objects;
+
+/** Statistics of a table. */
Review Comment:
nit: maybe add `@PublicEvolving` and `since`, as it's exposed to users?
wuchong commented on code in PR #2651:
URL: https://github.com/apache/fluss/pull/2651#discussion_r2796579889
##
fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java:
##
@@ -218,9 +220,11 @@ public void flush(long exclusiveUpToLogSequenceNumber) throws IOException {
Value value = entry.getValue();
if (value.value != null) {
flushedCount += 1;
+rowCountDiff += 1;
Review Comment:
Yes, this code does not correctly handle the row count diff. I refactored this part to track the operation type.
platinumhamburg commented on code in PR #2651:
URL: https://github.com/apache/fluss/pull/2651#discussion_r2795891242
##
fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/TabletState.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.kv.snapshot;
+
+import org.apache.fluss.server.kv.autoinc.AutoIncIDRange;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * TabletState represents the state of a kv tablet at a certain log offset. It contains the flushed
+ * log offset, the row count of the tablet at that log offset, and the auto-increment ID ranges of
+ * the tablet at that log offset. The row count and auto-increment ID ranges are optional, and may
+ * be null if the information is not available or not needed for a particular use case.
+ */
+public class TabletState {
Review Comment:
Consider adding `toString()` for easier debugging and logging.
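A minimal sketch of the suggested `toString()`, assuming hypothetical field names taken from the Javadoc above (flushed log offset, nullable row count, nullable auto-increment ID ranges). `TabletStateSketch` is not the PR's class, and `String` stands in for `AutoIncIDRange` to keep the example self-contained.

```java
import java.util.List;

/** Hypothetical sketch mirroring the fields described in the TabletState Javadoc. */
final class TabletStateSketch {
    private final long flushedLogOffset;
    private final Long rowCount; // null if not available
    private final List<String> autoIncIdRanges; // null if not available; stands in for AutoIncIDRange

    TabletStateSketch(long flushedLogOffset, Long rowCount, List<String> autoIncIdRanges) {
        this.flushedLogOffset = flushedLogOffset;
        this.rowCount = rowCount;
        this.autoIncIdRanges = autoIncIdRanges;
    }

    @Override
    public String toString() {
        // Null fields print as "null", so "not available" stays distinguishable from 0 in logs.
        return "TabletState{"
                + "flushedLogOffset=" + flushedLogOffset
                + ", rowCount=" + rowCount
                + ", autoIncIdRanges=" + autoIncIdRanges
                + '}';
    }
}
```

For example, `new TabletStateSketch(100L, 5L, null)` prints `TabletState{flushedLogOffset=100, rowCount=5, autoIncIdRanges=null}`.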
##
fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java:
##
@@ -218,9 +220,11 @@ public void flush(long exclusiveUpToLogSequenceNumber) throws IOException {
Value value = entry.getValue();
if (value.value != null) {
flushedCount += 1;
+rowCountDiff += 1;
Review Comment:
The `KvPreWriteBuffer` cannot distinguish between INSERT (new key) and
UPDATE (overwriting existing key) - both are PUT operations. Row count will be
inflated over time as more UPDATEs occur. A low-cost fix would be to track
operation type (INSERT vs UPDATE) when writing to the buffer, since
`processUpsert()` already queries `oldValue` to determine this.
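The fix suggested above can be sketched as follows, assuming a hypothetical `RowCountDiffTracker` that records only the row-count effect of each write, keyed by log sequence number, so that `flush(exclusiveUpToLogSequenceNumber)` can sum the deltas of the entries it drains. INSERT contributes +1, UPDATE 0, and DELETE of an existing key -1. This illustrates the idea only; it is not the refactored `KvPreWriteBuffer`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch: tracks row-count deltas by operation type, not the real KvPreWriteBuffer. */
final class RowCountDiffTracker {
    enum OpType {
        INSERT(1),
        UPDATE(0),
        DELETE(-1);

        final int rowDelta;

        OpType(int rowDelta) {
            this.rowDelta = rowDelta;
        }
    }

    private static final class Entry {
        final long logSequenceNumber;
        final OpType op;

        Entry(long logSequenceNumber, OpType op) {
            this.logSequenceNumber = logSequenceNumber;
            this.op = op;
        }
    }

    // Entries are appended in increasing log sequence number order.
    private final Deque<Entry> entries = new ArrayDeque<>();

    /** PUT: INSERT if the key had no previous value (as processUpsert already knows), else UPDATE. */
    void recordPut(long logSequenceNumber, boolean oldValueExists) {
        entries.addLast(
                new Entry(logSequenceNumber, oldValueExists ? OpType.UPDATE : OpType.INSERT));
    }

    /** DELETE only changes the row count if the key actually existed. */
    void recordDelete(long logSequenceNumber, boolean oldValueExists) {
        if (oldValueExists) {
            entries.addLast(new Entry(logSequenceNumber, OpType.DELETE));
        }
    }

    /** Drains entries with sequence number below the bound and returns their net row-count change. */
    long flush(long exclusiveUpToLogSequenceNumber) {
        long rowCountDiff = 0;
        while (!entries.isEmpty()
                && entries.peekFirst().logSequenceNumber < exclusiveUpToLogSequenceNumber) {
            rowCountDiff += entries.pollFirst().op.rowDelta;
        }
        return rowCountDiff;
    }
}
```

Because each operation's type is decided against the state right before it (including earlier buffered writes), summing per-operation deltas stays correct even when the same key is written several times before a flush. Counting every PUT as +1 would report 3 instead of 2 for the first flush in a sequence like INSERT k1, UPDATE k1, INSERT k2.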
##
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncIDRange.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import java.util.Objects;
+
+/**
+ * Represents a range of IDs allocated for auto-increment purposes. This class encapsulates the
+ * auto-increment column id, the start and end of the ID range. The range is [start, end]. It is
+ * possible that start > end, which means the range is empty.
+ */
+public class AutoIncIDRange {
+
+private final int columnId;
+private final long start;
+private final long end;
+
+public AutoIncIDRange(int columnId, long start, long end) {
+this.columnId = columnId;
+this.start = start;
+this.end = end;
+}
+
+/**
+ * Returns the column ID of the auto-increment column that is associated with this
+ * auto-increment ID range.
+ */
+public int getColumnId() {
+return columnId;
+}
+
+/** Returns the starting ID of the range (inclusive). */
+public long getStart() {
+return start;
+}
+
+/** Returns the ending ID of the range (inclusive). */
+public long getEnd() {
+return end;
+}
+
+/** Checks if the ID range is empty (i.e., start is greater than end). */
+public boolean isEmpty() {
+return start > end;
+}
+
+@Override
+public boolean equals(Object o) {
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+AutoIncIDRange autoIncIdRange = (AutoIncIDRange) o;
+return columnId == autoIncIdRange.columnId
+&& start == autoIncIdRange.start
+&& end == autoIncIdRange.end;
+}
+
+@Overr
