This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3cc83197036 Support unique key first binary (e.g. MySQL VARBINARY)
column exact splitting (#38041)
3cc83197036 is described below
commit 3cc83197036c037d4d5fe8b7e5822e590938c912
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Feb 14 15:05:10 2026 +0800
Support unique key first binary (e.g. MySQL VARBINARY) column exact
splitting (#38041)
* Support unique key first binary column (e.g. MySQL VARBINARY) exact
splitting
* Update RELEASE-NOTES.md
---
RELEASE-NOTES.md | 1 +
.../position/type/pk/UniqueKeyIngestPosition.java | 25 ++++++-
.../position/InventoryPositionCalculator.java | 9 +++
.../position/exact/BinaryPositionHandler.java | 48 ++++++++++++
.../type/pk/UniqueKeyIngestPositionTest.java | 50 +++++++++++++
.../position/exact/BinaryPositionHandlerTest.java | 86 ++++++++++++++++++++++
6 files changed, 218 insertions(+), 1 deletion(-)
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 0f29327e3a4..5a30efe17c9 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -86,6 +86,7 @@
1. Pipeline: Support unique key first integer column exact or estimated
splitting based on data sparseness -
[#37542](https://github.com/apache/shardingsphere/pull/37542)
1. Pipeline: Support unique key first big integer column splitting -
[#37574](https://github.com/apache/shardingsphere/pull/37574)
1. Pipeline: Support unique key first string column exact splitting -
[#37543](https://github.com/apache/shardingsphere/pull/37543)
+1. Pipeline: Support unique key first binary (e.g. MySQL VARBINARY) column
exact splitting - [#38041](https://github.com/apache/shardingsphere/pull/38041)
1. Pipeline: Support multi-columns unique key non-first column nullable -
[#37647](https://github.com/apache/shardingsphere/pull/37647)
1. Encrypt: Support handling show create view result decoration in encrypt -
[#37299](https://github.com/apache/shardingsphere/pull/37299)
1. JDBC: Enhance ResultSetUtils to support flexible string date/time
conversions - [37424](https://github.com/apache/shardingsphere/pull/37424)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPosition.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPosition.java
index 453146d8d9a..fad894bc803 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPosition.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPosition.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.quer
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import java.math.BigInteger;
+import java.util.Base64;
import java.util.List;
/**
@@ -62,7 +63,11 @@ public final class UniqueKeyIngestPosition<T> implements
IngestPosition {
String upper = null == upperBound ? null : upperBound.toString();
return ofString(range.isLowerInclusive() ? Range.closed(lower,
upper) : Range.openClosed(lower, upper));
}
- // TODO support more types, e.g. byte[] (MySQL varbinary)
+ if (lowerBound instanceof byte[]) {
+ byte[] lower = (byte[]) lowerBound;
+ byte[] upper = (byte[]) upperBound;
+ return ofBinary(range.isLowerInclusive() ? Range.closed(lower,
upper) : Range.openClosed(lower, upper));
+ }
return ofUnsplit();
}
@@ -93,6 +98,16 @@ public final class UniqueKeyIngestPosition<T> implements
IngestPosition {
return new UniqueKeyIngestPosition<>('s', range);
}
+ /**
+ * Create binary unique key ingest position.
+ *
+ * @param range range
+ * @return binary unique key ingest position
+ */
+ public static UniqueKeyIngestPosition<byte[]> ofBinary(final Range<byte[]>
range) {
+ return new UniqueKeyIngestPosition<>('b', range);
+ }
+
/**
* Create unsplit unique key ingest position.
*
@@ -123,6 +138,10 @@ public final class UniqueKeyIngestPosition<T> implements
IngestPosition {
return ofInteger(Range.closed(lower, upper));
case 's':
return ofString(Range.closed(lowerBound, upperBound));
+ case 'b':
+ byte[] lowerBytes = Strings.isNullOrEmpty(lowerBound) ? null :
Base64.getDecoder().decode(lowerBound);
+ byte[] upperBytes = Strings.isNullOrEmpty(upperBound) ? null :
Base64.getDecoder().decode(upperBound);
+ return ofBinary(Range.closed(lowerBytes, upperBytes));
case 'u':
return ofUnsplit();
default:
@@ -148,6 +167,10 @@ public final class UniqueKeyIngestPosition<T> implements
IngestPosition {
encodedLowerBound = null == lowerBound ? "" :
lowerBound.toString();
encodedUpperBound = null == upperBound ? "" :
upperBound.toString();
break;
+ case 'b':
+ encodedLowerBound = null == lowerBound ? "" :
Base64.getEncoder().encodeToString((byte[]) lowerBound);
+ encodedUpperBound = null == upperBound ? "" :
Base64.getEncoder().encodeToString((byte[]) upperBound);
+ break;
default:
throw new RuntimeException("Unknown unique key position type:
" + getType());
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
index f2503eef846..bb925eb82b5 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
@@ -26,6 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.Uniq
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.InventoryDataSparsenessCalculator;
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.estimated.InventoryPositionEstimatedCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact.BinaryPositionHandler;
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact.IntegerPositionHandler;
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact.InventoryPositionExactCalculator;
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact.StringPositionHandler;
@@ -68,6 +69,9 @@ public final class InventoryPositionCalculator {
if (dataTypeOption.isStringDataType(firstColumnDataType)) {
return getStringPositions();
}
+ if (dataTypeOption.isBinaryDataType(firstColumnDataType)) {
+ return getBinaryPositions();
+ }
log.info("Unsupported unique key type, unique key columns: {}",
uniqueKeyColumns);
return Collections.singletonList(UniqueKeyIngestPosition.ofUnsplit());
}
@@ -85,4 +89,9 @@ public final class InventoryPositionCalculator {
String uniqueKey = uniqueKeyColumns.get(0).getName();
return InventoryPositionExactCalculator.getPositions(qualifiedTable,
uniqueKey, shardingSize, dataSource, new StringPositionHandler());
}
+
+ private List<IngestPosition> getBinaryPositions() {
+ String uniqueKey = uniqueKeyColumns.get(0).getName();
+ return InventoryPositionExactCalculator.getPositions(qualifiedTable,
uniqueKey, shardingSize, dataSource, new BinaryPositionHandler());
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/BinaryPositionHandler.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/BinaryPositionHandler.java
new file mode 100644
index 00000000000..25ea2767cd8
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/BinaryPositionHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact;
+
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Binary position handler.
+ *
+ * <p>Handles VARBINARY/BINARY unique key type, using Base64 encoding for text
serialization.</p>
+ */
+public final class BinaryPositionHandler implements
DataTypePositionHandler<byte[]> {
+
+ @Override
+ public byte[] readColumnValue(final ResultSet resultSet, final int
columnIndex) throws SQLException {
+ return resultSet.getBytes(columnIndex);
+ }
+
+ @Override
+ public void setPreparedStatementValue(final PreparedStatement
preparedStatement, final int parameterIndex, final byte[] value) throws
SQLException {
+ preparedStatement.setBytes(parameterIndex, value);
+ }
+
+ @Override
+ public UniqueKeyIngestPosition<byte[]> createIngestPosition(final
Range<byte[]> range) {
+ return UniqueKeyIngestPosition.ofBinary(range);
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPositionTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPositionTest.java
index fb15f5e0cbc..045e40b9e8a 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPositionTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPositionTest.java
@@ -27,6 +27,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.isA;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
class UniqueKeyIngestPositionTest {
@@ -146,4 +147,53 @@ class UniqueKeyIngestPositionTest {
assertNull(actual.getLowerBound());
assertNull(actual.getUpperBound());
}
+
+ @Test
+ void assertEncodeBinary() {
+ byte[] lowerBound = new byte[]{0x01, 0x02, 0x03};
+ byte[] upperBound = new byte[]{0x04, 0x05, 0x06};
+ String encoded =
UniqueKeyIngestPosition.ofBinary(Range.closed(lowerBound, upperBound)).encode();
+ assertThat(encoded, is("b,AQID,BAUG"));
+ }
+
+ @Test
+ void assertEncodeBinaryWithNullValue() {
+ assertThat(UniqueKeyIngestPosition.ofBinary(Range.closed(null,
null)).encode(), is("b,,"));
+ }
+
+ @Test
+ void assertDecodeBinaryPosition() {
+ UniqueKeyIngestPosition<?> actual =
UniqueKeyIngestPosition.decode("b,AQID,BAUG");
+ assertThat(actual, isA(UniqueKeyIngestPosition.class));
+ assertThat(actual.getType(), is('b'));
+ assertArrayEquals(new byte[]{0x01, 0x02, 0x03}, (byte[])
actual.getLowerBound());
+ assertArrayEquals(new byte[]{0x04, 0x05, 0x06}, (byte[])
actual.getUpperBound());
+ }
+
+ @Test
+ void assertDecodeBinaryPositionWithNullValue() {
+ UniqueKeyIngestPosition<?> actual =
UniqueKeyIngestPosition.decode("b,,");
+ assertThat(actual.getType(), is('b'));
+ assertNull(actual.getLowerBound());
+ assertNull(actual.getUpperBound());
+ }
+
+ @Test
+ void assertNewInstanceWithBinaryRange() {
+ byte[] lowerBound = new byte[]{0x01, 0x02};
+ byte[] upperBound = new byte[]{0x03, 0x04};
+ UniqueKeyIngestPosition<?> actual =
UniqueKeyIngestPosition.newInstance(Range.closed(lowerBound, upperBound));
+ assertThat(actual.getType(), is('b'));
+ assertArrayEquals(lowerBound, (byte[]) actual.getLowerBound());
+ assertArrayEquals(upperBound, (byte[]) actual.getUpperBound());
+ }
+
+ @Test
+ void assertNewInstanceWithBinaryNullEndRange() {
+ byte[] lowerBound = new byte[]{0x01, 0x02};
+ UniqueKeyIngestPosition<?> actual =
UniqueKeyIngestPosition.newInstance(Range.closed(lowerBound, null));
+ assertThat(actual.getType(), is('b'));
+ assertArrayEquals(lowerBound, (byte[]) actual.getLowerBound());
+ assertNull(actual.getUpperBound());
+ }
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/BinaryPositionHandlerTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/BinaryPositionHandlerTest.java
new file mode 100644
index 00000000000..9d77e4105c4
--- /dev/null
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/BinaryPositionHandlerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact;
+
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
+import org.junit.jupiter.api.Test;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class BinaryPositionHandlerTest {
+
+ private final BinaryPositionHandler handler = new BinaryPositionHandler();
+
+ @Test
+ void assertReadColumnValue() throws Exception {
+ byte[] expectedValue = new byte[]{0x01, 0x02, 0x03};
+ ResultSet resultSet = mock(ResultSet.class);
+ when(resultSet.getBytes(1)).thenReturn(expectedValue);
+ byte[] actualValue = handler.readColumnValue(resultSet, 1);
+ assertThat(actualValue, is(expectedValue));
+ }
+
+ @Test
+ void assertReadNullColumnValue() throws Exception {
+ ResultSet resultSet = mock(ResultSet.class);
+ when(resultSet.getBytes(1)).thenReturn(null);
+ byte[] actualValue = handler.readColumnValue(resultSet, 1);
+ assertThat(actualValue, is((byte[]) null));
+ }
+
+ @Test
+ void assertSetPreparedStatementValue() throws Exception {
+ byte[] value = new byte[]{0x01, 0x02, 0x03};
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ handler.setPreparedStatementValue(preparedStatement, 1, value);
+ verify(preparedStatement).setBytes(1, value);
+ }
+
+ @Test
+ void assertSetNullPreparedStatementValue() throws Exception {
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ handler.setPreparedStatementValue(preparedStatement, 1, null);
+ verify(preparedStatement).setBytes(1, null);
+ }
+
+ @Test
+ void assertCreateIngestPosition() {
+ byte[] lowerBound = new byte[]{0x01, 0x02};
+ byte[] upperBound = new byte[]{0x03, 0x04};
+ UniqueKeyIngestPosition<byte[]> position =
handler.createIngestPosition(Range.closed(lowerBound, upperBound));
+ assertThat(position.getType(), is('b'));
+ assertThat(position.getLowerBound(), is(lowerBound));
+ assertThat(position.getUpperBound(), is(upperBound));
+ }
+
+ @Test
+ void assertCreateIngestPositionWithNullBounds() {
+ UniqueKeyIngestPosition<byte[]> position =
handler.createIngestPosition(Range.closed(null, null));
+ assertThat(position.getType(), is('b'));
+ assertThat(position.getLowerBound(), is((byte[]) null));
+ assertThat(position.getUpperBound(), is((byte[]) null));
+ }
+}