Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on PR #17235: URL: https://github.com/apache/pinot/pull/17235#issuecomment-3807462325 > Can you please create a parent issue along with the design doc, and then link all the PRs to the parent issue? It is easier to track all the PRs under this topic Yeah, the parent issue was created before: https://github.com/apache/pinot/issues/16563 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
Jackie-Jiang commented on PR #17235: URL: https://github.com/apache/pinot/pull/17235#issuecomment-3807332593 Can you please create a parent issue along with the design doc, and then link all the PRs to the parent issue? It is easier to track all the PRs under this topic -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
chenboat merged PR #17235: URL: https://github.com/apache/pinot/pull/17235 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
deemoliu commented on PR #17235: URL: https://github.com/apache/pinot/pull/17235#issuecomment-3750860005 LGTM -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2677047497
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -1729,8 +1734,21 @@ public void updateDatabaseConfig(DatabaseConfig
databaseConfig) {
*/
public void addTable(TableConfig tableConfig)
throws IOException {
+addTable(tableConfig, Collections.emptyList());
+ }
+
+ /**
+ * Performs validations of table config and adds the table to zookeeper
Review Comment:
Oh, I just copy the javadoc of original addTable.
/**
* Performs validations of table config and adds the table to zookeeper
* @throws InvalidTableConfigException if validations fail
* @throws TableAlreadyExistsException for offline tables only if the
table already exists
*/
public void addTable(TableConfig tableConfig)
throws IOException {
addTable(tableConfig, Collections.emptyList());
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2677047071
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PartitionGroupInfo.java:
##
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import org.apache.pinot.controller.helix.core.WatermarkInductionResult;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+
+/**
+ * The {@code PartitionGroupInfo} class represents the metadata and sequence
number for a partition group.
+ * It encapsulates the {@link PartitionGroupMetadata} and the sequence number
associated with it.
+ */
+public class PartitionGroupInfo {
Review Comment:
Replace with Pair class
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -1729,8 +1734,21 @@ public void updateDatabaseConfig(DatabaseConfig
databaseConfig) {
*/
public void addTable(TableConfig tableConfig)
throws IOException {
+addTable(tableConfig, Collections.emptyList());
+ }
+
+ /**
+ * Performs validations of table config and adds the table to zookeeper
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2673535750
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PartitionGroupInfo.java:
##
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import org.apache.pinot.controller.helix.core.WatermarkInductionResult;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+
+/**
+ * The {@code PartitionGroupInfo} class represents the metadata and sequence
number for a partition group.
+ * It encapsulates the {@link PartitionGroupMetadata} and the sequence number
associated with it.
+ */
+public class PartitionGroupInfo {
Review Comment:
From a pure Kafka consumer point of view, the segment sequence number
doesn't seem to belong to a PartitionGroupMeta? I
I am also concerned that the change of the constructor and adding the
sequence number might cause review rejections. Many places already use the
class PartitonGroupMeta. Adding a wrapper class would bring benefit of less
existing code changes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2673535750
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PartitionGroupInfo.java:
##
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import org.apache.pinot.controller.helix.core.WatermarkInductionResult;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+
+/**
+ * The {@code PartitionGroupInfo} class represents the metadata and sequence
number for a partition group.
+ * It encapsulates the {@link PartitionGroupMetadata} and the sequence number
associated with it.
+ */
+public class PartitionGroupInfo {
Review Comment:
From a pure Kafka consumer point of view, the segment sequence number
doesn't seem to belong to a PartitionGroupMeta? I
I am also concerned that the change of the constructor and adding the
sequence number might cause review rejections. Adding a wrapper class would
bring benefit of less existing code changes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
chenboat commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2673507545
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PartitionGroupInfo.java:
##
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import org.apache.pinot.controller.helix.core.WatermarkInductionResult;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+
+/**
+ * The {@code PartitionGroupInfo} class represents the metadata and sequence
number for a partition group.
+ * It encapsulates the {@link PartitionGroupMetadata} and the sequence number
associated with it.
+ */
+public class PartitionGroupInfo {
Review Comment:
why do we need any wrapper class like this? Can we reuse
PartitionGroupMetadata or add to it instead of creating a new class?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2673501184
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
}
}
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/tables/{tableName}/consumerWatermarks")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.GET_IDEAL_STATE)
+ @ApiOperation(value = "Get table ideal state", notes = "Get table ideal
state")
Review Comment:
good catch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
chenboat commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2673428063
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -1729,8 +1734,21 @@ public void updateDatabaseConfig(DatabaseConfig
databaseConfig) {
*/
public void addTable(TableConfig tableConfig)
throws IOException {
+addTable(tableConfig, Collections.emptyList());
+ }
+
+ /**
+ * Performs validations of table config and adds the table to zookeeper
Review Comment:
Is there any restriction on this API? Realtime table only? Can it add upsert
tables? If so, please clarify in the javadoc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
chenboat commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2673411779
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
}
}
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/tables/{tableName}/consumerWatermarks")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.GET_IDEAL_STATE)
+ @ApiOperation(value = "Get table ideal state", notes = "Get table ideal
state")
Review Comment:
Why is it about "table ideal state"? Is it typo?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2670884310
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -4715,6 +4739,51 @@ public QueryWorkloadManager getQueryWorkloadManager() {
return _queryWorkloadManager;
}
+ /**
+ * Retrieves the consumer watermark for a given real-time table.
+ * The watermark represents the next offset to be consumed for each
partition group.
+ * If the latest segment of a partition is in a DONE state, the watermark is
the end offset of the completed segment.
+ * Otherwise, it is the start offset of the current consuming segment.
+ *
+ * @param tableName The name of the real-time table (without type suffix).
+ * @return A {@link WatermarkInductionResult} containing a list of
watermarks for each partition group.
+ * @throws TableNotFoundException if the specified real-time table does not
exist.
+ * @throws IllegalStateException if the IdealState for the table is not
found.
+ */
+ public WatermarkInductionResult getConsumerWatermarks(String tableName)
throws TableNotFoundException {
+String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+if (!hasRealtimeTable(tableName)) {
+ throw new TableNotFoundException("Table " + tableNameWithType + " does
not exist");
+}
+TableConfig tableConfig = getTableConfig(tableNameWithType);
+Preconditions.checkNotNull(tableConfig, "Table " + tableNameWithType +
"exists but null tableConfig");
+List streamConfigs =
IngestionConfigUtils.getStreamConfigs(tableConfig);
+IdealState idealState = _helixAdmin
+.getResourceIdealState(getHelixClusterName(), tableNameWithType);
+if (idealState == null) {
+ throw new IllegalStateException("Null IdealState of the table " +
tableNameWithType);
+}
+List lst = _pinotLLCRealtimeSegmentManager
+.getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+List watermarks =
lst.stream().map(status -> {
+ int seq = status.getSequenceNumber();
+ long startOffset;
+ try {
+if ("DONE".equalsIgnoreCase(status.getStatus())) {
+ Preconditions.checkNotNull(status.getEndOffset());
+ startOffset =
NumberUtils.parseLong(status.getEndOffset().toString());
+ seq++;
+} else {
Review Comment:
The testGetConsumerWatermarks already contains both cases.
```
PartitionGroupConsumptionStatus doneStatus = new
PartitionGroupConsumptionStatus(0, 100,
new LongMsgOffset(123), new LongMsgOffset(456), "done");
PartitionGroupConsumptionStatus inProgressStatus =
new PartitionGroupConsumptionStatus(1, 200, new LongMsgOffset(789),
null, "IN_PROGRESS");
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
abhishekbafna commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2668973896
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java:
##
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyTablePayload {
+
+ private String _sourceClusterUri;
+ private Map _headers;
+ /**
+ * Broker tenant for the new table.
+ * MUST NOT contain the tenant type suffix, i.e. _BROKER.
+ */
+ private String _brokerTenant;
+ /**
+ * Server tenant for the new table.
+ * MUST NOT contain the tenant type suffix, i.e. _REALTIME or _OFFLINE.
+ */
+ private String _serverTenant;
+
+ /**
+ * The instanceAssignmentConfig's tagPoolConfig contains full tenant name.
We will use this field to let user specify
+ * the replacement relation from source cluster's full tenant to target
cluster's full tenant.
+ */
+ private Map _tagPoolReplacementMap;
+
+ private boolean _verbose = false;
+ private boolean _dryRun = true;
Review Comment:
These should be API parameters and not copy payload. The copy payload should
only contain table config specific key-value pairs only.
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +290,136 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+
+ if
(_pinotHelixResourceManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(tableName))
!= null
+ ||
_pinotHelixResourceManager.getTableConfig(TableNameBuilder.OFFLINE.tableNameWithType(tableName))
!= null) {
+throw new TableAlreadyExistsException("Table config for " + tableName
++ " already exists. If this is unexpected, try deleting the table
to remove all metadata associated"
++ " with it before attempting to recreate.");
+ }
+
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map requestHeaders = copyTablePayload.getHeaders();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ ControllerRequestURLBuilder urlBuilder =
ControllerRequestURLBuilder.baseUrl(sourceControllerUri);
+
+ URI schemaUri = new URI(urlBuilder.forTableSchemaGet(tableName));
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+
+ URI tableConfigUri = new URI(urlBuilder.forTableGet(tableName));
+ SimpleHttpResponse tableConfigResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(tableConfigUri,
requestHeaders));
+ String tableConfigJson = tableConfigResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched table config for table: {}", tableName);
+ JsonNode
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2666991674
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
}
}
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/tables/{tableName}/consumerWatermarks")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.GET_IDEAL_STATE)
+ @ApiOperation(value = "Get table ideal state", notes = "Get table ideal
state")
+ public WatermarkInductionResult inductConsumingWatermark(
+ @ApiParam(value = "Name of the realtime table", required = true)
@PathParam("tableName") String tableName,
+ @Context HttpHeaders headers) {
+try {
+ String table = DatabaseUtils.translateTableName(tableName, headers);
Review Comment:
the segment itself has cdc, likely include the table name
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2666991156
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
Review Comment:
Yeah, already support the dryrun mode with a boolean flag in the input
payload
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on PR #17235: URL: https://github.com/apache/pinot/pull/17235#issuecomment-3717213852 MSE against 1.3 failed but succeed against 1.4 and master -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on PR #17235: URL: https://github.com/apache/pinot/pull/17235#issuecomment-3717126545 @abhishekbafna I add the dryRun as a boolean flag in the payload. The reply contains schema, modified table config and watermarkResult as well. The write operation (schema and table config addition) is moved to the end. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2666889807
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+ _pinotHelixResourceManager.addSchema(schema, true, false);
+ LOGGER.info("[copyTable] Successfully added schema for table: {},
schema: {}", tableName, schema);
+
+ // Fetch and add table configs
+ URI tableConfigUri = new URI(sourceControllerUri + "/tables/" +
tableName);
+ SimpleHttpResponse tableConfigResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(tableConfigUri,
requestHeaders));
+ String tableConfigJson = tableConfigResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched table config for table: {},
tableConfig: {}", tableName, tableConfigJson);
+ JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+ boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+ if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode)
tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant,
serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig =
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+if (realtimeTableConfig.getUpsertConfig() != null) {
+ return new CopyTableResponse("fail", "upsert table copy not
supported");
+}
+LOGGER.info("[copyTable] Successfully fetched and tweaked table config
for table: {}, tableConfig: {}",
+tableName, realtimeTableConfig.toString());
+
+URI watermarkUri = new URI(sourceControllerUri + "/tables/" +
tableName + "/consumerWatermarks");
+SimpleHttpResponse watermarkResponse =
HttpClient.wrapAndThrowHttpException(
+HttpClient.getInstance().sendGetRequest(watermarkUri,
requestHeaders));
+String watermarkJson = watermarkResponse.getResponse();
+LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result:
{}", tableName, watermarkJson);
+WatermarkInductionResult watermarkInductionResult =
+JsonUtils.stringToObject(watermarkJson,
WatermarkInductionResult.class);
+
+List partitionGroupInfos =
watermarkInductionResult.getWatermarks().stream()
+.map(PartitionGroupInfo::from)
+.collect(Collectors.toList());
+
+// Add the table with designated starting kafka offset and segment
sequence number to create consuming segments
+_pinotHelixResourceManager.addTable(realtimeTableConfig,
partitionGroupInfos);
+if (hasOffline) {
+ return new CopyTableResponse("warn", "detect offline; copy real-time
segments only");
+}
+return new CopyTableResponse("success", "");
+ }
+ return new CopyTableResponse("fail", "copying offline table's segments
is not supported yet");
Review Comment:
yeah, throw exception
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comme
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2666889336
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+ _pinotHelixResourceManager.addSchema(schema, true, false);
+ LOGGER.info("[copyTable] Successfully added schema for table: {},
schema: {}", tableName, schema);
+
+ // Fetch and add table configs
+ URI tableConfigUri = new URI(sourceControllerUri + "/tables/" +
tableName);
+ SimpleHttpResponse tableConfigResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(tableConfigUri,
requestHeaders));
+ String tableConfigJson = tableConfigResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched table config for table: {},
tableConfig: {}", tableName, tableConfigJson);
+ JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+ boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+ if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode)
tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant,
serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig =
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+if (realtimeTableConfig.getUpsertConfig() != null) {
+ return new CopyTableResponse("fail", "upsert table copy not
supported");
+}
Review Comment:
yeah, you are right. the add schema is moved after validation check
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2666379437
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -1789,10 +1807,16 @@ public void addTable(TableConfig tableConfig)
// Add ideal state
_helixAdmin.addResource(_helixClusterName, tableNameWithType,
idealState);
LOGGER.info("Adding table {}: Added ideal state for offline table",
tableNameWithType);
- } else {
+ } else if (consumeMeta == null || consumeMeta.isEmpty()) {
// Add ideal state with the first CONSUMING segment
_pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState);
LOGGER.info("Adding table {}: Added ideal state with first consuming
segment", tableNameWithType);
+ } else {
+// Add ideal state with the first CONSUMING segment with designated
partition consuming metadata
+// Add ideal state with the first CONSUMING segment
+_pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState,
consumeMeta);
+LOGGER.info("Adding table {}: Added consuming segments ideal state
given the designated consuming metadata",
Review Comment:
This routine can be shared for other potential purposes. for example, a use
can create a table with kafka offset they wanted to ingestion from
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
abhishekbafna commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2664718282
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
Review Comment:
Use `ControllerRequestURLBuilder` for building URLs.
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+ _pinotHelixResourceManager.addSchema(schema, true, false);
+ LOGGER.info("[copyTable] Successfully added schema for table: {},
schema: {}", tableName, schema);
+
+ // Fetch and add table configs
+ URI tableConfigUri = new URI(sourceControllerUri + "/tables/" +
tableName);
+ SimpleHttpResponse tableConfigResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(tableConfigUri,
requestHeaders));
+ String tableConfigJson = tableConfigResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched table config for table: {},
tableConfig: {}", tableName, tableConfigJson);
+ JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+ boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+ if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode)
tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant,
serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig =
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+if (realtimeTableConfig.getUpsertConfig() != null) {
+ return new CopyTableResponse("fail", "upsert table copy not
supported");
+}
Review Comment:
Le
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663540038
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java:
##
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+/**
+ * Represents the result of a watermark induction process, containing a list
of watermarks.
+ */
+public class WatermarkInductionResult {
+
+ private List _watermarks;
+
+ /**
+ * The @JsonCreator annotation marks this constructor to be used for
deserializing
+ * a JSON array back into a WaterMarks object.
+ *
+ * @param watermarks The list of watermarks.
+ */
+ @JsonCreator
+ public WatermarkInductionResult(@JsonProperty("watermarks") List
watermarks) {
+_watermarks = watermarks;
+ }
+
+ /**
+ * Gets the list of watermarks.
+ *
+ * @return The list of watermarks.
+ */
+ @JsonGetter("watermarks")
+ public List getWatermarks() {
+return _watermarks;
+ }
+
+ /**
+ * Represents a single watermark with its partition, sequence, and offset.
+ */
+ public static class Watermark {
+private long _partitionGroupId;
+private long _sequenceNumber;
+private long _offset;
+
+/**
+ * The @JsonCreator annotation tells Jackson to use this constructor to
create
+ * a WaterMark instance from a JSON object. The @JsonProperty annotations
+ * map the keys in the JSON object to the constructor parameters.
+ *
+ * @param partitionGroupId The ID of the partition group.
+ * @param sequenceNumber The sequence number of the watermark.
+ * @param offset The offset of the watermark.
+ */
+@JsonCreator
+public Watermark(@JsonProperty("partitionGroupId") long partitionGroupId,
+@JsonProperty("sequenceNumber") long sequenceNumber,
@JsonProperty("offset") long offset) {
+ _partitionGroupId = partitionGroupId;
+ _sequenceNumber = sequenceNumber;
+ _offset = offset;
+}
+
+/**
+ * Gets the partition group ID.
+ *
+ * @return The partition group ID.
+ */
+@JsonGetter("partitionGroupId")
+public long getPartitionGroupId() {
+ return _partitionGroupId;
+}
+
+/**
+ * Gets the sequence number.
+ *
+ * @return The sequence number.
+ */
+@JsonGetter("sequenceNumber")
+public long getSequenceNumber() {
+ return _sequenceNumber;
+}
+
+/**
+ * Gets the offset.
Review Comment:
Add a javadoc to explain which kafka offset it represents
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663376742
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java:
##
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+/**
+ * Represents the result of a watermark induction process, containing a list
of watermarks.
+ */
+public class WatermarkInductionResult {
+
+ private List _watermarks;
+
+ /**
+ * The @JsonCreator annotation marks this constructor to be used for
deserializing
+ * a JSON array back into a WaterMarks object.
+ *
+ * @param watermarks The list of watermarks.
+ */
+ @JsonCreator
+ public WatermarkInductionResult(@JsonProperty("watermarks") List
watermarks) {
+_watermarks = watermarks;
+ }
+
+ /**
+ * Gets the list of watermarks.
+ *
+ * @return The list of watermarks.
+ */
+ @JsonGetter("watermarks")
+ public List getWatermarks() {
+return _watermarks;
+ }
+
+ /**
+ * Represents a single watermark with its partition, sequence, and offset.
+ */
+ public static class Watermark {
+private long _partitionGroupId;
+private long _sequenceNumber;
+private long _offset;
+
+/**
+ * The @JsonCreator annotation tells Jackson to use this constructor to
create
+ * a WaterMark instance from a JSON object. The @JsonProperty annotations
+ * map the keys in the JSON object to the constructor parameters.
+ *
+ * @param partitionGroupId The ID of the partition group.
+ * @param sequenceNumber The sequence number of the watermark.
+ * @param offset The offset of the watermark.
+ */
+@JsonCreator
+public Watermark(@JsonProperty("partitionGroupId") long partitionGroupId,
+@JsonProperty("sequenceNumber") long sequenceNumber,
@JsonProperty("offset") long offset) {
+ _partitionGroupId = partitionGroupId;
+ _sequenceNumber = sequenceNumber;
+ _offset = offset;
+}
+
+/**
+ * Gets the partition group ID.
+ *
+ * @return The partition group ID.
+ */
+@JsonGetter("partitionGroupId")
+public long getPartitionGroupId() {
+ return _partitionGroupId;
+}
+
+/**
+ * Gets the sequence number.
+ *
+ * @return The sequence number.
+ */
+@JsonGetter("sequenceNumber")
+public long getSequenceNumber() {
+ return _sequenceNumber;
+}
+
+/**
+ * Gets the offset.
Review Comment:
you are right
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663375714
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java:
##
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+/**
+ * Represents the result of a watermark induction process, containing a list
of watermarks.
+ */
+public class WatermarkInductionResult {
+
+ private List _watermarks;
+
+ /**
+ * The @JsonCreator annotation marks this constructor to be used for
deserializing
+ * a JSON array back into a WaterMarks object.
+ *
+ * @param watermarks The list of watermarks.
+ */
+ @JsonCreator
+ public WatermarkInductionResult(@JsonProperty("watermarks") List
watermarks) {
+_watermarks = watermarks;
+ }
+
+ /**
+ * Gets the list of watermarks.
+ *
+ * @return The list of watermarks.
+ */
+ @JsonGetter("watermarks")
+ public List getWatermarks() {
+return _watermarks;
+ }
+
+ /**
+ * Represents a single watermark with its partition, sequence, and offset.
Review Comment:
sure, will modify the javadoc
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663374750
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
}
}
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/tables/{tableName}/consumerWatermarks")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.GET_IDEAL_STATE)
+ @ApiOperation(value = "Get table ideal state", notes = "Get table ideal
state")
+ public WatermarkInductionResult inductConsumingWatermark(
Review Comment:
I can use get.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663374219
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
}
}
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/tables/{tableName}/consumerWatermarks")
Review Comment:
It's get instead add. The target cluster will invoke this api against the
source cluster, the source cluster will return the current consumerWatermarks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
chenboat commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663328916
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
}
}
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/tables/{tableName}/consumerWatermarks")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.GET_IDEAL_STATE)
+ @ApiOperation(value = "Get table ideal state", notes = "Get table ideal
state")
+ public WatermarkInductionResult inductConsumingWatermark(
Review Comment:
Consider to use simple and straightforward verb like `add` rather than
`induct` to easier understanding.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
chenboat commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663327546
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
}
}
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/tables/{tableName}/consumerWatermarks")
Review Comment:
better to be consistent with method name. change to addConsumingWatermarks?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
chenboat commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663324436
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java:
##
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+/**
+ * Represents the result of a watermark induction process, containing a list
of watermarks.
+ */
+public class WatermarkInductionResult {
+
+ private List _watermarks;
+
+ /**
+ * The @JsonCreator annotation marks this constructor to be used for
deserializing
+ * a JSON array back into a WaterMarks object.
+ *
+ * @param watermarks The list of watermarks.
+ */
+ @JsonCreator
+ public WatermarkInductionResult(@JsonProperty("watermarks") List
watermarks) {
+_watermarks = watermarks;
+ }
+
+ /**
+ * Gets the list of watermarks.
+ *
+ * @return The list of watermarks.
+ */
+ @JsonGetter("watermarks")
+ public List getWatermarks() {
+return _watermarks;
+ }
+
+ /**
+ * Represents a single watermark with its partition, sequence, and offset.
Review Comment:
Here you use partition but the code variable is partitionGroupId? It needs
to be consistent.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
chenboat commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663321756
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java:
##
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+/**
+ * Represents the result of a watermark induction process, containing a list
of watermarks.
+ */
+public class WatermarkInductionResult {
+
+ private List _watermarks;
+
+ /**
+ * The @JsonCreator annotation marks this constructor to be used for
deserializing
+ * a JSON array back into a WaterMarks object.
+ *
+ * @param watermarks The list of watermarks.
+ */
+ @JsonCreator
+ public WatermarkInductionResult(@JsonProperty("watermarks") List
watermarks) {
+_watermarks = watermarks;
+ }
+
+ /**
+ * Gets the list of watermarks.
+ *
+ * @return The list of watermarks.
+ */
+ @JsonGetter("watermarks")
+ public List getWatermarks() {
+return _watermarks;
+ }
+
+ /**
+ * Represents a single watermark with its partition, sequence, and offset.
+ */
+ public static class Watermark {
+private long _partitionGroupId;
+private long _sequenceNumber;
+private long _offset;
+
+/**
+ * The @JsonCreator annotation tells Jackson to use this constructor to
create
+ * a WaterMark instance from a JSON object. The @JsonProperty annotations
+ * map the keys in the JSON object to the constructor parameters.
+ *
+ * @param partitionGroupId The ID of the partition group.
+ * @param sequenceNumber The sequence number of the watermark.
+ * @param offset The offset of the watermark.
+ */
+@JsonCreator
+public Watermark(@JsonProperty("partitionGroupId") long partitionGroupId,
+@JsonProperty("sequenceNumber") long sequenceNumber,
@JsonProperty("offset") long offset) {
+ _partitionGroupId = partitionGroupId;
+ _sequenceNumber = sequenceNumber;
+ _offset = offset;
+}
+
+/**
+ * Gets the partition group ID.
+ *
+ * @return The partition group ID.
+ */
+@JsonGetter("partitionGroupId")
+public long getPartitionGroupId() {
+ return _partitionGroupId;
+}
+
+/**
+ * Gets the sequence number.
+ *
+ * @return The sequence number.
+ */
+@JsonGetter("sequenceNumber")
+public long getSequenceNumber() {
+ return _sequenceNumber;
+}
+
+/**
+ * Gets the offset.
Review Comment:
Instead of just repeating the method name, can you be more specific? This is
the offset of the stream partition?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
chenboat commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663313232
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -4737,6 +4761,40 @@ public QueryWorkloadManager getQueryWorkloadManager() {
return _queryWorkloadManager;
}
+ public WatermarkInductionResult inductConsumingWatermarks(String tableName)
throws TableNotFoundException {
Review Comment:
Please add javadoc and comments on this public method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on PR #17235: URL: https://github.com/apache/pinot/pull/17235#issuecomment-3704082260 Walk through the design doc with @abhishekbafna on 12/23 and aligned on the direction of the solution. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on PR #17235: URL: https://github.com/apache/pinot/pull/17235#issuecomment-3672837020 The unit test Set-1 fail in java 21 but succeed in java 11. The error is about "selectionCombineOperator Not found issue", which is nothing related to table copy. I just do a rebase without any conflict resolving. Previous tests were all successful. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2632926517
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+ _pinotHelixResourceManager.addSchema(schema, true, false);
+ LOGGER.info("[copyTable] Successfully added schema for table: {},
schema: {}", tableName, schema);
+
+ // Fetch and add table configs
+ URI tableConfigUri = new URI(sourceControllerUri + "/tables/" +
tableName);
+ SimpleHttpResponse tableConfigResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(tableConfigUri,
requestHeaders));
+ String tableConfigJson = tableConfigResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched table config for table: {},
tableConfig: {}", tableName, tableConfigJson);
+ JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+ if (tableConfigNode.has(TableType.REALTIME.name())) {
Review Comment:
for upsert and pure offline table, the status will be "fail" in reply. For
hybrid table, the status will be "warn" with message specifying only realtime
table config and segments is copied
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2629468263
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
Review Comment:
User can call the source clusters GET /tables//consumerWatermarks to
get the dry run result, which will show watermarks as well as segments will be
backfilled.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2629468263
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
Review Comment:
Use can call the source clusters GET /tables//consumerWatermarks to
get the dry run result, which will show watermarks as well as segments will be
backfilled.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2630031852
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java:
##
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyTablePayload {
+
+ private String _sourceClusterUri;
+ private Map _headers;
+ /**
+ * Broker tenant for the new table.
+ * MUST NOT contain the tenant type suffix, i.e. _BROKER.
+ */
+ private String _brokerTenant;
+ /**
+ * Server tenant for the new table.
+ * MUST NOT contain the tenant type suffix, i.e. _REALTIME or _OFFLINE.
+ */
+ private String _serverTenant;
+
+ private Map _tagPoolReplacementMap;
Review Comment:
the instanceAssignmentConfig's tagPoolConfig contains full tenant name. We
will use this field to let user specify the replacement relation from source
cluster's full tenant to target cluster's full tenant.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2630025570
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
}
}
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/tables/{tableName}/consumerWatermarks")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.GET_IDEAL_STATE)
+ @ApiOperation(value = "Get table ideal state", notes = "Get table ideal
state")
+ public WatermarkInductionResult inductConsumingWatermark(
+ @ApiParam(value = "Name of the realtime table", required = true)
@PathParam("tableName") String tableName,
+ @Context HttpHeaders headers) {
+try {
+ String table = DatabaseUtils.translateTableName(tableName, headers);
Review Comment:
you are right.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2630024665
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+ _pinotHelixResourceManager.addSchema(schema, true, false);
Review Comment:
if the tableconfig already exists, an exception will throw. if the schema
exist, it won't throw exception
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2630021797
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+ _pinotHelixResourceManager.addSchema(schema, true, false);
+ LOGGER.info("[copyTable] Successfully added schema for table: {},
schema: {}", tableName, schema);
+
+ // Fetch and add table configs
+ URI tableConfigUri = new URI(sourceControllerUri + "/tables/" +
tableName);
+ SimpleHttpResponse tableConfigResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(tableConfigUri,
requestHeaders));
+ String tableConfigJson = tableConfigResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched table config for table: {},
tableConfig: {}", tableName, tableConfigJson);
+ JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+ if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode)
tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant,
serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig =
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+LOGGER.info("[copyTable] Successfully fetched and tweaked table config
for table: {}, tableConfig: {}",
+tableName, realtimeTableConfig.toString());
+
+URI watermarkUri = new URI(sourceControllerUri + "/tables/" +
tableName + "/consumerWatermarks");
+SimpleHttpResponse watermarkResponse =
HttpClient.wrapAndThrowHttpException(
+HttpClient.getInstance().sendGetRequest(watermarkUri,
requestHeaders));
+String watermarkJson = watermarkResponse.getResponse();
+LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result:
{}", tableName, watermarkJson);
+WatermarkInductionResult watermarkInductionResult =
+JsonUtils.stringToObject(watermarkJson,
WatermarkInductionResult.class);
+
+List partitionGroupInfos =
watermarkInductionResult.getWatermarks().stream()
+.map(PartitionGroupInfo::from)
+.collect(Collectors.toList());
+
+_pinotHelixResourceManager.addTable(realtimeTableConfig,
partitionGroupInfos);
+ }
+ LOGGER.info("[copyTable] Finished Table Config copy: {}", tableName);
+ return new CopyTableResponse("success");
+} catch (Exception e) {
+ LOGGER.error("[copyTable] Error copying table: {}", tableName, e);
+ throw new ControllerApplicationException(LOGGER, "Error copying table: "
+ e.getMessage(),
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+}
+ }
+
+ /**
+ * Tweaks the realtime table config with the given broker and server tenants.
+ *
+ * @param realtimeTableConfigNode The JSON object representing the realtime
table config.
+ * @param brokerTenant The broker tenant to set in the config.
+ * @param serverTenant The server tenant to set in the config.
+ */
+ @VisibleForTesting
+ static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode,
String brokerTenant, String server
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2630022631
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+ _pinotHelixResourceManager.addSchema(schema, true, false);
+ LOGGER.info("[copyTable] Successfully added schema for table: {},
schema: {}", tableName, schema);
+
+ // Fetch and add table configs
+ URI tableConfigUri = new URI(sourceControllerUri + "/tables/" +
tableName);
+ SimpleHttpResponse tableConfigResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(tableConfigUri,
requestHeaders));
+ String tableConfigJson = tableConfigResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched table config for table: {},
tableConfig: {}", tableName, tableConfigJson);
+ JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+ if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode)
tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant,
serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig =
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+LOGGER.info("[copyTable] Successfully fetched and tweaked table config
for table: {}, tableConfig: {}",
+tableName, realtimeTableConfig.toString());
+
+URI watermarkUri = new URI(sourceControllerUri + "/tables/" +
tableName + "/consumerWatermarks");
+SimpleHttpResponse watermarkResponse =
HttpClient.wrapAndThrowHttpException(
+HttpClient.getInstance().sendGetRequest(watermarkUri,
requestHeaders));
+String watermarkJson = watermarkResponse.getResponse();
+LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result:
{}", tableName, watermarkJson);
+WatermarkInductionResult watermarkInductionResult =
+JsonUtils.stringToObject(watermarkJson,
WatermarkInductionResult.class);
+
+List partitionGroupInfos =
watermarkInductionResult.getWatermarks().stream()
+.map(PartitionGroupInfo::from)
+.collect(Collectors.toList());
+
+_pinotHelixResourceManager.addTable(realtimeTableConfig,
partitionGroupInfos);
+ }
+ LOGGER.info("[copyTable] Finished Table Config copy: {}", tableName);
+ return new CopyTableResponse("success");
+} catch (Exception e) {
+ LOGGER.error("[copyTable] Error copying table: {}", tableName, e);
+ throw new ControllerApplicationException(LOGGER, "Error copying table: "
+ e.getMessage(),
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+}
+ }
+
+ /**
+ * Tweaks the realtime table config with the given broker and server tenants.
+ *
+ * @param realtimeTableConfigNode The JSON object representing the realtime
table config.
+ * @param brokerTenant The broker tenant to set in the config.
+ * @param serverTenant The server tenant to set in the config.
+ */
+ @VisibleForTesting
+ static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode,
String brokerTenant, String server
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2630020019
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -4737,6 +4761,37 @@ public QueryWorkloadManager getQueryWorkloadManager() {
return _queryWorkloadManager;
}
+ public WatermarkInductionResult inductConsumingWatermarks(String tableName)
throws TableNotFoundException {
+String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+if (!hasRealtimeTable(tableName)) {
+ throw new TableNotFoundException("Table " + tableNameWithType + " does
not exist");
+}
+TableConfig tableConfig = getTableConfig(tableNameWithType);
+Preconditions.checkNotNull(tableConfig, "Table " + tableNameWithType +
"exists but null tableConfig");
+List streamConfigs =
IngestionConfigUtils.getStreamConfigs(tableConfig);
+IdealState idealState = _helixAdmin
+.getResourceIdealState(getHelixClusterName(), tableNameWithType);
+List lst = _pinotLLCRealtimeSegmentManager
+.getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+List watermarks =
lst.stream().map(status -> {
+ long seq = status.getSequenceNumber();
+ long startOffset;
+ try {
+if (status.getStatus().equalsIgnoreCase("done")) {
Review Comment:
Yeah, good catch. done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2630019232
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -4737,6 +4761,37 @@ public QueryWorkloadManager getQueryWorkloadManager() {
return _queryWorkloadManager;
}
+ public WatermarkInductionResult inductConsumingWatermarks(String tableName)
throws TableNotFoundException {
+String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+if (!hasRealtimeTable(tableName)) {
+ throw new TableNotFoundException("Table " + tableNameWithType + " does
not exist");
+}
+TableConfig tableConfig = getTableConfig(tableNameWithType);
+Preconditions.checkNotNull(tableConfig, "Table " + tableNameWithType +
"exists but null tableConfig");
+List streamConfigs =
IngestionConfigUtils.getStreamConfigs(tableConfig);
+IdealState idealState = _helixAdmin
+.getResourceIdealState(getHelixClusterName(), tableNameWithType);
Review Comment:
add null check. done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2629951634
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+ _pinotHelixResourceManager.addSchema(schema, true, false);
+ LOGGER.info("[copyTable] Successfully added schema for table: {},
schema: {}", tableName, schema);
+
+ // Fetch and add table configs
+ URI tableConfigUri = new URI(sourceControllerUri + "/tables/" +
tableName);
+ SimpleHttpResponse tableConfigResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(tableConfigUri,
requestHeaders));
+ String tableConfigJson = tableConfigResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched table config for table: {},
tableConfig: {}", tableName, tableConfigJson);
+ JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+ if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode)
tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant,
serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig =
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+LOGGER.info("[copyTable] Successfully fetched and tweaked table config
for table: {}, tableConfig: {}",
+tableName, realtimeTableConfig.toString());
+
+URI watermarkUri = new URI(sourceControllerUri + "/tables/" +
tableName + "/consumerWatermarks");
+SimpleHttpResponse watermarkResponse =
HttpClient.wrapAndThrowHttpException(
+HttpClient.getInstance().sendGetRequest(watermarkUri,
requestHeaders));
+String watermarkJson = watermarkResponse.getResponse();
+LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result:
{}", tableName, watermarkJson);
+WatermarkInductionResult watermarkInductionResult =
+JsonUtils.stringToObject(watermarkJson,
WatermarkInductionResult.class);
+
+List partitionGroupInfos =
watermarkInductionResult.getWatermarks().stream()
+.map(PartitionGroupInfo::from)
+.collect(Collectors.toList());
+
+_pinotHelixResourceManager.addTable(realtimeTableConfig,
partitionGroupInfos);
+ }
+ LOGGER.info("[copyTable] Finished Table Config copy: {}", tableName);
+ return new CopyTableResponse("success");
+} catch (Exception e) {
+ LOGGER.error("[copyTable] Error copying table: {}", tableName, e);
+ throw new ControllerApplicationException(LOGGER, "Error copying table: "
+ e.getMessage(),
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+}
+ }
+
+ /**
+ * Tweaks the realtime table config with the given broker and server tenants.
+ *
+ * @param realtimeTableConfigNode The JSON object representing the realtime
table config.
+ * @param brokerTenant The broker tenant to set in the config.
+ * @param serverTenant The server tenant to set in the config.
+ */
+ @VisibleForTesting
+ static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode,
String brokerTenant, String server
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2629944309
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+ _pinotHelixResourceManager.addSchema(schema, true, false);
+ LOGGER.info("[copyTable] Successfully added schema for table: {},
schema: {}", tableName, schema);
+
+ // Fetch and add table configs
+ URI tableConfigUri = new URI(sourceControllerUri + "/tables/" +
tableName);
+ SimpleHttpResponse tableConfigResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(tableConfigUri,
requestHeaders));
+ String tableConfigJson = tableConfigResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched table config for table: {},
tableConfig: {}", tableName, tableConfigJson);
+ JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+ if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode)
tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant,
serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig =
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+LOGGER.info("[copyTable] Successfully fetched and tweaked table config
for table: {}, tableConfig: {}",
+tableName, realtimeTableConfig.toString());
+
+URI watermarkUri = new URI(sourceControllerUri + "/tables/" +
tableName + "/consumerWatermarks");
+SimpleHttpResponse watermarkResponse =
HttpClient.wrapAndThrowHttpException(
+HttpClient.getInstance().sendGetRequest(watermarkUri,
requestHeaders));
+String watermarkJson = watermarkResponse.getResponse();
+LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result:
{}", tableName, watermarkJson);
+WatermarkInductionResult watermarkInductionResult =
+JsonUtils.stringToObject(watermarkJson,
WatermarkInductionResult.class);
+
+List partitionGroupInfos =
watermarkInductionResult.getWatermarks().stream()
+.map(PartitionGroupInfo::from)
+.collect(Collectors.toList());
+
+_pinotHelixResourceManager.addTable(realtimeTableConfig,
partitionGroupInfos);
+ }
+ LOGGER.info("[copyTable] Finished Table Config copy: {}", tableName);
+ return new CopyTableResponse("success");
+} catch (Exception e) {
+ LOGGER.error("[copyTable] Error copying table: {}", tableName, e);
+ throw new ControllerApplicationException(LOGGER, "Error copying table: "
+ e.getMessage(),
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+}
+ }
+
+ /**
+ * Tweaks the realtime table config with the given broker and server tenants.
+ *
+ * @param realtimeTableConfigNode The JSON object representing the realtime
table config.
+ * @param brokerTenant The broker tenant to set in the config.
+ * @param serverTenant The server tenant to set in the config.
+ */
+ @VisibleForTesting
+ static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode,
String brokerTenant, String server
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2629468263
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
Review Comment:
I don't think we need it because it essentially do table copy into a
different cluster. User can safely delete the table copy.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2629465487
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+ _pinotHelixResourceManager.addSchema(schema, true, false);
+ LOGGER.info("[copyTable] Successfully added schema for table: {},
schema: {}", tableName, schema);
+
+ // Fetch and add table configs
+ URI tableConfigUri = new URI(sourceControllerUri + "/tables/" +
tableName);
+ SimpleHttpResponse tableConfigResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(tableConfigUri,
requestHeaders));
+ String tableConfigJson = tableConfigResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched table config for table: {},
tableConfig: {}", tableName, tableConfigJson);
+ JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+ if (tableConfigNode.has(TableType.REALTIME.name())) {
Review Comment:
Yeah, for upsert and pauseless table, we should throw the exception.
For offline table or offline part of the hybrid table, we can log as a
warning? The hunch is that, the use can write their own internal workflow to
backfill the offline table. For hybrid table, the user need to copy realtime
part.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621398775
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -4737,6 +4761,37 @@ public QueryWorkloadManager getQueryWorkloadManager() {
return _queryWorkloadManager;
}
+ public WatermarkInductionResult inductConsumingWatermarks(String tableName)
throws TableNotFoundException {
+String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+if (!hasRealtimeTable(tableName)) {
+ throw new TableNotFoundException("Table " + tableNameWithType + " does
not exist");
+}
+TableConfig tableConfig = getTableConfig(tableNameWithType);
+Preconditions.checkNotNull(tableConfig, "Table " + tableNameWithType +
"exists but null tableConfig");
+List streamConfigs =
IngestionConfigUtils.getStreamConfigs(tableConfig);
+IdealState idealState = _helixAdmin
+.getResourceIdealState(getHelixClusterName(), tableNameWithType);
+List lst = _pinotLLCRealtimeSegmentManager
+.getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+List watermarks =
lst.stream().map(status -> {
+ long seq = status.getSequenceNumber();
+ long startOffset;
+ try {
+if (status.getStatus().equalsIgnoreCase("done")) {
Review Comment:
"DONE".equalsIgnoreCase(status.getStatus()) for null handling?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621397588
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -4737,6 +4761,37 @@ public QueryWorkloadManager getQueryWorkloadManager() {
return _queryWorkloadManager;
}
+ public WatermarkInductionResult inductConsumingWatermarks(String tableName)
throws TableNotFoundException {
+String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+if (!hasRealtimeTable(tableName)) {
+ throw new TableNotFoundException("Table " + tableNameWithType + " does
not exist");
+}
+TableConfig tableConfig = getTableConfig(tableNameWithType);
+Preconditions.checkNotNull(tableConfig, "Table " + tableNameWithType +
"exists but null tableConfig");
+List streamConfigs =
IngestionConfigUtils.getStreamConfigs(tableConfig);
+IdealState idealState = _helixAdmin
+.getResourceIdealState(getHelixClusterName(), tableNameWithType);
Review Comment:
Can idealState be null?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621395272
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
Review Comment:
for next PR, we might need to add a mode param and introduce a dryrun mode
for feature safety.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621393154
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+ _pinotHelixResourceManager.addSchema(schema, true, false);
+ LOGGER.info("[copyTable] Successfully added schema for table: {},
schema: {}", tableName, schema);
+
+ // Fetch and add table configs
+ URI tableConfigUri = new URI(sourceControllerUri + "/tables/" +
tableName);
+ SimpleHttpResponse tableConfigResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(tableConfigUri,
requestHeaders));
+ String tableConfigJson = tableConfigResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched table config for table: {},
tableConfig: {}", tableName, tableConfigJson);
+ JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+ if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode)
tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant,
serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig =
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+LOGGER.info("[copyTable] Successfully fetched and tweaked table config
for table: {}, tableConfig: {}",
+tableName, realtimeTableConfig.toString());
+
+URI watermarkUri = new URI(sourceControllerUri + "/tables/" +
tableName + "/consumerWatermarks");
+SimpleHttpResponse watermarkResponse =
HttpClient.wrapAndThrowHttpException(
+HttpClient.getInstance().sendGetRequest(watermarkUri,
requestHeaders));
+String watermarkJson = watermarkResponse.getResponse();
+LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result:
{}", tableName, watermarkJson);
+WatermarkInductionResult watermarkInductionResult =
+JsonUtils.stringToObject(watermarkJson,
WatermarkInductionResult.class);
+
+List partitionGroupInfos =
watermarkInductionResult.getWatermarks().stream()
+.map(PartitionGroupInfo::from)
+.collect(Collectors.toList());
+
+_pinotHelixResourceManager.addTable(realtimeTableConfig,
partitionGroupInfos);
+ }
+ LOGGER.info("[copyTable] Finished Table Config copy: {}", tableName);
+ return new CopyTableResponse("success");
+} catch (Exception e) {
+ LOGGER.error("[copyTable] Error copying table: {}", tableName, e);
+ throw new ControllerApplicationException(LOGGER, "Error copying table: "
+ e.getMessage(),
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+}
+ }
+
+ /**
+ * Tweaks the realtime table config with the given broker and server tenants.
+ *
+ * @param realtimeTableConfigNode The JSON object representing the realtime
table config.
+ * @param brokerTenant The broker tenant to set in the config.
+ * @param serverTenant The server tenant to set in the config.
+ */
+ @VisibleForTesting
+ static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode,
String brokerTenant, String serverTe
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621392206
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+ _pinotHelixResourceManager.addSchema(schema, true, false);
+ LOGGER.info("[copyTable] Successfully added schema for table: {},
schema: {}", tableName, schema);
+
+ // Fetch and add table configs
+ URI tableConfigUri = new URI(sourceControllerUri + "/tables/" +
tableName);
+ SimpleHttpResponse tableConfigResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(tableConfigUri,
requestHeaders));
+ String tableConfigJson = tableConfigResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched table config for table: {},
tableConfig: {}", tableName, tableConfigJson);
+ JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+ if (tableConfigNode.has(TableType.REALTIME.name())) {
Review Comment:
same for Upsert.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621391126
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+ _pinotHelixResourceManager.addSchema(schema, true, false);
+ LOGGER.info("[copyTable] Successfully added schema for table: {},
schema: {}", tableName, schema);
+
+ // Fetch and add table configs
+ URI tableConfigUri = new URI(sourceControllerUri + "/tables/" +
tableName);
+ SimpleHttpResponse tableConfigResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(tableConfigUri,
requestHeaders));
+ String tableConfigJson = tableConfigResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched table config for table: {},
tableConfig: {}", tableName, tableConfigJson);
+ JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+ if (tableConfigNode.has(TableType.REALTIME.name())) {
Review Comment:
it seems we only support realtime, do we have any validation to avoid this
using on hybrid and offline?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621389583
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+ _pinotHelixResourceManager.addSchema(schema, true, false);
+ LOGGER.info("[copyTable] Successfully added schema for table: {},
schema: {}", tableName, schema);
+
+ // Fetch and add table configs
+ URI tableConfigUri = new URI(sourceControllerUri + "/tables/" +
tableName);
+ SimpleHttpResponse tableConfigResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(tableConfigUri,
requestHeaders));
+ String tableConfigJson = tableConfigResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched table config for table: {},
tableConfig: {}", tableName, tableConfigJson);
+ JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+ if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode)
tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant,
serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig =
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+LOGGER.info("[copyTable] Successfully fetched and tweaked table config
for table: {}, tableConfig: {}",
+tableName, realtimeTableConfig.toString());
+
+URI watermarkUri = new URI(sourceControllerUri + "/tables/" +
tableName + "/consumerWatermarks");
+SimpleHttpResponse watermarkResponse =
HttpClient.wrapAndThrowHttpException(
+HttpClient.getInstance().sendGetRequest(watermarkUri,
requestHeaders));
+String watermarkJson = watermarkResponse.getResponse();
+LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result:
{}", tableName, watermarkJson);
+WatermarkInductionResult watermarkInductionResult =
+JsonUtils.stringToObject(watermarkJson,
WatermarkInductionResult.class);
+
+List partitionGroupInfos =
watermarkInductionResult.getWatermarks().stream()
+.map(PartitionGroupInfo::from)
+.collect(Collectors.toList());
+
+_pinotHelixResourceManager.addTable(realtimeTableConfig,
partitionGroupInfos);
+ }
+ LOGGER.info("[copyTable] Finished Table Config copy: {}", tableName);
+ return new CopyTableResponse("success");
+} catch (Exception e) {
+ LOGGER.error("[copyTable] Error copying table: {}", tableName, e);
+ throw new ControllerApplicationException(LOGGER, "Error copying table: "
+ e.getMessage(),
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+}
+ }
+
+ /**
+ * Tweaks the realtime table config with the given broker and server tenants.
+ *
+ * @param realtimeTableConfigNode The JSON object representing the realtime
table config.
+ * @param brokerTenant The broker tenant to set in the config.
+ * @param serverTenant The server tenant to set in the config.
+ */
+ @VisibleForTesting
+ static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode,
String brokerTenant, String serverTe
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621386460
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+ _pinotHelixResourceManager.addSchema(schema, true, false);
Review Comment:
if the schema already exists, what will happen?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621380681
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
}
}
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/tables/{tableName}/consumerWatermarks")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.GET_IDEAL_STATE)
+ @ApiOperation(value = "Get table ideal state", notes = "Get table ideal
state")
+ public WatermarkInductionResult inductConsumingWatermark(
+ @ApiParam(value = "Name of the realtime table", required = true)
@PathParam("tableName") String tableName,
+ @Context HttpHeaders headers) {
+try {
+ String table = DatabaseUtils.translateTableName(tableName, headers);
Review Comment:
Table copy won't change table name, is that correct?
That said, we cannot copy table in the same cluster (share controller) from
one tenant to another, is that correct?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621185919
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java:
##
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyTablePayload {
+
+ private String _sourceClusterUri;
+ private Map _headers;
+ /**
+ * Broker tenant for the new table.
+ * MUST NOT contain the tenant type suffix, i.e. _BROKER.
+ */
+ private String _brokerTenant;
+ /**
+ * Server tenant for the new table.
+ * MUST NOT contain the tenant type suffix, i.e. _REALTIME or _OFFLINE.
+ */
+ private String _serverTenant;
+
+ private Map _tagPoolReplacementMap;
Review Comment:
what is the usage for this param?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on PR #17235: URL: https://github.com/apache/pinot/pull/17235#issuecomment-3648742146 @Jackie-Jiang @chenboat Could you take a review? The manual integration test gets done already. The purpose of this PR is to copy the schema and table config from the source cluster to the target cluster by replacing the server / broke tenant. And then copy the consuming segment to kick off the message consumption on the new table. The backfill of segments prior to consuming segments will be implemented in a follow up PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on PR #17235: URL: https://github.com/apache/pinot/pull/17235#issuecomment-3648733875 This Integration Test was not stable. TableRebalancePauselessIntegrationTest.testForceCommit:143->BaseClusterIntegrationTestSet.waitForRebalanceToComplete:850 Failed to meet condition in 60ms, error message: Failed to complete rebalance -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2608338598
##
pinot-controller/src/test/resources/table/table_config_with_instance_assignment.json:
##
@@ -0,0 +1,15 @@
+{
+ "tenants": {
+"broker": "broker1",
+"server": "rtaShared2"
Review Comment:
nit: rtaShared2 --> server1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2579486964
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -433,6 +435,25 @@ public String getPauselessTableDebugInfo(
}
}
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/tables/{tableName}/watermarks")
Review Comment:
/consumerWatermarks
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2579487604
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java:
##
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyTablePayload {
+
+ String _sourceClusterUri;
+ Map _headers;
+ /**
+ * Broker tenant for the new table.
+ * MUST NOT contain the tenant type suffix, i.e. _BROKER.
+ */
+ String _brokerTenant;
+ /**
+ * Server tenant for the new table.
+ * MUST NOT contain the tenant type suffix, i.e. _REALTIME or _OFFLINE.
+ */
+ String _serverTenant;
+
+ Map _tagPoolReplacementMap;
+
+ @JsonCreator
+ public CopyTablePayload(@JsonProperty(value = "tableName", required = true)
String tableName,
+ @JsonProperty(value = "sourceClusterUri", required = true) String
sourceClusterUri,
+ @JsonProperty("sourceClusterHeaders") Map headers,
+ @JsonProperty(value = "brokerTenant", required = true) String
brokerTenant,
+ @JsonProperty(value = "serverTenant", required = true) String
serverTenant,
+ @JsonProperty("tagPoolReplacementMap") @Nullable Map
tagPoolReplacementMap) {
+_sourceClusterUri = sourceClusterUri;
+_headers = headers;
Review Comment:
yep
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2548138260
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -433,6 +435,25 @@ public String getPauselessTableDebugInfo(
}
}
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/tables/{tableName}/watermarks")
Review Comment:
the endpoints name is not intuitive. shall we rename it to
"@Path("/tables/{tableName}/consumptionWatermark"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2548132800
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java:
##
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyTablePayload {
+
+ String _sourceClusterUri;
Review Comment:
private?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2548127822
##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java:
##
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyTablePayload {
+
+ String _sourceClusterUri;
+ Map _headers;
+ /**
+ * Broker tenant for the new table.
+ * MUST NOT contain the tenant type suffix, i.e. _BROKER.
+ */
+ String _brokerTenant;
+ /**
+ * Server tenant for the new table.
+ * MUST NOT contain the tenant type suffix, i.e. _REALTIME or _OFFLINE.
+ */
+ String _serverTenant;
+
+ Map _tagPoolReplacementMap;
+
+ @JsonCreator
+ public CopyTablePayload(@JsonProperty(value = "tableName", required = true)
String tableName,
+ @JsonProperty(value = "sourceClusterUri", required = true) String
sourceClusterUri,
+ @JsonProperty("sourceClusterHeaders") Map headers,
+ @JsonProperty(value = "brokerTenant", required = true) String
brokerTenant,
+ @JsonProperty(value = "serverTenant", required = true) String
serverTenant,
+ @JsonProperty("tagPoolReplacementMap") @Nullable Map
tagPoolReplacementMap) {
+_sourceClusterUri = sourceClusterUri;
+_headers = headers;
Review Comment:
tableName is unused?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
wirybeaver commented on PR #17235: URL: https://github.com/apache/pinot/pull/17235#issuecomment-3554733903 Unit Test Failure: seems that the error is not caused by my PR. Checking. PinotHelixResourceManagerStatelessTest.testGetLiveBrokers:383 ยป InvalidTableConfig No schema defined for table: testTable_OFFLINE -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]
codecov-commenter commented on PR #17235: URL: https://github.com/apache/pinot/pull/17235#issuecomment-3551422628 ### :x: 3 Tests Failed: | Tests completed | Failed | Passed | Skipped | |---|---|---|---| | 7180 | 3 | 7177 | 0 | View the top 3 failed test(s) by shortest run time > org.apache.pinot.controller.helix.core.PinotHelixResourceManagerStatelessTest::testGetLiveBrokers > Stack Traces | 0.006s run time > > > No schema defined for table: testTable_OFFLINE > > > org.apache.pinot.controller.helix.core.PinotHelixResourceManagerStatelessTest::testGetLiveBrokers > Stack Traces | 0.006s run time > > > No schema defined for table: testTable_OFFLINE > > > org.apache.pinot.controller.helix.core.PinotHelixResourceManagerStatelessTest::testGetLiveBrokers > Stack Traces | 0.009s run time > > > No schema defined for table: testTable_OFFLINE > > To view more test analytics, go to the [Test Analytics Dashboard](https://app.codecov.io/gh/apache/pinot/tests/wirybeaver%3Aoss%2FtableCopyConsuming) ๐ Got 3 mins? [Take this short survey](https://forms.gle/22i53Qa1CySZjA6c7) to help us improve Test Analytics. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
