JackieTien97 commented on code in PR #16764:
URL: https://github.com/apache/iotdb/pull/16764#discussion_r2576131277
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java:
##########
@@ -22,12 +22,17 @@
import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
import java.util.Collections;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
public class MqttClientSession extends IClientSession {
private final String clientID;
+ // Map from statement name to PreparedStatementInfo
+ private final Map<String, PreparedStatementInfo> preparedStatements = new
ConcurrentHashMap<>();
+
Review Comment:
```suggestion
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java:
##########
@@ -76,4 +81,24 @@ public void addQueryId(Long statementId, long queryId) {
public void removeQueryId(Long statementId, Long queryId) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void addPreparedStatement(String statementName, PreparedStatementInfo
info) {
+ preparedStatements.put(statementName, info);
+ }
+
+ @Override
+ public PreparedStatementInfo removePreparedStatement(String statementName) {
+ return preparedStatements.remove(statementName);
+ }
+
+ @Override
+ public PreparedStatementInfo getPreparedStatement(String statementName) {
+ return preparedStatements.get(statementName);
+ }
+
+ @Override
+ public Set<String> getPreparedStatementNames() {
+ return preparedStatements.keySet();
+ }
Review Comment:
throw execption here, mqtt is for write not for query.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/BaseServerContextHandler.java:
##########
@@ -70,7 +72,22 @@ public ServerContext createContext(TProtocol in, TProtocol
out) {
}
public void deleteContext(ServerContext context, TProtocol in, TProtocol
out) {
+ IClientSession session = getSessionManager().getCurrSession();
+
+ // Release session resources (including PreparedStatement memory)
+ // This handles TCP connection loss scenarios
+ if (session != null) {
+ try {
+ getSessionManager().closeSession(session,
Coordinator.getInstance()::cleanupQueryExecution);
+ } catch (Exception e) {
+ logger.warn(
+ "Failed to close session during TCP connection disconnect: {}",
e.getMessage(), e);
+ }
+ }
+
+ // Remove the session from the current thread
getSessionManager().removeCurrSession();
+
Review Comment:
why did you do this change?
ClientRPCServiceImpl.handleClientExit has already called closeSession(req).
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java:
##########
@@ -88,4 +91,24 @@ public void addQueryId(Long statementId, long queryId) {
public void removeQueryId(Long statementId, Long queryId) {
ClientSession.removeQueryId(statementIdToQueryId, statementId, queryId);
}
+
+ @Override
+ public void addPreparedStatement(String statementName, PreparedStatementInfo
info) {
+ preparedStatements.put(statementName, info);
+ }
+
+ @Override
+ public PreparedStatementInfo removePreparedStatement(String statementName) {
+ return preparedStatements.remove(statementName);
+ }
+
+ @Override
+ public PreparedStatementInfo getPreparedStatement(String statementName) {
+ return preparedStatements.get(statementName);
+ }
+
+ @Override
+ public Set<String> getPreparedStatementNames() {
+ return preparedStatements.keySet();
+ }
Review Comment:
throw exception here, InternalClientSession should never call these methods,
they're used for internal query and write which will never use prepare and
execute
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]