This is an automated email from the ASF dual-hosted git repository. wuweijie pushed a commit to branch opengauss_adapt in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
commit f50959a83001d6e2b8bda3df1b4b3924d4f7018c Author: 吴伟杰 <[email protected]> AuthorDate: Thu Jun 17 23:53:46 2021 +0800 Implements openGauss batch bind protocol (#10850) * Implements openGauss BatchBind * Add javadoc --- .../command/PostgreSQLCommandPacketFactory.java | 3 + .../command/PostgreSQLCommandPacketType.java | 2 + .../binary/bind/OpenGaussComBatchBindPacket.java | 179 +++++++++++++++++++++ .../command/PostgreSQLCommandExecutorFactory.java | 4 + .../binary/bind/OpenGaussComBatchBindExecutor.java | 171 ++++++++++++++++++++ 5 files changed, 359 insertions(+) diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketFactory.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketFactory.java index 070bc37..9832864 100644 --- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketFactory.java +++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketFactory.java @@ -20,6 +20,7 @@ package org.apache.shardingsphere.db.protocol.postgresql.packet.command; import lombok.AccessLevel; import lombok.NoArgsConstructor; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.admin.PostgreSQLUnsupportedCommandPacket; +import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.OpenGaussComBatchBindPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.close.PostgreSQLComClosePacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.describe.PostgreSQLComDescribePacket; @@ -52,6 +53,8 @@ public final class PostgreSQLCommandPacketFactory { return new PostgreSQLComParsePacket(payload); case BIND_COMMAND: return new PostgreSQLComBindPacket(payload, connectionId); + case BATCH_BIND_COMMAND: + return new OpenGaussComBatchBindPacket(payload, connectionId); case DESCRIBE_COMMAND: return new PostgreSQLComDescribePacket(payload); case EXECUTE_COMMAND: diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketType.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketType.java index 014ad2d..f59f684 100644 --- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketType.java +++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketType.java @@ -37,6 +37,8 @@ public enum PostgreSQLCommandPacketType implements CommandPacketType, PostgreSQL BIND_COMMAND('B'), + BATCH_BIND_COMMAND('U'), + DESCRIBE_COMMAND('D'), EXECUTE_COMMAND('E'), diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/OpenGaussComBatchBindPacket.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/OpenGaussComBatchBindPacket.java new file mode 100644 index 0000000..8c290e9 --- /dev/null +++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/OpenGaussComBatchBindPacket.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind; + +import com.google.common.collect.Lists; +import lombok.Getter; +import org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLBinaryColumnType; +import org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLColumnFormat; +import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacket; +import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType; +import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.PostgreSQLBinaryStatement; +import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.PostgreSQLBinaryStatementRegistry; +import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.protocol.PostgreSQLBinaryProtocolValue; +import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.protocol.PostgreSQLBinaryProtocolValueFactory; +import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierTag; +import org.apache.shardingsphere.db.protocol.postgresql.payload.PostgreSQLPacketPayload; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Batch bind packet for openGauss. + */ +@Getter +public final class OpenGaussComBatchBindPacket extends PostgreSQLCommandPacket { + + private final String statementId; + + private final String sql; + + private final List<Integer> resultFormatCodes; + + private final List<List<Object>> parameters; + + public OpenGaussComBatchBindPacket(final PostgreSQLPacketPayload payload, final int connectionId) { + payload.readInt4(); + payload.readInt4(); + payload.readStringNul(); + statementId = payload.readStringNul(); + int parameterFormatCount = payload.readInt2(); + List<Integer> parameterFormats = new ArrayList<>(parameterFormatCount); + for (int i = 0; i < parameterFormatCount; i++) { + parameterFormats.add(payload.readInt2()); + } + int resultFormatsLength = payload.readInt2(); + resultFormatCodes = new ArrayList<>(resultFormatsLength); + for (int i = 0; i < resultFormatsLength; i++) { + resultFormatCodes.add(payload.readInt2()); + } + PostgreSQLBinaryStatement binaryStatement = PostgreSQLBinaryStatementRegistry.getInstance().get(connectionId).getBinaryStatement(statementId); + sql = null == binaryStatement ? null : binaryStatement.getSql(); + List<Object> allParameters = null == sql ? Collections.emptyList() : getParameters(payload, parameterFormats, binaryStatement.getColumnTypes()); + parameters = Lists.partition(allParameters, parameterFormatCount); + payload.readInt1(); + payload.readStringNul(); + payload.readInt4(); + } + + private List<Object> getParameters(final PostgreSQLPacketPayload payload, final List<Integer> parameterFormats, final List<PostgreSQLBinaryColumnType> columnTypes) { + int parameterCount = payload.readInt2(); + List<Object> result = new ArrayList<>(parameterCount); + for (int parameterIndex = 0; hasNextParameter(payload); parameterIndex++) { + int parameterValueLength = payload.readInt4(); + if (-1 == parameterValueLength) { + result.add(null); + continue; + } + int modedParameterIndex = parameterIndex % parameterCount; + Object parameterValue = isTextParameterValue(parameterFormats, modedParameterIndex) + ? getTextParameters(payload, parameterValueLength, columnTypes.get(modedParameterIndex)) : getBinaryParameters(payload, parameterValueLength, columnTypes.get(modedParameterIndex)); + result.add(parameterValue); + } + return result; + } + + private boolean hasNextParameter(final PostgreSQLPacketPayload payload) { + payload.getByteBuf().markReaderIndex(); + int c = payload.readInt1(); + payload.getByteBuf().resetReaderIndex(); + return 'E' != c; + } + + private boolean isTextParameterValue(final List<Integer> parameterFormats, final int parameterIndex) { + return parameterFormats.isEmpty() || 0 == parameterFormats.get(parameterIndex % parameterFormats.size()); + } + + private Object getTextParameters(final PostgreSQLPacketPayload payload, final int parameterValueLength, final PostgreSQLBinaryColumnType columnType) { + byte[] bytes = new byte[parameterValueLength]; + payload.getByteBuf().readBytes(bytes); + return getTextParameters(new String(bytes), columnType); + } + + private Object getTextParameters(final String textValue, final PostgreSQLBinaryColumnType columnType) { + switch (columnType) { + case POSTGRESQL_TYPE_UNSPECIFIED: + return new PostgreSQLTypeUnspecifiedSQLParameter(textValue); + case POSTGRESQL_TYPE_BOOL: + return Boolean.valueOf(textValue); + case POSTGRESQL_TYPE_INT2: + case POSTGRESQL_TYPE_INT4: + return Integer.parseInt(textValue); + case POSTGRESQL_TYPE_INT8: + return Long.parseLong(textValue); + case POSTGRESQL_TYPE_FLOAT4: + return Float.parseFloat(textValue); + case POSTGRESQL_TYPE_FLOAT8: + return Double.parseDouble(textValue); + case POSTGRESQL_TYPE_NUMERIC: + try { + return Integer.parseInt(textValue); + } catch (final NumberFormatException ignored) { + } + try { + return Long.parseLong(textValue); + } catch (final NumberFormatException ignored) { + } + return new BigDecimal(textValue); + case POSTGRESQL_TYPE_DATE: + return Date.valueOf(textValue); + case POSTGRESQL_TYPE_TIME: + return Time.valueOf(textValue); + case POSTGRESQL_TYPE_TIMESTAMP: + case POSTGRESQL_TYPE_TIMESTAMPTZ: + return Timestamp.valueOf(textValue); + default: + return textValue; + } + } + + private Object getBinaryParameters(final PostgreSQLPacketPayload payload, final int parameterValueLength, final PostgreSQLBinaryColumnType columnType) { + PostgreSQLBinaryProtocolValue binaryProtocolValue = PostgreSQLBinaryProtocolValueFactory.getBinaryProtocolValue(columnType); + return binaryProtocolValue.read(payload, parameterValueLength); + } + + /** + * Get result format by column index. + * + * @param columnIndex column index + * @return result format + */ + public PostgreSQLColumnFormat getResultFormatByColumnIndex(final int columnIndex) { + if (resultFormatCodes.isEmpty()) { + return PostgreSQLColumnFormat.TEXT; + } + if (1 == resultFormatCodes.size()) { + return PostgreSQLColumnFormat.valueOf(resultFormatCodes.get(0)); + } + return PostgreSQLColumnFormat.valueOf(resultFormatCodes.get(columnIndex)); + } + + @Override + public void write(final PostgreSQLPacketPayload payload) { + } + + @Override + public PostgreSQLIdentifierTag getIdentifier() { + return PostgreSQLCommandPacketType.BATCH_BIND_COMMAND; + } +} diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java index a0e9e75..f5bb0c1 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java +++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java @@ -22,6 +22,7 @@ import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType; +import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.OpenGaussComBatchBindPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.close.PostgreSQLComClosePacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.execute.PostgreSQLComExecutePacket; @@ -31,6 +32,7 @@ import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.Bac import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor; import org.apache.shardingsphere.proxy.frontend.postgresql.command.generic.PostgreSQLComTerminationExecutor; import org.apache.shardingsphere.proxy.frontend.postgresql.command.generic.PostgreSQLUnsupportedCommandExecutor; +import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.bind.OpenGaussComBatchBindExecutor; import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.bind.PostgreSQLComBindExecutor; import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.close.PostgreSQLComCloseExecutor; import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.describe.PostgreSQLComDescribeExecutor; @@ -70,6 +72,8 @@ public final class PostgreSQLCommandExecutorFactory { case BIND_COMMAND: connectionContext.getPendingExecutors().add(new PostgreSQLComBindExecutor(connectionContext, (PostgreSQLComBindPacket) commandPacket, backendConnection)); break; + case BATCH_BIND_COMMAND: + return new OpenGaussComBatchBindExecutor(connectionContext, (OpenGaussComBatchBindPacket) commandPacket, backendConnection); case DESCRIBE_COMMAND: connectionContext.getPendingExecutors().add(new PostgreSQLComDescribeExecutor(connectionContext)); break; diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/OpenGaussComBatchBindExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/OpenGaussComBatchBindExecutor.java new file mode 100644 index 0000000..b85a712 --- /dev/null +++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/OpenGaussComBatchBindExecutor.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.bind; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.db.protocol.binary.BinaryCell; +import org.apache.shardingsphere.db.protocol.packet.DatabasePacket; +import org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLBinaryColumnType; +import org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLColumnFormat; +import org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket; +import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLColumnDescription; +import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket; +import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.OpenGaussComBatchBindPacket; +import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLBindCompletePacket; +import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket; +import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket; +import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry; +import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine; +import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine; +import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory; +import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection; +import org.apache.shardingsphere.proxy.backend.context.ProxyContext; +import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseCell; +import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow; +import org.apache.shardingsphere.proxy.backend.response.data.impl.BinaryQueryResponseCell; +import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader; +import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader; +import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader; +import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader; +import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor; +import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType; +import org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext; +import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.PostgreSQLCommand; +import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement; +import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +/** + * Command batch bind executor for openGauss. + */ +@RequiredArgsConstructor +public final class OpenGaussComBatchBindExecutor implements QueryCommandExecutor { + + private final PostgreSQLConnectionContext connectionContext; + + private final OpenGaussComBatchBindPacket packet; + + private final BackendConnection backendConnection; + + private final List<DatabaseCommunicationEngine> databaseCommunicationEngines = new LinkedList<>(); + + @Getter + private volatile ResponseType responseType; + + private boolean batchBindComplete; + + @Override + public Collection<DatabasePacket<?>> execute() throws SQLException { + List<List<Object>> parameters = packet.getParameters(); + for (int i = 0; i < parameters.size(); i++) { + List<Object> parameter = parameters.get(i); + init(parameter); + ResponseHeader responseHeader = databaseCommunicationEngines.get(i).execute(); + if (responseHeader instanceof QueryResponseHeader && connectionContext.getDescribeExecutor().isPresent()) { + connectionContext.getDescribeExecutor().get().setRowDescriptionPacket(getRowDescriptionPacket((QueryResponseHeader) responseHeader)); + } + if (responseHeader instanceof UpdateResponseHeader) { + responseType = ResponseType.UPDATE; + connectionContext.setUpdateCount(connectionContext.getUpdateCount() + ((UpdateResponseHeader) responseHeader).getUpdateCount()); + } + } + return Collections.singletonList(new PostgreSQLBindCompletePacket()); + } + + private void init(final List<Object> parameter) { + databaseCommunicationEngines.add(DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(getSqlStatement(), packet.getSql(), parameter, backendConnection)); + } + + private SQLStatement getSqlStatement() { + return connectionContext.getSqlStatement().orElseGet(() -> { + SQLStatement result = parseSql(packet.getSql(), backendConnection.getSchemaName()); + connectionContext.setSqlStatement(result); + return result; + }); + } + + private SQLStatement parseSql(final String sql, final String schemaName) { + if (sql.isEmpty()) { + return new EmptyStatement(); + } + ShardingSphereSQLParserEngine sqlStatementParserEngine = new ShardingSphereSQLParserEngine( + DatabaseTypeRegistry.getTrunkDatabaseTypeName(ProxyContext.getInstance().getMetaDataContexts().getMetaData(schemaName).getResource().getDatabaseType())); + return sqlStatementParserEngine.parse(sql, true); + } + + private PostgreSQLRowDescriptionPacket getRowDescriptionPacket(final QueryResponseHeader queryResponseHeader) { + responseType = ResponseType.QUERY; + Collection<PostgreSQLColumnDescription> columnDescriptions = createColumnDescriptions(queryResponseHeader); + return new PostgreSQLRowDescriptionPacket(columnDescriptions.size(), columnDescriptions); + } + + private Collection<PostgreSQLColumnDescription> createColumnDescriptions(final QueryResponseHeader queryResponseHeader) { + Collection<PostgreSQLColumnDescription> result = new LinkedList<>(); + int columnIndex = 0; + for (QueryHeader each : queryResponseHeader.getQueryHeaders()) { + result.add(new PostgreSQLColumnDescription(each.getColumnName(), ++columnIndex, each.getColumnType(), each.getColumnLength(), each.getColumnTypeName())); + } + return result; + } + + @Override + public boolean next() throws SQLException { + Iterator<DatabaseCommunicationEngine> iterator = databaseCommunicationEngines.iterator(); + while (iterator.hasNext()) { + if (iterator.next().next()) { + return true; + } else { + iterator.remove(); + } + } + return !batchBindComplete && (batchBindComplete = true); + } + + @Override + public PostgreSQLPacket getQueryRowPacket() throws SQLException { + if (batchBindComplete) { + String sqlCommand = connectionContext.getSqlStatement().map(SQLStatement::getClass).map(PostgreSQLCommand::valueOf).map(command -> command.map(Enum::name).orElse("")).orElse(""); + return new PostgreSQLCommandCompletePacket(sqlCommand, connectionContext.getUpdateCount()); + } + QueryResponseRow queryResponseRow = databaseCommunicationEngines.get(0).getQueryResponseRow(); + return new PostgreSQLDataRowPacket(getData(queryResponseRow)); + } + + private List<Object> getData(final QueryResponseRow queryResponseRow) { + Collection<QueryResponseCell> cells = queryResponseRow.getCells(); + List<Object> result = new ArrayList<>(cells.size()); + List<QueryResponseCell> columns = new ArrayList<>(cells); + for (int i = 0; i < columns.size(); i++) { + PostgreSQLColumnFormat format = packet.getResultFormatByColumnIndex(i); + result.add(PostgreSQLColumnFormat.BINARY == format ? createBinaryCell(columns.get(i)) : columns.get(i).getData()); + } + return result; + } + + private BinaryCell createBinaryCell(final QueryResponseCell cell) { + return new BinaryCell(PostgreSQLBinaryColumnType.valueOfJDBCType(((BinaryQueryResponseCell) cell).getJdbcType()), cell.getData()); + } +}
