Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-27 Thread via GitHub


wirybeaver commented on PR #17235:
URL: https://github.com/apache/pinot/pull/17235#issuecomment-3807462325

   > Can you please create a parent issue along with the design doc, and then 
link all the PRs to the parent issue? It is easier to track all the PRs under 
this topic
   
   Yeah, the parent issue was already created: https://github.com/apache/pinot/issues/16563


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-27 Thread via GitHub


Jackie-Jiang commented on PR #17235:
URL: https://github.com/apache/pinot/pull/17235#issuecomment-3807332593

   Can you please create a parent issue along with the design doc, and then 
link all the PRs to the parent issue? It is easier to track all the PRs under 
this topic





Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-15 Thread via GitHub


chenboat merged PR #17235:
URL: https://github.com/apache/pinot/pull/17235





Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-14 Thread via GitHub


deemoliu commented on PR #17235:
URL: https://github.com/apache/pinot/pull/17235#issuecomment-3750860005

   LGTM





Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-09 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2677047497


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -1729,8 +1734,21 @@ public void updateDatabaseConfig(DatabaseConfig databaseConfig) {
*/
   public void addTable(TableConfig tableConfig)
   throws IOException {
+addTable(tableConfig, Collections.emptyList());
+  }
+
+  /**
+   * Performs validations of table config and adds the table to zookeeper

Review Comment:
   Oh, I just copied the javadoc of the original addTable.
   
   /**
    * Performs validations of table config and adds the table to zookeeper
    * @throws InvalidTableConfigException if validations fail
    * @throws TableAlreadyExistsException for offline tables only if the table already exists
    */
   public void addTable(TableConfig tableConfig)
       throws IOException {
     addTable(tableConfig, Collections.emptyList());
   }






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-09 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2677047071


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PartitionGroupInfo.java:
##
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import org.apache.pinot.controller.helix.core.WatermarkInductionResult;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+
+/**
+ * The {@code PartitionGroupInfo} class represents the metadata and sequence 
number for a partition group.
+ * It encapsulates the {@link PartitionGroupMetadata} and the sequence number 
associated with it.
+ */
+public class PartitionGroupInfo {

Review Comment:
   Replace with Pair class
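
   A minimal sketch of the pair-based shape this suggests, using the JDK's `Map.entry` as a stand-in for whichever Pair class is actually used (the string key here is a hypothetical placeholder for a real `PartitionGroupMetadata` instance):
   
   ```java
   import java.util.Map;
   
   public class PairExample {
       public static void main(String[] args) {
           // Hypothetical illustration: carry the sequence number alongside the
           // partition group metadata as a pair instead of a dedicated
           // PartitionGroupInfo wrapper class.
           Map.Entry<String, Integer> partitionGroupInfo = Map.entry("partitionGroup-0", 5);
           System.out.println(partitionGroupInfo.getKey() + " -> sequence " + partitionGroupInfo.getValue());
       }
   }
   ```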



##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -1729,8 +1734,21 @@ public void updateDatabaseConfig(DatabaseConfig databaseConfig) {
*/
   public void addTable(TableConfig tableConfig)
   throws IOException {
+addTable(tableConfig, Collections.emptyList());
+  }
+
+  /**
+   * Performs validations of table config and adds the table to zookeeper

Review Comment:
   done






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-08 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2673535750


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PartitionGroupInfo.java:
##
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import org.apache.pinot.controller.helix.core.WatermarkInductionResult;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+
+/**
+ * The {@code PartitionGroupInfo} class represents the metadata and sequence 
number for a partition group.
+ * It encapsulates the {@link PartitionGroupMetadata} and the sequence number 
associated with it.
+ */
+public class PartitionGroupInfo {

Review Comment:
   From a pure Kafka consumer point of view, the segment sequence number doesn't seem to belong in PartitionGroupMetadata. I am also concerned that changing the constructor to add the sequence number might draw review pushback, since many places already use PartitionGroupMetadata. Adding a wrapper class has the benefit of fewer changes to existing code.









Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-08 Thread via GitHub


chenboat commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2673507545


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PartitionGroupInfo.java:
##
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import org.apache.pinot.controller.helix.core.WatermarkInductionResult;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+
+/**
+ * The {@code PartitionGroupInfo} class represents the metadata and sequence 
number for a partition group.
+ * It encapsulates the {@link PartitionGroupMetadata} and the sequence number 
associated with it.
+ */
+public class PartitionGroupInfo {

Review Comment:
   why do we need any wrapper class like this? Can we reuse 
PartitionGroupMetadata or add to it instead of creating a new class?






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-08 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2673501184


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
 }
   }
 
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/consumerWatermarks")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_IDEAL_STATE)
+  @ApiOperation(value = "Get table ideal state", notes = "Get table ideal state")

Review Comment:
   good catch.






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-08 Thread via GitHub


chenboat commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2673428063


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -1729,8 +1734,21 @@ public void updateDatabaseConfig(DatabaseConfig databaseConfig) {
*/
   public void addTable(TableConfig tableConfig)
   throws IOException {
+addTable(tableConfig, Collections.emptyList());
+  }
+
+  /**
+   * Performs validations of table config and adds the table to zookeeper

Review Comment:
   Is there any restriction on this API? Realtime table only? Can it add upsert 
tables? If so, please clarify in the javadoc.






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-08 Thread via GitHub


chenboat commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2673411779


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
 }
   }
 
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/consumerWatermarks")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_IDEAL_STATE)
+  @ApiOperation(value = "Get table ideal state", notes = "Get table ideal state")

Review Comment:
   Why is it about "table ideal state"? Is it a typo?






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-07 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2670884310


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -4715,6 +4739,51 @@ public QueryWorkloadManager getQueryWorkloadManager() {
 return _queryWorkloadManager;
   }
 
+  /**
+   * Retrieves the consumer watermark for a given real-time table.
+   * The watermark represents the next offset to be consumed for each partition group.
+   * If the latest segment of a partition is in a DONE state, the watermark is the end offset of the completed segment.
+   * Otherwise, it is the start offset of the current consuming segment.
+   *
+   * @param tableName The name of the real-time table (without type suffix).
+   * @return A {@link WatermarkInductionResult} containing a list of watermarks for each partition group.
+   * @throws TableNotFoundException if the specified real-time table does not exist.
+   * @throws IllegalStateException if the IdealState for the table is not found.
+   */
+  public WatermarkInductionResult getConsumerWatermarks(String tableName)
+      throws TableNotFoundException {
+    String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    if (!hasRealtimeTable(tableName)) {
+      throw new TableNotFoundException("Table " + tableNameWithType + " does not exist");
+    }
+    TableConfig tableConfig = getTableConfig(tableNameWithType);
+    Preconditions.checkNotNull(tableConfig, "Table " + tableNameWithType + " exists but null tableConfig");
+    List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
+    IdealState idealState = _helixAdmin
+        .getResourceIdealState(getHelixClusterName(), tableNameWithType);
+    if (idealState == null) {
+      throw new IllegalStateException("Null IdealState of the table " + tableNameWithType);
+    }
+    List<PartitionGroupConsumptionStatus> lst = _pinotLLCRealtimeSegmentManager
+        .getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+    List watermarks = lst.stream().map(status -> {
+      int seq = status.getSequenceNumber();
+      long startOffset;
+      try {
+        if ("DONE".equalsIgnoreCase(status.getStatus())) {
+          Preconditions.checkNotNull(status.getEndOffset());
+          startOffset = NumberUtils.parseLong(status.getEndOffset().toString());
+          seq++;
+        } else {

Review Comment:
   The testGetConsumerWatermarks already contains both cases.
   
   ```
   PartitionGroupConsumptionStatus doneStatus = new PartitionGroupConsumptionStatus(0, 100,
       new LongMsgOffset(123), new LongMsgOffset(456), "done");
   PartitionGroupConsumptionStatus inProgressStatus =
       new PartitionGroupConsumptionStatus(1, 200, new LongMsgOffset(789), null, "IN_PROGRESS");
   ```






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-07 Thread via GitHub


abhishekbafna commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2668973896


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java:
##
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyTablePayload {
+
+  private String _sourceClusterUri;
+  private Map<String, String> _headers;
+  /**
+   * Broker tenant for the new table.
+   * MUST NOT contain the tenant type suffix, i.e. _BROKER.
+   */
+  private String _brokerTenant;
+  /**
+   * Server tenant for the new table.
+   * MUST NOT contain the tenant type suffix, i.e. _REALTIME or _OFFLINE.
+   */
+  private String _serverTenant;
+
+  /**
+   * The instanceAssignmentConfig's tagPoolConfig contains the full tenant name. This field lets the user specify
+   * the mapping from the source cluster's full tenant name to the target cluster's full tenant name.
+   */
+  private Map<String, String> _tagPoolReplacementMap;
+
+  private boolean _verbose = false;
+  private boolean _dryRun = true;

Review Comment:
   These should be API parameters, not part of the copy payload; the copy payload should contain only table-config-specific key-value pairs.
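
   For illustration, a request body matching the fields of `CopyTablePayload` above might look like this (values are placeholders; property names assume the usual Jackson mapping with the leading underscore stripped, and per this comment `verbose`/`dryRun` may move to query parameters):
   
   ```
   {
     "sourceClusterUri": "https://source-controller:9000",
     "headers": {"Authorization": "Bearer <token>"},
     "brokerTenant": "DefaultTenant",
     "serverTenant": "DefaultTenant",
     "tagPoolReplacementMap": {"sourceTenant_REALTIME": "targetTenant_REALTIME"},
     "verbose": false,
     "dryRun": true
   }
   ```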



##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +290,136 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
     }
   }

+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, String payload,
+      @Context HttpHeaders headers) {
+    try {
+      LOGGER.info("[copyTable] received request for table: {}, payload: {}", tableName, payload);
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+
+      if (_pinotHelixResourceManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(tableName)) != null
+          || _pinotHelixResourceManager.getTableConfig(TableNameBuilder.OFFLINE.tableNameWithType(tableName)) != null) {
+        throw new TableAlreadyExistsException("Table config for " + tableName
+            + " already exists. If this is unexpected, try deleting the table to remove all metadata associated"
+            + " with it before attempting to recreate.");
+      }
+
+      CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, CopyTablePayload.class);
+      String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+      Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+
+      LOGGER.info("[copyTable] Start copying table: {} from source: {}", tableName, sourceControllerUri);
+
+      ControllerRequestURLBuilder urlBuilder = ControllerRequestURLBuilder.baseUrl(sourceControllerUri);
+
+      URI schemaUri = new URI(urlBuilder.forTableSchemaGet(tableName));
+      SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+      String schemaJson = schemaResponse.getResponse();
+      Schema schema = Schema.fromString(schemaJson);
+
+      URI tableConfigUri = new URI(urlBuilder.forTableGet(tableName));
+      SimpleHttpResponse tableConfigResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(tableConfigUri, requestHeaders));
+      String tableConfigJson = tableConfigResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched table config for table: {}", tableName);
+      JsonNode

Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-06 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2666991674


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
 }
   }
 
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/consumerWatermarks")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_IDEAL_STATE)
+  @ApiOperation(value = "Get table ideal state", notes = "Get table ideal state")
+  public WatermarkInductionResult inductConsumingWatermark(
+      @ApiParam(value = "Name of the realtime table", required = true) @PathParam("tableName") String tableName,
+      @Context HttpHeaders headers) {
+    try {
+      String table = DatabaseUtils.translateTableName(tableName, headers);

Review Comment:
   the segment itself has cdc, and likely includes the table name






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-06 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2666991156


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(

Review Comment:
   Yeah, it already supports dry-run mode via a boolean flag in the input payload.






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-06 Thread via GitHub


wirybeaver commented on PR #17235:
URL: https://github.com/apache/pinot/pull/17235#issuecomment-3717213852

   MSE against 1.3 failed but succeeded against 1.4 and master.





Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-06 Thread via GitHub


wirybeaver commented on PR #17235:
URL: https://github.com/apache/pinot/pull/17235#issuecomment-3717126545

   @abhishekbafna  I added dryRun as a boolean flag in the payload. The reply also contains the schema, the modified table config, and the watermarkResult. The write operations (schema and table config addition) are moved to the end.
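
   An illustrative shape for such a dry-run reply (field names and values here are assumptions for illustration, not taken from the PR):
   
   ```
   {
     "schema": { "schemaName": "myTable" },
     "tableConfig": { "tableName": "myTable_REALTIME" },
     "watermarkResult": { "watermarks": [ { "partitionGroupId": 0, "offset": "456", "sequenceNumber": 1 } ] }
   }
   ```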
   
   





Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-06 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2666889807


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+  @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+  @Context HttpHeaders headers) {
+try {
+  LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+  tableName = DatabaseUtils.translateTableName(tableName, headers);
+  CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+  String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+  Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+  String brokerTenant = copyTablePayload.getBrokerTenant();
+  String serverTenant = copyTablePayload.getServerTenant();
+  Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+  LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+  // Fetch and add schema
+  URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + 
"/schema");
+  SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+  String schemaJson = schemaResponse.getResponse();
+  Schema schema = Schema.fromString(schemaJson);
+  _pinotHelixResourceManager.addSchema(schema, true, false);
+  LOGGER.info("[copyTable] Successfully added schema for table: {}, 
schema: {}", tableName, schema);
+
+  // Fetch and add table configs
+  URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + 
tableName);
+  SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+  String tableConfigJson = tableConfigResponse.getResponse();
+  LOGGER.info("[copyTable] Fetched table config for table: {}, 
tableConfig: {}", tableName, tableConfigJson);
+  JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+  boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+  if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode) 
tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant, 
serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig = 
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+if (realtimeTableConfig.getUpsertConfig() != null) {
+  return new CopyTableResponse("fail", "upsert table copy not 
supported");
+}
+LOGGER.info("[copyTable] Successfully fetched and tweaked table config 
for table: {}, tableConfig: {}",
+tableName, realtimeTableConfig.toString());
+
+URI watermarkUri = new URI(sourceControllerUri + "/tables/" + 
tableName + "/consumerWatermarks");
+SimpleHttpResponse watermarkResponse = 
HttpClient.wrapAndThrowHttpException(
+HttpClient.getInstance().sendGetRequest(watermarkUri, 
requestHeaders));
+String watermarkJson = watermarkResponse.getResponse();
+LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result: 
{}", tableName, watermarkJson);
+WatermarkInductionResult watermarkInductionResult =
+JsonUtils.stringToObject(watermarkJson, 
WatermarkInductionResult.class);
+
+List<PartitionGroupInfo> partitionGroupInfos = watermarkInductionResult.getWatermarks().stream()
+.map(PartitionGroupInfo::from)
+.collect(Collectors.toList());
+
+// Add the table with designated starting kafka offset and segment 
sequence number to create consuming segments
+_pinotHelixResourceManager.addTable(realtimeTableConfig, 
partitionGroupInfos);
+if (hasOffline) {
+  return new CopyTableResponse("warn", "detect offline; copy real-time 
segments only");
+}
+return new CopyTableResponse("success", "");
+  }
+  return new CopyTableResponse("fail", "copying offline table's segments 
is not supported yet");

Review Comment:
   Yeah, will throw an exception instead.




Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-06 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2666889336


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+  @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+  @Context HttpHeaders headers) {
+try {
+  LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+  tableName = DatabaseUtils.translateTableName(tableName, headers);
+  CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+  String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+  Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+  String brokerTenant = copyTablePayload.getBrokerTenant();
+  String serverTenant = copyTablePayload.getServerTenant();
+  Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+  LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+  // Fetch and add schema
+  URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + 
"/schema");
+  SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+  String schemaJson = schemaResponse.getResponse();
+  Schema schema = Schema.fromString(schemaJson);
+  _pinotHelixResourceManager.addSchema(schema, true, false);
+  LOGGER.info("[copyTable] Successfully added schema for table: {}, 
schema: {}", tableName, schema);
+
+  // Fetch and add table configs
+  URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + 
tableName);
+  SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+  String tableConfigJson = tableConfigResponse.getResponse();
+  LOGGER.info("[copyTable] Fetched table config for table: {}, 
tableConfig: {}", tableName, tableConfigJson);
+  JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+  boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+  if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode) 
tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant, 
serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig = 
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+if (realtimeTableConfig.getUpsertConfig() != null) {
+  return new CopyTableResponse("fail", "upsert table copy not 
supported");
+}

Review Comment:
   Yeah, you are right. The addSchema call is moved to after the validation check.






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-06 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2666379437


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -1789,10 +1807,16 @@ public void addTable(TableConfig tableConfig)
 // Add ideal state
 _helixAdmin.addResource(_helixClusterName, tableNameWithType, 
idealState);
 LOGGER.info("Adding table {}: Added ideal state for offline table", 
tableNameWithType);
-  } else {
+  } else if (consumeMeta == null || consumeMeta.isEmpty()) {
 // Add ideal state with the first CONSUMING segment
 _pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState);
 LOGGER.info("Adding table {}: Added ideal state with first consuming 
segment", tableNameWithType);
+  } else {
+// Add ideal state with the first CONSUMING segment with designated 
partition consuming metadata
+// Add ideal state with the first CONSUMING segment
+_pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState, 
consumeMeta);
+LOGGER.info("Adding table {}: Added consuming segments ideal state 
given the designated consuming metadata",

Review Comment:
   This routine can be shared for other purposes. For example, a user can 
create a table with the Kafka offsets they want to start ingesting from.
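
   As a rough illustration of that idea (the partitionGroupId, sequenceNumber, 
and offset names mirror the watermark fields in this PR; the segment-name 
format and the helper are a hypothetical sketch, not Pinot code):

```python
# Each watermark tells the new cluster where partition N's consuming segment
# should start: which segment sequence number to use and which Kafka offset
# to begin consuming from. Values below are made up.
watermarks = [
    {"partitionGroupId": 0, "sequenceNumber": 12, "offset": 4500},
    {"partitionGroupId": 1, "sequenceNumber": 9, "offset": 3100},
]

def consuming_segment_plan(table_name, watermarks):
    """Map each watermark to a (segment name, start offset) pair, loosely
    mimicking the LLC naming scheme <table>__<partition>__<sequence>__<ts>."""
    plan = {}
    for w in watermarks:
        seg = f"{table_name}__{w['partitionGroupId']}__{w['sequenceNumber']}__0"
        plan[seg] = w["offset"]
    return plan

plan = consuming_segment_plan("myTable_REALTIME", watermarks)
print(plan)
```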






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-06 Thread via GitHub


abhishekbafna commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2664718282


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+  @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+  @Context HttpHeaders headers) {
+try {
+  LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+  tableName = DatabaseUtils.translateTableName(tableName, headers);
+  CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+  String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+  Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+  String brokerTenant = copyTablePayload.getBrokerTenant();
+  String serverTenant = copyTablePayload.getServerTenant();
+  Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+  LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+  // Fetch and add schema
+  URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + 
"/schema");

Review Comment:
   Use `ControllerRequestURLBuilder` for building URLs.



##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+  @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+  @Context HttpHeaders headers) {
+try {
+  LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+  tableName = DatabaseUtils.translateTableName(tableName, headers);
+  CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+  String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+  Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+  String brokerTenant = copyTablePayload.getBrokerTenant();
+  String serverTenant = copyTablePayload.getServerTenant();
+  Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+  LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+  // Fetch and add schema
+  URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + 
"/schema");
+  SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+  String schemaJson = schemaResponse.getResponse();
+  Schema schema = Schema.fromString(schemaJson);
+  _pinotHelixResourceManager.addSchema(schema, true, false);
+  LOGGER.info("[copyTable] Successfully added schema for table: {}, 
schema: {}", tableName, schema);
+
+  // Fetch and add table configs
+  URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + 
tableName);
+  SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+  String tableConfigJson = tableConfigResponse.getResponse();
+  LOGGER.info("[copyTable] Fetched table config for table: {}, 
tableConfig: {}", tableName, tableConfigJson);
+  JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+  boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+  if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode) 
tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant, 
serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig = 
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+if (realtimeTableConfig.getUpsertConfig() != null) {
+  return new CopyTableResponse("fail", "upsert table copy not 
supported");
+}

Review Comment:
   Le

Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-05 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663540038


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java:
##
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+/**
+ * Represents the result of a watermark induction process, containing a list 
of watermarks.
+ */
+public class WatermarkInductionResult {
+
+  private List<Watermark> _watermarks;
+
+  /**
+   * The @JsonCreator annotation marks this constructor to be used for 
deserializing
+   * a JSON array back into a WaterMarks object.
+   *
+   * @param watermarks The list of watermarks.
+   */
+  @JsonCreator
+  public WatermarkInductionResult(@JsonProperty("watermarks") List<Watermark> watermarks) {
+_watermarks = watermarks;
+  }
+
+  /**
+   * Gets the list of watermarks.
+   *
+   * @return The list of watermarks.
+   */
+  @JsonGetter("watermarks")
+  public List<Watermark> getWatermarks() {
+return _watermarks;
+  }
+
+  /**
+   * Represents a single watermark with its partition, sequence, and offset.
+   */
+  public static class Watermark {
+private long _partitionGroupId;
+private long _sequenceNumber;
+private long _offset;
+
+/**
+ * The @JsonCreator annotation tells Jackson to use this constructor to 
create
+ * a WaterMark instance from a JSON object. The @JsonProperty annotations
+ * map the keys in the JSON object to the constructor parameters.
+ *
+ * @param partitionGroupId The ID of the partition group.
+ * @param sequenceNumber The sequence number of the watermark.
+ * @param offset The offset of the watermark.
+ */
+@JsonCreator
+public Watermark(@JsonProperty("partitionGroupId") long partitionGroupId,
+@JsonProperty("sequenceNumber") long sequenceNumber, 
@JsonProperty("offset") long offset) {
+  _partitionGroupId = partitionGroupId;
+  _sequenceNumber = sequenceNumber;
+  _offset = offset;
+}
+
+/**
+ * Gets the partition group ID.
+ *
+ * @return The partition group ID.
+ */
+@JsonGetter("partitionGroupId")
+public long getPartitionGroupId() {
+  return _partitionGroupId;
+}
+
+/**
+ * Gets the sequence number.
+ *
+ * @return The sequence number.
+ */
+@JsonGetter("sequenceNumber")
+public long getSequenceNumber() {
+  return _sequenceNumber;
+}
+
+/**
+ * Gets the offset.

Review Comment:
   Add a javadoc to explain which Kafka offset it represents.
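
   For context, the JSON shape implied by the annotations above looks roughly 
like this (values are made up; a plain-Python sketch of the round trip that the 
@JsonCreator/@JsonGetter wiring enables):

```python
import json

# Sample payload in the shape the WatermarkInductionResult class above
# (de)serializes: a top-level "watermarks" array of
# {partitionGroupId, sequenceNumber, offset} objects. Values are made up.
sample = '''
{"watermarks": [
  {"partitionGroupId": 0, "sequenceNumber": 5, "offset": 1200},
  {"partitionGroupId": 1, "sequenceNumber": 7, "offset": 980}
]}
'''

result = json.loads(sample)
# Equivalent of WatermarkInductionResult.getWatermarks()
watermarks = result["watermarks"]
for w in watermarks:
    print(w["partitionGroupId"], w["sequenceNumber"], w["offset"])
```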






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-05 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663376742


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java:
##
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+/**
+ * Represents the result of a watermark induction process, containing a list 
of watermarks.
+ */
+public class WatermarkInductionResult {
+
+  private List<Watermark> _watermarks;
+
+  /**
+   * The @JsonCreator annotation marks this constructor to be used for 
deserializing
+   * a JSON array back into a WaterMarks object.
+   *
+   * @param watermarks The list of watermarks.
+   */
+  @JsonCreator
+  public WatermarkInductionResult(@JsonProperty("watermarks") List<Watermark> watermarks) {
+_watermarks = watermarks;
+  }
+
+  /**
+   * Gets the list of watermarks.
+   *
+   * @return The list of watermarks.
+   */
+  @JsonGetter("watermarks")
+  public List<Watermark> getWatermarks() {
+return _watermarks;
+  }
+
+  /**
+   * Represents a single watermark with its partition, sequence, and offset.
+   */
+  public static class Watermark {
+private long _partitionGroupId;
+private long _sequenceNumber;
+private long _offset;
+
+/**
+ * The @JsonCreator annotation tells Jackson to use this constructor to 
create
+ * a WaterMark instance from a JSON object. The @JsonProperty annotations
+ * map the keys in the JSON object to the constructor parameters.
+ *
+ * @param partitionGroupId The ID of the partition group.
+ * @param sequenceNumber The sequence number of the watermark.
+ * @param offset The offset of the watermark.
+ */
+@JsonCreator
+public Watermark(@JsonProperty("partitionGroupId") long partitionGroupId,
+@JsonProperty("sequenceNumber") long sequenceNumber, 
@JsonProperty("offset") long offset) {
+  _partitionGroupId = partitionGroupId;
+  _sequenceNumber = sequenceNumber;
+  _offset = offset;
+}
+
+/**
+ * Gets the partition group ID.
+ *
+ * @return The partition group ID.
+ */
+@JsonGetter("partitionGroupId")
+public long getPartitionGroupId() {
+  return _partitionGroupId;
+}
+
+/**
+ * Gets the sequence number.
+ *
+ * @return The sequence number.
+ */
+@JsonGetter("sequenceNumber")
+public long getSequenceNumber() {
+  return _sequenceNumber;
+}
+
+/**
+ * Gets the offset.

Review Comment:
   you are right






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-05 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663375714


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java:
##
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+/**
+ * Represents the result of a watermark induction process, containing a list 
of watermarks.
+ */
+public class WatermarkInductionResult {
+
+  private List<Watermark> _watermarks;
+
+  /**
+   * The @JsonCreator annotation marks this constructor to be used for 
deserializing
+   * a JSON array back into a WaterMarks object.
+   *
+   * @param watermarks The list of watermarks.
+   */
+  @JsonCreator
+  public WatermarkInductionResult(@JsonProperty("watermarks") List<Watermark> watermarks) {
+_watermarks = watermarks;
+  }
+
+  /**
+   * Gets the list of watermarks.
+   *
+   * @return The list of watermarks.
+   */
+  @JsonGetter("watermarks")
+  public List<Watermark> getWatermarks() {
+return _watermarks;
+  }
+
+  /**
+   * Represents a single watermark with its partition, sequence, and offset.

Review Comment:
   sure, will modify the javadoc






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-05 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663374750


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
 }
   }
 
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/consumerWatermarks")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.GET_IDEAL_STATE)
+  @ApiOperation(value = "Get table ideal state", notes = "Get table ideal 
state")
+  public WatermarkInductionResult inductConsumingWatermark(

Review Comment:
   I can use get.






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-05 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663374219


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
 }
   }
 
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/consumerWatermarks")

Review Comment:
   It's get instead of add. The target cluster will invoke this API against the 
source cluster, and the source cluster will return its current consumer watermarks.
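
   A rough sketch of that flow, with the HTTP call stubbed out (the canned 
reply and the helper names are illustrative only, not Pinot code):

```python
import json

# Sketch of the cross-cluster flow: the *target* controller calls
# GET /tables/{tableName}/consumerWatermarks on the *source* controller,
# then uses the reply to seed its own consuming segments.
# fetch() is stubbed here; a real client would issue an HTTP GET.
def fetch(url):
    # Canned response standing in for the source cluster's reply.
    return json.dumps({"watermarks": [{"partitionGroupId": 0,
                                       "sequenceNumber": 3,
                                       "offset": 750}]})

def copy_watermarks(source_uri, table):
    # Endpoint path taken from this PR.
    url = f"{source_uri}/tables/{table}/consumerWatermarks"
    reply = json.loads(fetch(url))
    return reply["watermarks"]

wm = copy_watermarks("https://source-controller:9000", "myTable")
print(wm)
```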






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-05 Thread via GitHub


chenboat commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663328916


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
 }
   }
 
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/consumerWatermarks")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.GET_IDEAL_STATE)
+  @ApiOperation(value = "Get table ideal state", notes = "Get table ideal 
state")
+  public WatermarkInductionResult inductConsumingWatermark(

Review Comment:
   Consider using a simple and straightforward verb like `add` rather than 
`induct` for easier understanding.






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-05 Thread via GitHub


chenboat commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663327546


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
 }
   }
 
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/consumerWatermarks")

Review Comment:
   Better to be consistent with the method name. Change to addConsumingWatermarks?






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-05 Thread via GitHub


chenboat commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663324436


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java:
##
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+/**
+ * Represents the result of a watermark induction process, containing a list 
of watermarks.
+ */
+public class WatermarkInductionResult {
+
+  private List<Watermark> _watermarks;
+
+  /**
+   * The @JsonCreator annotation marks this constructor to be used for 
deserializing
+   * a JSON array back into a WaterMarks object.
+   *
+   * @param watermarks The list of watermarks.
+   */
+  @JsonCreator
+  public WatermarkInductionResult(@JsonProperty("watermarks") List<Watermark> watermarks) {
+_watermarks = watermarks;
+  }
+
+  /**
+   * Gets the list of watermarks.
+   *
+   * @return The list of watermarks.
+   */
+  @JsonGetter("watermarks")
+  public List<Watermark> getWatermarks() {
+return _watermarks;
+  }
+
+  /**
+   * Represents a single watermark with its partition, sequence, and offset.

Review Comment:
   Here you use partition, but the code variable is partitionGroupId. It needs 
to be consistent.






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-05 Thread via GitHub


chenboat commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663321756


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java:
##
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+/**
+ * Represents the result of a watermark induction process, containing a list 
of watermarks.
+ */
+public class WatermarkInductionResult {
+
+  private List<Watermark> _watermarks;
+
+  /**
+   * The @JsonCreator annotation marks this constructor to be used for 
deserializing
+   * a JSON array back into a WaterMarks object.
+   *
+   * @param watermarks The list of watermarks.
+   */
+  @JsonCreator
+  public WatermarkInductionResult(@JsonProperty("watermarks") List<Watermark> watermarks) {
+_watermarks = watermarks;
+  }
+
+  /**
+   * Gets the list of watermarks.
+   *
+   * @return The list of watermarks.
+   */
+  @JsonGetter("watermarks")
+  public List<Watermark> getWatermarks() {
+return _watermarks;
+  }
+
+  /**
+   * Represents a single watermark with its partition, sequence, and offset.
+   */
+  public static class Watermark {
+private long _partitionGroupId;
+private long _sequenceNumber;
+private long _offset;
+
+/**
+ * The @JsonCreator annotation tells Jackson to use this constructor to 
create
+ * a WaterMark instance from a JSON object. The @JsonProperty annotations
+ * map the keys in the JSON object to the constructor parameters.
+ *
+ * @param partitionGroupId The ID of the partition group.
+ * @param sequenceNumber The sequence number of the watermark.
+ * @param offset The offset of the watermark.
+ */
+@JsonCreator
+public Watermark(@JsonProperty("partitionGroupId") long partitionGroupId,
+@JsonProperty("sequenceNumber") long sequenceNumber, 
@JsonProperty("offset") long offset) {
+  _partitionGroupId = partitionGroupId;
+  _sequenceNumber = sequenceNumber;
+  _offset = offset;
+}
+
+/**
+ * Gets the partition group ID.
+ *
+ * @return The partition group ID.
+ */
+@JsonGetter("partitionGroupId")
+public long getPartitionGroupId() {
+  return _partitionGroupId;
+}
+
+/**
+ * Gets the sequence number.
+ *
+ * @return The sequence number.
+ */
+@JsonGetter("sequenceNumber")
+public long getSequenceNumber() {
+  return _sequenceNumber;
+}
+
+/**
+ * Gets the offset.

Review Comment:
   Instead of just repeating the method name, can you be more specific? This is 
the offset of the stream partition?
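
   For illustration, a serialized `WatermarkInductionResult` would take roughly this shape (field names follow the getters above; the values are hypothetical):

   ```json
   {
     "watermarks": [
       {"partitionGroupId": 0, "sequenceNumber": 12, "offset": 34567},
       {"partitionGroupId": 1, "sequenceNumber": 11, "offset": 29810}
     ]
   }
   ```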






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-05 Thread via GitHub


chenboat commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2663313232


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -4737,6 +4761,40 @@ public QueryWorkloadManager getQueryWorkloadManager() {
 return _queryWorkloadManager;
   }
 
+  public WatermarkInductionResult inductConsumingWatermarks(String tableName) 
throws TableNotFoundException {

Review Comment:
   Please add javadoc and comments on this public method.






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2026-01-01 Thread via GitHub


wirybeaver commented on PR #17235:
URL: https://github.com/apache/pinot/pull/17235#issuecomment-3704082260

   Walked through the design doc with @abhishekbafna on 12/23 and aligned on the 
direction of the solution.





Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-18 Thread via GitHub


wirybeaver commented on PR #17235:
URL: https://github.com/apache/pinot/pull/17235#issuecomment-3672837020

   The unit test Set-1 fails on Java 21 but succeeds on Java 11. The error is 
about a "selectionCombineOperator not found" issue, which is unrelated to 
table copy. I just did a rebase without resolving any conflicts; all previous 
test runs were successful.





Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-18 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2632926517


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+  @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+  @Context HttpHeaders headers) {
+try {
+  LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+  tableName = DatabaseUtils.translateTableName(tableName, headers);
+  CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+  String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+  Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+  String brokerTenant = copyTablePayload.getBrokerTenant();
+  String serverTenant = copyTablePayload.getServerTenant();
+  Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+  LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+  // Fetch and add schema
+  URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + 
"/schema");
+  SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+  String schemaJson = schemaResponse.getResponse();
+  Schema schema = Schema.fromString(schemaJson);
+  _pinotHelixResourceManager.addSchema(schema, true, false);
+  LOGGER.info("[copyTable] Successfully added schema for table: {}, 
schema: {}", tableName, schema);
+
+  // Fetch and add table configs
+  URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + 
tableName);
+  SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+  String tableConfigJson = tableConfigResponse.getResponse();
+  LOGGER.info("[copyTable] Fetched table config for table: {}, 
tableConfig: {}", tableName, tableConfigJson);
+  JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+  if (tableConfigNode.has(TableType.REALTIME.name())) {

Review Comment:
   For upsert and pure-offline tables, the status in the reply will be "fail". 
For a hybrid table, the status will be "warn", with a message specifying that 
only the realtime table config and segments are copied.
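
   The status handling described above can be sketched as follows (a hypothetical helper for illustration; `copy_status` and its inputs are not the actual PR code):

   ```python
   def copy_status(has_realtime: bool, has_offline: bool, is_upsert: bool) -> tuple:
       """Decide the copy-table response status for a source table.

       Returns (status, message). Upsert and pure-offline tables are
       rejected; hybrid tables are copied partially with a warning.
       """
       if is_upsert or (has_offline and not has_realtime):
           return ("fail", "only non-upsert realtime tables can be copied")
       if has_realtime and has_offline:
           return ("warn", "only the realtime table config and segments are copied")
       return ("success", "realtime table copied")
   ```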






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-18 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2629468263


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(

Review Comment:
   User can call the source cluster's GET /tables/{tableName}/consumerWatermarks 
endpoint to get the dry-run result, which shows the watermarks as well as the 
segments that will be backfilled.









Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-18 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2630031852


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java:
##
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyTablePayload {
+
+  private String _sourceClusterUri;
+  private Map<String, String> _headers;
+  /**
+   * Broker tenant for the new table.
+   * MUST NOT contain the tenant type suffix, i.e. _BROKER.
+   */
+  private String _brokerTenant;
+  /**
+   * Server tenant for the new table.
+   * MUST NOT contain the tenant type suffix, i.e. _REALTIME or _OFFLINE.
+   */
+  private String _serverTenant;
+
+  private Map<String, String> _tagPoolReplacementMap;

Review Comment:
   The instanceAssignmentConfig's tagPoolConfig contains the full tenant name. 
This field lets the user specify the replacement mapping from the source 
cluster's full tenant names to the target cluster's full tenant names.
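
   For reference, a request body for `POST /tables/{tableName}/copy` might look like the following (field names come from `CopyTablePayload` above; all values, including the tenant names and token, are illustrative):

   ```json
   {
     "sourceClusterUri": "http://source-controller:9000",
     "headers": {"Authorization": "Bearer <token>"},
     "brokerTenant": "DefaultTenant",
     "serverTenant": "DefaultTenant",
     "tagPoolReplacementMap": {
       "sourceTenant_REALTIME": "targetTenant_REALTIME"
     }
   }
   ```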






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-18 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2630025570


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
 }
   }
 
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/consumerWatermarks")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.GET_IDEAL_STATE)
+  @ApiOperation(value = "Get table ideal state", notes = "Get table ideal 
state")
+  public WatermarkInductionResult inductConsumingWatermark(
+  @ApiParam(value = "Name of the realtime table", required = true) 
@PathParam("tableName") String tableName,
+  @Context HttpHeaders headers) {
+try {
+  String table = DatabaseUtils.translateTableName(tableName, headers);

Review Comment:
   you are right.






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-18 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2630024665


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+  @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+  @Context HttpHeaders headers) {
+try {
+  LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+  tableName = DatabaseUtils.translateTableName(tableName, headers);
+  CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+  String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+  Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+  String brokerTenant = copyTablePayload.getBrokerTenant();
+  String serverTenant = copyTablePayload.getServerTenant();
+  Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+  LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+  // Fetch and add schema
+  URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + 
"/schema");
+  SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+  String schemaJson = schemaResponse.getResponse();
+  Schema schema = Schema.fromString(schemaJson);
+  _pinotHelixResourceManager.addSchema(schema, true, false);

Review Comment:
   If the table config already exists, an exception will be thrown. If the 
schema already exists, no exception is thrown.
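
   That asymmetry (schema add is an idempotent upsert, table-config add is not) can be sketched as follows (toy stand-ins for illustration, not the actual `PinotHelixResourceManager` API):

   ```python
   class ResourceManagerSketch:
       """Toy model of the add-schema vs add-table-config behavior."""

       def __init__(self):
           self.schemas = {}
           self.table_configs = {}

       def add_schema(self, name: str, schema: dict) -> None:
           # Called with override=true in copyTable: silently upserts.
           self.schemas[name] = schema

       def add_table(self, name: str, config: dict) -> None:
           # Re-creating an existing table config raises.
           if name in self.table_configs:
               raise ValueError(f"Table {name} already exists")
           self.table_configs[name] = config
   ```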






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-18 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2630021797


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+  @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+  @Context HttpHeaders headers) {
+try {
+  LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+  tableName = DatabaseUtils.translateTableName(tableName, headers);
+  CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+  String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+  Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+  String brokerTenant = copyTablePayload.getBrokerTenant();
+  String serverTenant = copyTablePayload.getServerTenant();
+  Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+  LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+  // Fetch and add schema
+  URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + 
"/schema");
+  SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+  String schemaJson = schemaResponse.getResponse();
+  Schema schema = Schema.fromString(schemaJson);
+  _pinotHelixResourceManager.addSchema(schema, true, false);
+  LOGGER.info("[copyTable] Successfully added schema for table: {}, 
schema: {}", tableName, schema);
+
+  // Fetch and add table configs
+  URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + 
tableName);
+  SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+  String tableConfigJson = tableConfigResponse.getResponse();
+  LOGGER.info("[copyTable] Fetched table config for table: {}, 
tableConfig: {}", tableName, tableConfigJson);
+  JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+  if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode) 
tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant, 
serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig = 
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+LOGGER.info("[copyTable] Successfully fetched and tweaked table config 
for table: {}, tableConfig: {}",
+tableName, realtimeTableConfig.toString());
+
+URI watermarkUri = new URI(sourceControllerUri + "/tables/" + 
tableName + "/consumerWatermarks");
+SimpleHttpResponse watermarkResponse = 
HttpClient.wrapAndThrowHttpException(
+HttpClient.getInstance().sendGetRequest(watermarkUri, 
requestHeaders));
+String watermarkJson = watermarkResponse.getResponse();
+LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result: 
{}", tableName, watermarkJson);
+WatermarkInductionResult watermarkInductionResult =
+JsonUtils.stringToObject(watermarkJson, 
WatermarkInductionResult.class);
+
+List<PartitionGroupInfo> partitionGroupInfos = watermarkInductionResult.getWatermarks().stream()
+.map(PartitionGroupInfo::from)
+.collect(Collectors.toList());
+
+_pinotHelixResourceManager.addTable(realtimeTableConfig, 
partitionGroupInfos);
+  }
+  LOGGER.info("[copyTable] Finished Table Config copy: {}", tableName);
+  return new CopyTableResponse("success");
+} catch (Exception e) {
+  LOGGER.error("[copyTable] Error copying table: {}", tableName, e);
+  throw new ControllerApplicationException(LOGGER, "Error copying table: " 
+ e.getMessage(),
+  Response.Status.INTERNAL_SERVER_ERROR, e);
+}
+  }
+
+  /**
+   * Tweaks the realtime table config with the given broker and server tenants.
+   *
+   * @param realtimeTableConfigNode The JSON object representing the realtime 
table config.
+   * @param brokerTenant The broker tenant to set in the config.
+   * @param serverTenant The server tenant to set in the config.
+   */
+  @VisibleForTesting
+  static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode, 
String brokerTenant, String server

Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-18 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2630022631


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+  @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+  @Context HttpHeaders headers) {
+try {
+  LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+  tableName = DatabaseUtils.translateTableName(tableName, headers);
+  CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+  String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+  Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+  String brokerTenant = copyTablePayload.getBrokerTenant();
+  String serverTenant = copyTablePayload.getServerTenant();
+  Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+  LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+  // Fetch and add schema
+  URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + 
"/schema");
+  SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+  String schemaJson = schemaResponse.getResponse();
+  Schema schema = Schema.fromString(schemaJson);
+  _pinotHelixResourceManager.addSchema(schema, true, false);
+  LOGGER.info("[copyTable] Successfully added schema for table: {}, 
schema: {}", tableName, schema);
+
+  // Fetch and add table configs
+  URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + 
tableName);
+  SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+  String tableConfigJson = tableConfigResponse.getResponse();
+  LOGGER.info("[copyTable] Fetched table config for table: {}, 
tableConfig: {}", tableName, tableConfigJson);
+  JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+  if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode) 
tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant, 
serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig = 
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+LOGGER.info("[copyTable] Successfully fetched and tweaked table config 
for table: {}, tableConfig: {}",
+tableName, realtimeTableConfig.toString());
+
+URI watermarkUri = new URI(sourceControllerUri + "/tables/" + 
tableName + "/consumerWatermarks");
+SimpleHttpResponse watermarkResponse = 
HttpClient.wrapAndThrowHttpException(
+HttpClient.getInstance().sendGetRequest(watermarkUri, 
requestHeaders));
+String watermarkJson = watermarkResponse.getResponse();
+LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result: 
{}", tableName, watermarkJson);
+WatermarkInductionResult watermarkInductionResult =
+JsonUtils.stringToObject(watermarkJson, 
WatermarkInductionResult.class);
+
+List<PartitionGroupInfo> partitionGroupInfos = watermarkInductionResult.getWatermarks().stream()
+.map(PartitionGroupInfo::from)
+.collect(Collectors.toList());
+
+_pinotHelixResourceManager.addTable(realtimeTableConfig, 
partitionGroupInfos);
+  }
+  LOGGER.info("[copyTable] Finished Table Config copy: {}", tableName);
+  return new CopyTableResponse("success");
+} catch (Exception e) {
+  LOGGER.error("[copyTable] Error copying table: {}", tableName, e);
+  throw new ControllerApplicationException(LOGGER, "Error copying table: " 
+ e.getMessage(),
+  Response.Status.INTERNAL_SERVER_ERROR, e);
+}
+  }
+
+  /**
+   * Tweaks the realtime table config with the given broker and server tenants.
+   *
+   * @param realtimeTableConfigNode The JSON object representing the realtime 
table config.
+   * @param brokerTenant The broker tenant to set in the config.
+   * @param serverTenant The server tenant to set in the config.
+   */
+  @VisibleForTesting
+  static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode, 
String brokerTenant, String server

Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-18 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2630020019


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -4737,6 +4761,37 @@ public QueryWorkloadManager getQueryWorkloadManager() {
 return _queryWorkloadManager;
   }
 
+  public WatermarkInductionResult inductConsumingWatermarks(String tableName) 
throws TableNotFoundException {
+String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+if (!hasRealtimeTable(tableName)) {
+  throw new TableNotFoundException("Table " + tableNameWithType + " does 
not exist");
+}
+TableConfig tableConfig = getTableConfig(tableNameWithType);
+Preconditions.checkNotNull(tableConfig, "Table " + tableNameWithType + 
"exists but null tableConfig");
+List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
+IdealState idealState = _helixAdmin
+.getResourceIdealState(getHelixClusterName(), tableNameWithType);
+List<PartitionGroupConsumptionStatus> lst = _pinotLLCRealtimeSegmentManager
+.getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+List<Watermark> watermarks = lst.stream().map(status -> {
+  long seq = status.getSequenceNumber();
+  long startOffset;
+  try {
+if (status.getStatus().equalsIgnoreCase("done")) {

Review Comment:
   Yeah, good catch. done
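
   A minimal sketch of the watermark rule under discussion, assuming the convention that a committed ("done") partition group advances to the next sequence number and starts consuming from the committed end offset, while a still-consuming group resumes at its current start offset (the inner branch is truncated in the quoted diff, so this is an illustration, not the actual PR code):

   ```python
   def induct_watermark(status: str, sequence_number: int,
                        current_offset: int, end_offset: int) -> tuple:
       """Return (next_sequence, start_offset) for a new consuming segment.

       If the latest segment is committed ("done"), open sequence + 1 at
       the committed end offset; otherwise resume the still-consuming
       segment at its current start offset.
       """
       if status.lower() == "done":
           return sequence_number + 1, end_offset
       return sequence_number, current_offset
   ```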






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-18 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2630019232


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -4737,6 +4761,37 @@ public QueryWorkloadManager getQueryWorkloadManager() {
 return _queryWorkloadManager;
   }
 
+  public WatermarkInductionResult inductConsumingWatermarks(String tableName) 
throws TableNotFoundException {
+String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+if (!hasRealtimeTable(tableName)) {
+  throw new TableNotFoundException("Table " + tableNameWithType + " does 
not exist");
+}
+TableConfig tableConfig = getTableConfig(tableNameWithType);
+Preconditions.checkNotNull(tableConfig, "Table " + tableNameWithType + 
"exists but null tableConfig");
+List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
+IdealState idealState = _helixAdmin
+.getResourceIdealState(getHelixClusterName(), tableNameWithType);

Review Comment:
   add null check. done
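
   The fail-fast null check agreed on above can be sketched as follows. This is a standalone illustration, not the PR's code: the nested `IdealState` class and the `requireIdealState` helper are hypothetical stand-ins (the real `IdealState` lives in `org.apache.helix.model`), and the message text is only an example.

```java
import java.util.Objects;

public class IdealStateGuard {
  // Hypothetical stand-in for Helix's org.apache.helix.model.IdealState.
  static class IdealState { }

  // Mirrors the pattern discussed in the thread: fail fast with a clear
  // message when the resource has no ideal state yet, instead of letting a
  // bare NullPointerException surface later.
  static IdealState requireIdealState(IdealState idealState, String tableNameWithType) {
    return Objects.requireNonNull(idealState,
        "Ideal state does not exist for table: " + tableNameWithType);
  }

  public static void main(String[] args) {
    try {
      requireIdealState(null, "myTable_REALTIME");
    } catch (NullPointerException e) {
      // Prints the descriptive message instead of crashing deeper in the call.
      System.out.println(e.getMessage());
    }
  }
}
```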






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-17 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2629951634


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+  @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+  @Context HttpHeaders headers) {
+try {
+  LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+  tableName = DatabaseUtils.translateTableName(tableName, headers);
+  CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+  String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+  Map requestHeaders = copyTablePayload.getHeaders();
+  String brokerTenant = copyTablePayload.getBrokerTenant();
+  String serverTenant = copyTablePayload.getServerTenant();
+  Map tagReplacementMap = 
copyTablePayload.getTagPoolReplacementMap();
+
+  LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+  // Fetch and add schema
+  URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + 
"/schema");
+  SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+  String schemaJson = schemaResponse.getResponse();
+  Schema schema = Schema.fromString(schemaJson);
+  _pinotHelixResourceManager.addSchema(schema, true, false);
+  LOGGER.info("[copyTable] Successfully added schema for table: {}, 
schema: {}", tableName, schema);
+
+  // Fetch and add table configs
+  URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + 
tableName);
+  SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+  String tableConfigJson = tableConfigResponse.getResponse();
+  LOGGER.info("[copyTable] Fetched table config for table: {}, 
tableConfig: {}", tableName, tableConfigJson);
+  JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+  if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode) 
tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant, 
serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig = 
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+LOGGER.info("[copyTable] Successfully fetched and tweaked table config 
for table: {}, tableConfig: {}",
+tableName, realtimeTableConfig.toString());
+
+URI watermarkUri = new URI(sourceControllerUri + "/tables/" + 
tableName + "/consumerWatermarks");
+SimpleHttpResponse watermarkResponse = 
HttpClient.wrapAndThrowHttpException(
+HttpClient.getInstance().sendGetRequest(watermarkUri, 
requestHeaders));
+String watermarkJson = watermarkResponse.getResponse();
+LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result: 
{}", tableName, watermarkJson);
+WatermarkInductionResult watermarkInductionResult =
+JsonUtils.stringToObject(watermarkJson, 
WatermarkInductionResult.class);
+
+List<PartitionGroupInfo> partitionGroupInfos = watermarkInductionResult.getWatermarks().stream()
+.map(PartitionGroupInfo::from)
+.collect(Collectors.toList());
+
+_pinotHelixResourceManager.addTable(realtimeTableConfig, 
partitionGroupInfos);
+  }
+  LOGGER.info("[copyTable] Finished Table Config copy: {}", tableName);
+  return new CopyTableResponse("success");
+} catch (Exception e) {
+  LOGGER.error("[copyTable] Error copying table: {}", tableName, e);
+  throw new ControllerApplicationException(LOGGER, "Error copying table: " 
+ e.getMessage(),
+  Response.Status.INTERNAL_SERVER_ERROR, e);
+}
+  }
+
+  /**
+   * Tweaks the realtime table config with the given broker and server tenants.
+   *
+   * @param realtimeTableConfigNode The JSON object representing the realtime 
table config.
+   * @param brokerTenant The broker tenant to set in the config.
+   * @param serverTenant The server tenant to set in the config.
+   */
+  @VisibleForTesting
+  static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode, 
String brokerTenant, String server

Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-17 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2629944309


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+  @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+  @Context HttpHeaders headers) {
+try {
+  LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+  tableName = DatabaseUtils.translateTableName(tableName, headers);
+  CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+  String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+  Map requestHeaders = copyTablePayload.getHeaders();
+  String brokerTenant = copyTablePayload.getBrokerTenant();
+  String serverTenant = copyTablePayload.getServerTenant();
+  Map tagReplacementMap = 
copyTablePayload.getTagPoolReplacementMap();
+
+  LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+  // Fetch and add schema
+  URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + 
"/schema");
+  SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+  String schemaJson = schemaResponse.getResponse();
+  Schema schema = Schema.fromString(schemaJson);
+  _pinotHelixResourceManager.addSchema(schema, true, false);
+  LOGGER.info("[copyTable] Successfully added schema for table: {}, 
schema: {}", tableName, schema);
+
+  // Fetch and add table configs
+  URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + 
tableName);
+  SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+  String tableConfigJson = tableConfigResponse.getResponse();
+  LOGGER.info("[copyTable] Fetched table config for table: {}, 
tableConfig: {}", tableName, tableConfigJson);
+  JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+  if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode) 
tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant, 
serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig = 
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+LOGGER.info("[copyTable] Successfully fetched and tweaked table config 
for table: {}, tableConfig: {}",
+tableName, realtimeTableConfig.toString());
+
+URI watermarkUri = new URI(sourceControllerUri + "/tables/" + 
tableName + "/consumerWatermarks");
+SimpleHttpResponse watermarkResponse = 
HttpClient.wrapAndThrowHttpException(
+HttpClient.getInstance().sendGetRequest(watermarkUri, 
requestHeaders));
+String watermarkJson = watermarkResponse.getResponse();
+LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result: 
{}", tableName, watermarkJson);
+WatermarkInductionResult watermarkInductionResult =
+JsonUtils.stringToObject(watermarkJson, 
WatermarkInductionResult.class);
+
+List<PartitionGroupInfo> partitionGroupInfos = watermarkInductionResult.getWatermarks().stream()
+.map(PartitionGroupInfo::from)
+.collect(Collectors.toList());
+
+_pinotHelixResourceManager.addTable(realtimeTableConfig, 
partitionGroupInfos);
+  }
+  LOGGER.info("[copyTable] Finished Table Config copy: {}", tableName);
+  return new CopyTableResponse("success");
+} catch (Exception e) {
+  LOGGER.error("[copyTable] Error copying table: {}", tableName, e);
+  throw new ControllerApplicationException(LOGGER, "Error copying table: " 
+ e.getMessage(),
+  Response.Status.INTERNAL_SERVER_ERROR, e);
+}
+  }
+
+  /**
+   * Tweaks the realtime table config with the given broker and server tenants.
+   *
+   * @param realtimeTableConfigNode The JSON object representing the realtime 
table config.
+   * @param brokerTenant The broker tenant to set in the config.
+   * @param serverTenant The server tenant to set in the config.
+   */
+  @VisibleForTesting
+  static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode, 
String brokerTenant, String server

Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-17 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2629468263


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(

Review Comment:
   I don't think we need it because it essentially does a table copy into a 
different cluster. Users can safely delete the table copy.






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-17 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2629465487


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+  @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+  @Context HttpHeaders headers) {
+try {
+  LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+  tableName = DatabaseUtils.translateTableName(tableName, headers);
+  CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+  String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+  Map requestHeaders = copyTablePayload.getHeaders();
+  String brokerTenant = copyTablePayload.getBrokerTenant();
+  String serverTenant = copyTablePayload.getServerTenant();
+  Map tagReplacementMap = 
copyTablePayload.getTagPoolReplacementMap();
+
+  LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+  // Fetch and add schema
+  URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + 
"/schema");
+  SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+  String schemaJson = schemaResponse.getResponse();
+  Schema schema = Schema.fromString(schemaJson);
+  _pinotHelixResourceManager.addSchema(schema, true, false);
+  LOGGER.info("[copyTable] Successfully added schema for table: {}, 
schema: {}", tableName, schema);
+
+  // Fetch and add table configs
+  URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + 
tableName);
+  SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+  String tableConfigJson = tableConfigResponse.getResponse();
+  LOGGER.info("[copyTable] Fetched table config for table: {}, 
tableConfig: {}", tableName, tableConfigJson);
+  JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+  if (tableConfigNode.has(TableType.REALTIME.name())) {

Review Comment:
   Yeah, for upsert and pauseless tables, we should throw the exception.
   
   For an offline table, or the offline part of a hybrid table, we can log a 
warning? The hunch is that the user can write their own internal workflow to 
backfill the offline table. For a hybrid table, the user needs to copy the 
realtime part.






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-15 Thread via GitHub


deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621398775


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -4737,6 +4761,37 @@ public QueryWorkloadManager getQueryWorkloadManager() {
 return _queryWorkloadManager;
   }
 
+  public WatermarkInductionResult inductConsumingWatermarks(String tableName) 
throws TableNotFoundException {
+String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+if (!hasRealtimeTable(tableName)) {
+  throw new TableNotFoundException("Table " + tableNameWithType + " does 
not exist");
+}
+TableConfig tableConfig = getTableConfig(tableNameWithType);
+Preconditions.checkNotNull(tableConfig, "Table " + tableNameWithType + 
"exists but null tableConfig");
+List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
+IdealState idealState = _helixAdmin
+.getResourceIdealState(getHelixClusterName(), tableNameWithType);
+List<PartitionGroupConsumptionStatus> lst = _pinotLLCRealtimeSegmentManager
+.getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+List watermarks = 
lst.stream().map(status -> {
+  long seq = status.getSequenceNumber();
+  long startOffset;
+  try {
+if (status.getStatus().equalsIgnoreCase("done")) {

Review Comment:
   "DONE".equalsIgnoreCase(status.getStatus()) for null handling?
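
   The constant-first idiom the comment suggests can be illustrated with a standalone sketch (not the PR's code; the class and method names here are made up for the example):

```java
public class NullSafeCompare {
  // Variable-first: throws NullPointerException when status is null.
  static boolean variableFirst(String status) {
    return status.equalsIgnoreCase("done");
  }

  // Constant-first: String.equalsIgnoreCase returns false for a null
  // argument, so this never throws.
  static boolean constantFirst(String status) {
    return "DONE".equalsIgnoreCase(status);
  }

  public static void main(String[] args) {
    System.out.println(constantFirst("Done")); // true
    System.out.println(constantFirst(null));   // false
  }
}
```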






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-15 Thread via GitHub


deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621397588


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##
@@ -4737,6 +4761,37 @@ public QueryWorkloadManager getQueryWorkloadManager() {
 return _queryWorkloadManager;
   }
 
+  public WatermarkInductionResult inductConsumingWatermarks(String tableName) 
throws TableNotFoundException {
+String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+if (!hasRealtimeTable(tableName)) {
+  throw new TableNotFoundException("Table " + tableNameWithType + " does 
not exist");
+}
+TableConfig tableConfig = getTableConfig(tableNameWithType);
+Preconditions.checkNotNull(tableConfig, "Table " + tableNameWithType + 
"exists but null tableConfig");
+List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
+IdealState idealState = _helixAdmin
+.getResourceIdealState(getHelixClusterName(), tableNameWithType);

Review Comment:
   Can idealState be null?






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-15 Thread via GitHub


deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621395272


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(

Review Comment:
   for the next PR, we might need to add a mode param and introduce a dry-run 
mode for feature safety.






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-15 Thread via GitHub


deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621393154


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+  @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+  @Context HttpHeaders headers) {
+try {
+  LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+  tableName = DatabaseUtils.translateTableName(tableName, headers);
+  CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+  String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+  Map requestHeaders = copyTablePayload.getHeaders();
+  String brokerTenant = copyTablePayload.getBrokerTenant();
+  String serverTenant = copyTablePayload.getServerTenant();
+  Map tagReplacementMap = 
copyTablePayload.getTagPoolReplacementMap();
+
+  LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+  // Fetch and add schema
+  URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + 
"/schema");
+  SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+  String schemaJson = schemaResponse.getResponse();
+  Schema schema = Schema.fromString(schemaJson);
+  _pinotHelixResourceManager.addSchema(schema, true, false);
+  LOGGER.info("[copyTable] Successfully added schema for table: {}, 
schema: {}", tableName, schema);
+
+  // Fetch and add table configs
+  URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + 
tableName);
+  SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+  String tableConfigJson = tableConfigResponse.getResponse();
+  LOGGER.info("[copyTable] Fetched table config for table: {}, 
tableConfig: {}", tableName, tableConfigJson);
+  JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+  if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode) 
tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant, 
serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig = 
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+LOGGER.info("[copyTable] Successfully fetched and tweaked table config 
for table: {}, tableConfig: {}",
+tableName, realtimeTableConfig.toString());
+
+URI watermarkUri = new URI(sourceControllerUri + "/tables/" + 
tableName + "/consumerWatermarks");
+SimpleHttpResponse watermarkResponse = 
HttpClient.wrapAndThrowHttpException(
+HttpClient.getInstance().sendGetRequest(watermarkUri, 
requestHeaders));
+String watermarkJson = watermarkResponse.getResponse();
+LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result: 
{}", tableName, watermarkJson);
+WatermarkInductionResult watermarkInductionResult =
+JsonUtils.stringToObject(watermarkJson, 
WatermarkInductionResult.class);
+
+List<PartitionGroupInfo> partitionGroupInfos = watermarkInductionResult.getWatermarks().stream()
+.map(PartitionGroupInfo::from)
+.collect(Collectors.toList());
+
+_pinotHelixResourceManager.addTable(realtimeTableConfig, 
partitionGroupInfos);
+  }
+  LOGGER.info("[copyTable] Finished Table Config copy: {}", tableName);
+  return new CopyTableResponse("success");
+} catch (Exception e) {
+  LOGGER.error("[copyTable] Error copying table: {}", tableName, e);
+  throw new ControllerApplicationException(LOGGER, "Error copying table: " 
+ e.getMessage(),
+  Response.Status.INTERNAL_SERVER_ERROR, e);
+}
+  }
+
+  /**
+   * Tweaks the realtime table config with the given broker and server tenants.
+   *
+   * @param realtimeTableConfigNode The JSON object representing the realtime 
table config.
+   * @param brokerTenant The broker tenant to set in the config.
+   * @param serverTenant The server tenant to set in the config.
+   */
+  @VisibleForTesting
+  static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode, 
String brokerTenant, String serverTe

Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-15 Thread via GitHub


deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621392206


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+  @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+  @Context HttpHeaders headers) {
+try {
+  LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+  tableName = DatabaseUtils.translateTableName(tableName, headers);
+  CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+  String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+  Map requestHeaders = copyTablePayload.getHeaders();
+  String brokerTenant = copyTablePayload.getBrokerTenant();
+  String serverTenant = copyTablePayload.getServerTenant();
+  Map tagReplacementMap = 
copyTablePayload.getTagPoolReplacementMap();
+
+  LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+  // Fetch and add schema
+  URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + 
"/schema");
+  SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+  String schemaJson = schemaResponse.getResponse();
+  Schema schema = Schema.fromString(schemaJson);
+  _pinotHelixResourceManager.addSchema(schema, true, false);
+  LOGGER.info("[copyTable] Successfully added schema for table: {}, 
schema: {}", tableName, schema);
+
+  // Fetch and add table configs
+  URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + 
tableName);
+  SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+  String tableConfigJson = tableConfigResponse.getResponse();
+  LOGGER.info("[copyTable] Fetched table config for table: {}, 
tableConfig: {}", tableName, tableConfigJson);
+  JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+  if (tableConfigNode.has(TableType.REALTIME.name())) {

Review Comment:
   same for Upsert.






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-15 Thread via GitHub


deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621391126


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+  @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+  @Context HttpHeaders headers) {
+try {
+  LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+  tableName = DatabaseUtils.translateTableName(tableName, headers);
+  CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+  String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+  Map requestHeaders = copyTablePayload.getHeaders();
+  String brokerTenant = copyTablePayload.getBrokerTenant();
+  String serverTenant = copyTablePayload.getServerTenant();
+  Map tagReplacementMap = 
copyTablePayload.getTagPoolReplacementMap();
+
+  LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+  // Fetch and add schema
+  URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + 
"/schema");
+  SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+  String schemaJson = schemaResponse.getResponse();
+  Schema schema = Schema.fromString(schemaJson);
+  _pinotHelixResourceManager.addSchema(schema, true, false);
+  LOGGER.info("[copyTable] Successfully added schema for table: {}, 
schema: {}", tableName, schema);
+
+  // Fetch and add table configs
+  URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + 
tableName);
+  SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+  String tableConfigJson = tableConfigResponse.getResponse();
+  LOGGER.info("[copyTable] Fetched table config for table: {}, 
tableConfig: {}", tableName, tableConfigJson);
+  JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+  if (tableConfigNode.has(TableType.REALTIME.name())) {

Review Comment:
   it seems we only support realtime; do we have any validation to avoid using 
this on hybrid and offline tables?






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-15 Thread via GitHub


deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621389583


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+  @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, String payload,
+  @Context HttpHeaders headers) {
+try {
+  LOGGER.info("[copyTable] received request for table: {}, payload: {}", tableName, payload);
+  tableName = DatabaseUtils.translateTableName(tableName, headers);
+  CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, CopyTablePayload.class);
+  String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+  Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+  String brokerTenant = copyTablePayload.getBrokerTenant();
+  String serverTenant = copyTablePayload.getServerTenant();
+  Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+  LOGGER.info("[copyTable] Start copying table: {} from source: {}", tableName, sourceControllerUri);
+
+  // Fetch and add schema
+  URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + "/schema");
+  SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+  String schemaJson = schemaResponse.getResponse();
+  Schema schema = Schema.fromString(schemaJson);
+  _pinotHelixResourceManager.addSchema(schema, true, false);
+  LOGGER.info("[copyTable] Successfully added schema for table: {}, schema: {}", tableName, schema);
+
+  // Fetch and add table configs
+  URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + tableName);
+  SimpleHttpResponse tableConfigResponse = HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(tableConfigUri, requestHeaders));
+  String tableConfigJson = tableConfigResponse.getResponse();
+  LOGGER.info("[copyTable] Fetched table config for table: {}, tableConfig: {}", tableName, tableConfigJson);
+  JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+  if (tableConfigNode.has(TableType.REALTIME.name())) {
+ObjectNode realtimeTableConfigNode = (ObjectNode) tableConfigNode.get(TableType.REALTIME.name());
+tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant, serverTenant, tagReplacementMap);
+TableConfig realtimeTableConfig = JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+LOGGER.info("[copyTable] Successfully fetched and tweaked table config for table: {}, tableConfig: {}",
+tableName, realtimeTableConfig.toString());
+
+URI watermarkUri = new URI(sourceControllerUri + "/tables/" + tableName + "/consumerWatermarks");
+SimpleHttpResponse watermarkResponse = HttpClient.wrapAndThrowHttpException(
+HttpClient.getInstance().sendGetRequest(watermarkUri, requestHeaders));
+String watermarkJson = watermarkResponse.getResponse();
+LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result: {}", tableName, watermarkJson);
+WatermarkInductionResult watermarkInductionResult =
+JsonUtils.stringToObject(watermarkJson, WatermarkInductionResult.class);
+
+List<PartitionGroupInfo> partitionGroupInfos = watermarkInductionResult.getWatermarks().stream()
+.map(PartitionGroupInfo::from)
+.collect(Collectors.toList());
+
+_pinotHelixResourceManager.addTable(realtimeTableConfig, partitionGroupInfos);
+  }
+  LOGGER.info("[copyTable] Finished Table Config copy: {}", tableName);
+  return new CopyTableResponse("success");
+} catch (Exception e) {
+  LOGGER.error("[copyTable] Error copying table: {}", tableName, e);
+  throw new ControllerApplicationException(LOGGER, "Error copying table: " + e.getMessage(),
+  Response.Status.INTERNAL_SERVER_ERROR, e);
+}
+  }
+
+  /**
+   * Tweaks the realtime table config with the given broker and server tenants.
+   *
+   * @param realtimeTableConfigNode The JSON object representing the realtime table config.
+   * @param brokerTenant The broker tenant to set in the config.
+   * @param serverTenant The server tenant to set in the config.
+   */
+  @VisibleForTesting
+  static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode, String brokerTenant, String serverTe

Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-15 Thread via GitHub


deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621386460


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
 }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+  @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, String payload,
+  @Context HttpHeaders headers) {
+try {
+  LOGGER.info("[copyTable] received request for table: {}, payload: {}", tableName, payload);
+  tableName = DatabaseUtils.translateTableName(tableName, headers);
+  CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, CopyTablePayload.class);
+  String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+  Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+  String brokerTenant = copyTablePayload.getBrokerTenant();
+  String serverTenant = copyTablePayload.getServerTenant();
+  Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+  LOGGER.info("[copyTable] Start copying table: {} from source: {}", tableName, sourceControllerUri);
+
+  // Fetch and add schema
+  URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + "/schema");
+  SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+  HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+  String schemaJson = schemaResponse.getResponse();
+  Schema schema = Schema.fromString(schemaJson);
+  _pinotHelixResourceManager.addSchema(schema, true, false);

Review Comment:
   if the schema already exists, what will happen?






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-15 Thread via GitHub


deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621380681


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -449,6 +451,25 @@ public String getPauselessTableDebugInfo(
 }
   }
 
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/consumerWatermarks")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_IDEAL_STATE)
+  @ApiOperation(value = "Get table ideal state", notes = "Get table ideal state")
+  public WatermarkInductionResult inductConsumingWatermark(
+  @ApiParam(value = "Name of the realtime table", required = true) @PathParam("tableName") String tableName,
+  @Context HttpHeaders headers) {
+try {
+  String table = DatabaseUtils.translateTableName(tableName, headers);

Review Comment:
   Table copy won't change the table name, is that correct?
   That said, we cannot copy a table within the same cluster (shared controller) from one tenant to another, is that correct?






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-15 Thread via GitHub


deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2621185919


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java:
##
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyTablePayload {
+
+  private String _sourceClusterUri;
+  private Map<String, String> _headers;
+  /**
+   * Broker tenant for the new table.
+   * MUST NOT contain the tenant type suffix, i.e. _BROKER.
+   */
+  private String _brokerTenant;
+  /**
+   * Server tenant for the new table.
+   * MUST NOT contain the tenant type suffix, i.e. _REALTIME or _OFFLINE.
+   */
+  private String _serverTenant;
+
+  private Map<String, String> _tagPoolReplacementMap;

Review Comment:
   what is the usage for this param?






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-12 Thread via GitHub


wirybeaver commented on PR #17235:
URL: https://github.com/apache/pinot/pull/17235#issuecomment-3648742146

   @Jackie-Jiang @chenboat Could you take a look? The manual integration test 
has already been done. The purpose of this PR is to copy the schema and table 
config from the source cluster to the target cluster, replacing the server / 
broker tenant, and then to copy the consuming segments to kick off message 
consumption on the new table. The backfill of segments prior to the consuming 
segments will be implemented in a follow-up PR.





Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-12 Thread via GitHub


wirybeaver commented on PR #17235:
URL: https://github.com/apache/pinot/pull/17235#issuecomment-3648733875

   This Integration Test was not stable. 
   
   
   TableRebalancePauselessIntegrationTest.testForceCommit:143->BaseClusterIntegrationTestSet.waitForRebalanceToComplete:850 Failed to meet condition in 60ms, error message: Failed to complete rebalance





Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-10 Thread via GitHub


deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2608338598


##
pinot-controller/src/test/resources/table/table_config_with_instance_assignment.json:
##
@@ -0,0 +1,15 @@
+{
+  "tenants": {
+"broker": "broker1",
+"server": "rtaShared2"

Review Comment:
   nit: rtaShared2 --> server1






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-01 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2579486964


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -433,6 +435,25 @@ public String getPauselessTableDebugInfo(
 }
   }
 
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/watermarks")

Review Comment:
   /consumerWatermarks






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-12-01 Thread via GitHub


wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2579487604


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java:
##
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyTablePayload {
+
+  String _sourceClusterUri;
+  Map<String, String> _headers;
+  /**
+   * Broker tenant for the new table.
+   * MUST NOT contain the tenant type suffix, i.e. _BROKER.
+   */
+  String _brokerTenant;
+  /**
+   * Server tenant for the new table.
+   * MUST NOT contain the tenant type suffix, i.e. _REALTIME or _OFFLINE.
+   */
+  String _serverTenant;
+
+  Map<String, String> _tagPoolReplacementMap;
+
+  @JsonCreator
+  public CopyTablePayload(@JsonProperty(value = "tableName", required = true) String tableName,
+  @JsonProperty(value = "sourceClusterUri", required = true) String sourceClusterUri,
+  @JsonProperty("sourceClusterHeaders") Map<String, String> headers,
+  @JsonProperty(value = "brokerTenant", required = true) String brokerTenant,
+  @JsonProperty(value = "serverTenant", required = true) String serverTenant,
+  @JsonProperty("tagPoolReplacementMap") @Nullable Map<String, String> tagPoolReplacementMap) {
+_sourceClusterUri = sourceClusterUri;
+_headers = headers;

Review Comment:
   yep






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-11-20 Thread via GitHub


deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2548138260


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##
@@ -433,6 +435,25 @@ public String getPauselessTableDebugInfo(
 }
   }
 
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/watermarks")

Review Comment:
   the endpoint's name is not intuitive. Shall we rename it to @Path("/tables/{tableName}/consumptionWatermark")?






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-11-20 Thread via GitHub


deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2548132800


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java:
##
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyTablePayload {
+
+  String _sourceClusterUri;

Review Comment:
   private?






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-11-20 Thread via GitHub


deemoliu commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2548127822


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java:
##
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyTablePayload {
+
+  String _sourceClusterUri;
+  Map<String, String> _headers;
+  /**
+   * Broker tenant for the new table.
+   * MUST NOT contain the tenant type suffix, i.e. _BROKER.
+   */
+  String _brokerTenant;
+  /**
+   * Server tenant for the new table.
+   * MUST NOT contain the tenant type suffix, i.e. _REALTIME or _OFFLINE.
+   */
+  String _serverTenant;
+
+  Map<String, String> _tagPoolReplacementMap;
+
+  @JsonCreator
+  public CopyTablePayload(@JsonProperty(value = "tableName", required = true) String tableName,
+  @JsonProperty(value = "sourceClusterUri", required = true) String sourceClusterUri,
+  @JsonProperty("sourceClusterHeaders") Map<String, String> headers,
+  @JsonProperty(value = "brokerTenant", required = true) String brokerTenant,
+  @JsonProperty(value = "serverTenant", required = true) String serverTenant,
+  @JsonProperty("tagPoolReplacementMap") @Nullable Map<String, String> tagPoolReplacementMap) {
+_sourceClusterUri = sourceClusterUri;
+_headers = headers;

Review Comment:
   tableName is unused?






Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-11-19 Thread via GitHub


wirybeaver commented on PR #17235:
URL: https://github.com/apache/pinot/pull/17235#issuecomment-3554733903

   Unit Test Failure: it seems the error is not caused by my PR. Checking.
   PinotHelixResourceManagerStatelessTest.testGetLiveBrokers:383 » InvalidTableConfig No schema defined for table: testTable_OFFLINE





Re: [PR] [Real-time Table Replication X clusters][1/n] Creating new tables with designated consuming segments [pinot]

2025-11-19 Thread via GitHub


codecov-commenter commented on PR #17235:
URL: https://github.com/apache/pinot/pull/17235#issuecomment-3551422628

   ### :x: 3 Tests Failed:
   | Tests completed | Failed | Passed | Skipped |
   |---|---|---|---|
   | 7180 | 3 | 7177 | 0 |
   View the top 3 failed test(s) by shortest run time
   
   > org.apache.pinot.controller.helix.core.PinotHelixResourceManagerStatelessTest::testGetLiveBrokers
   > Stack Traces | 0.006s run time
   > No schema defined for table: testTable_OFFLINE
   
   > org.apache.pinot.controller.helix.core.PinotHelixResourceManagerStatelessTest::testGetLiveBrokers
   > Stack Traces | 0.006s run time
   > No schema defined for table: testTable_OFFLINE
   
   > org.apache.pinot.controller.helix.core.PinotHelixResourceManagerStatelessTest::testGetLiveBrokers
   > Stack Traces | 0.009s run time
   > No schema defined for table: testTable_OFFLINE
   
   To view more test analytics, go to the [Test Analytics Dashboard](https://app.codecov.io/gh/apache/pinot/tests/wirybeaver%3Aoss%2FtableCopyConsuming)
   📋 Got 3 mins? [Take this short survey](https://forms.gle/22i53Qa1CySZjA6c7) to help us improve Test Analytics.

