Author: tv Date: Sun Dec 9 14:59:45 2018 New Revision: 1848523 URL: http://svn.apache.org/viewvc?rev=1848523&view=rev Log: TORQUE-354: Add doSelectAsStream() to BasePeerImpl
Added: db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java (with props) Modified: db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/BasePeerImpl.java Modified: db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/BasePeerImpl.java URL: http://svn.apache.org/viewvc/db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/BasePeerImpl.java?rev=1848523&r1=1848522&r2=1848523&view=diff ============================================================================== --- db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/BasePeerImpl.java (original) +++ db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/BasePeerImpl.java Sun Dec 9 14:59:45 2018 @@ -32,8 +32,12 @@ import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.time.StopWatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.torque.Column; @@ -276,15 +280,14 @@ public class BasePeerImpl<T> implements preparedStatement, query.getPreparedStatementReplacements(), 0); - long startTime = System.currentTimeMillis(); + StopWatch stopWatch = new StopWatch(); log.debug("Executing delete " + sql + ", parameters = " + replacements); + stopWatch.start(); int affectedRows = preparedStatement.executeUpdate(); - long queryEndTime = System.currentTimeMillis(); - log.trace("delete took " + (queryEndTime - startTime) - + " milliseconds"); + log.trace("Delete took " + stopWatch.getTime() + " milliseconds"); return affectedRows; } @@ -466,14 +469,14 @@ public class BasePeerImpl<T> implements } position++; } - long startTime = System.currentTimeMillis(); + + StopWatch stopWatch = new StopWatch(); log.debug("Executing insert " + query.toString() + " using parameters " + replacementObjects); + stopWatch.start(); preparedStatement.executeUpdate(); - long queryEndTime = System.currentTimeMillis(); - log.trace("insert took " + (queryEndTime - startTime) - + " milliseconds"); + log.trace("Insert took " + stopWatch.getTime() + " milliseconds"); } catch (SQLException e) { @@ -676,14 +679,14 @@ public class BasePeerImpl<T> implements preparedStatement, selectQuery.getPreparedStatementReplacements(), 0); - long startTime = System.currentTimeMillis(); + + StopWatch stopWatch = new StopWatch(); log.debug("Executing insert " + query.toString() + " using parameters " + replacements); + stopWatch.start(); numberOfInsertedRows = preparedStatement.executeUpdate(); - long queryEndTime = System.currentTimeMillis(); - log.trace("insert took " + (queryEndTime - startTime) - + " milliseconds"); + log.trace("Insert took " + stopWatch.getTime() + " milliseconds"); } catch (SQLException e) { @@ -1003,39 +1006,56 @@ public class BasePeerImpl<T> implements final Connection connection) throws TorqueException { + try (Stream<TT> resultStream = doSelectAsStream(query, mapper, connection)) + { + return resultStream.collect(Collectors.toList()); + } + } + + /** + * Selects rows from a database an maps them to objects. + * This method returns a stream that <b>must</b> be closed after use. + * All resources used by this method will be closed when the stream is + * closed. + * + * @param query the SQL Query to execute, not null. + * @param mapper The mapper creating the objects from the resultSet, + * not null. + * @param connection the database connection, not null. + * + * @return The results of the query, not null. + * + * @throws TorqueException if querying the database fails. + */ + public <TT> Stream<TT> doSelectAsStream( + final String query, + final RecordMapper<TT> mapper, + final Connection connection) + throws TorqueException + { if (connection == null) { throw new NullPointerException("connection is null"); } - List<TT> result = new ArrayList<>(); - try (Statement statement = connection.createStatement()) + try { - long startTime = System.currentTimeMillis(); + Statement statement = connection.createStatement(); + StopWatch stopWatch = new StopWatch(); log.debug("Executing query " + query); - try (ResultSet resultSet = statement.executeQuery(query.toString())) - { - long queryEndTime = System.currentTimeMillis(); - log.trace("query took " + (queryEndTime - startTime) - + " milliseconds"); + stopWatch.start(); + ResultSet resultSet = statement.executeQuery(query.toString()); + ResultsetSpliterator<TT> spliterator = + new ResultsetSpliterator<>(mapper, null, statement, resultSet); + log.trace("Query took " + stopWatch.getTime() + " milliseconds"); - while (resultSet.next()) - { - TT rowResult = mapper.processRow(resultSet, 0, null); - result.add(rowResult); - } - long mappingEndTime = System.currentTimeMillis(); - log.trace("mapping took " + (mappingEndTime - queryEndTime) - + " milliseconds"); - } + return StreamSupport.stream(spliterator, false).onClose(spliterator); } catch (SQLException e) { throw ExceptionMapper.getInstance().toTorqueException(e); } - - return result; } /** @@ -1057,6 +1077,48 @@ public class BasePeerImpl<T> implements final Connection connection) throws TorqueException { + try (Stream<TT> resultStream = doSelectAsStream(criteria, mapper, connection)) + { + List<TT> result = resultStream.collect(Collectors.toList()); + + if (criteria.isSingleRecord() && result.size() > 1) + { + throw new TooManyRowsException( + "Criteria expected single Record and " + + "Multiple Records were selected"); + } + + return result; + } + } + + /** + * Performs a SQL <code>select</code> using a PreparedStatement. + * This method returns a stream that <b>must</b> be closed after use. + * All resources used by this method will be closed when the stream is + * closed. + * + * @param criteria A Criteria specifying the records to select, not null. + * @param mapper The mapper creating the objects from the resultSet, + * not null. + * @param connection the database connection for selecting records, + * not null. + * + * @return The results of the query as a Stream, not null. + * + * @throws TorqueException Error performing database query. + */ + public <TT> Stream<TT> doSelectAsStream( + final Criteria criteria, + final RecordMapper<TT> mapper, + final Connection connection) + throws TorqueException + { + if (connection == null) + { + throw new NullPointerException("connection is null"); + } + correctBooleans(criteria); Query query = SqlBuilder.buildQuery(criteria); @@ -1068,8 +1130,9 @@ public class BasePeerImpl<T> implements query.getFromClause().add(new FromElement(tableName)); } - try (PreparedStatement statement = connection.prepareStatement(query.toString())) + try { + PreparedStatement statement = connection.prepareStatement(query.toString()); if (query.getFetchSize() != null) { statement.setFetchSize(query.getFetchSize()); @@ -1080,85 +1143,18 @@ public class BasePeerImpl<T> implements query.getPreparedStatementReplacements(), 0); - // Set offset and limit - long offset; - Database database = Torque.getDatabase(criteria.getDbName()); - if (database.getAdapter().supportsNativeOffset()) - { - offset = 0; //database takes care of offset - } - else - { - offset = criteria.getOffset(); - } - - long limit; - if (database.getAdapter().supportsNativeLimit()) - { - limit = -1; //database takes care of offset - } - else - { - if (database.getAdapter().supportsNativeOffset()) - { - limit = criteria.getLimit(); - } - else - { - if (criteria.getLimit() == -1) - { - limit = criteria.getLimit(); - } - else - { - limit = offset + criteria.getLimit(); - } - } - } - - long startTime = System.currentTimeMillis(); + StopWatch stopWatch = new StopWatch(); log.debug("Executing query " + query + ", parameters = " + replacements); - try (ResultSet resultSet = statement.executeQuery()) - { - long queryEndTime = System.currentTimeMillis(); - log.trace("query took " + (queryEndTime - startTime) - + " milliseconds"); - - List<TT> result = new ArrayList<>(); - int rowNumber = 0; - while (resultSet.next()) - { - if (rowNumber < offset) - { - rowNumber++; - continue; - } - if (limit >= 0 && rowNumber >= limit) - { - break; - } - - TT rowResult = mapper.processRow(resultSet, 0, criteria); - result.add(rowResult); + stopWatch.start(); + ResultSet resultSet = statement.executeQuery(); + ResultsetSpliterator<TT> spliterator = + new ResultsetSpliterator<>(mapper, criteria, statement, resultSet); + log.trace("Query took " + stopWatch.getTime() + " milliseconds"); - rowNumber++; - } - long mappingEndTime = System.currentTimeMillis(); - log.trace("mapping took " + (mappingEndTime - queryEndTime) - + " milliseconds"); - - if (criteria.isSingleRecord() && result.size() > 1) - { - throw new TooManyRowsException( - "Criteria expected single Record and " - + "Multiple Records were selected"); - } - - return result; - } + return StreamSupport.stream(spliterator, false).onClose(spliterator); } catch (SQLException e) { @@ -1410,16 +1406,15 @@ public class BasePeerImpl<T> implements preparedStatement, query.getPreparedStatementReplacements(), position - 1); - long startTime = System.currentTimeMillis(); + StopWatch stopWatch = new StopWatch(); log.debug("Executing update " + query.toString() + " using update parameters " + replacementObjects + " and query parameters " + replacements); + stopWatch.start(); int affectedRows = preparedStatement.executeUpdate(); - long queryEndTime = System.currentTimeMillis(); - log.trace("update took " + (queryEndTime - startTime) - + " milliseconds"); + log.trace("Update took " + stopWatch.getTime() + " milliseconds"); return affectedRows; } Added: db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java URL: http://svn.apache.org/viewvc/db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java?rev=1848523&view=auto ============================================================================== --- db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java (added) +++ db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java Sun Dec 9 14:59:45 2018 @@ -0,0 +1,159 @@ +package org.apache.torque.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Spliterator; +import java.util.Spliterators.AbstractSpliterator; +import java.util.function.Consumer; + +import org.apache.torque.Database; +import org.apache.torque.Torque; +import org.apache.torque.TorqueException; +import org.apache.torque.TorqueRuntimeException; +import org.apache.torque.criteria.Criteria; +import org.apache.torque.om.mapper.RecordMapper; + +/** + * Stream support: Encapsulate iteration over a JDBC ResultSet + * + * @author <a href="mailto:t...@apache.org">Thomas Vandahl</a> + */ +public class ResultsetSpliterator<T> extends AbstractSpliterator<T> implements Runnable +{ + private final RecordMapper<T> recordMapper; + private final Criteria criteria; + private final Statement statement; + private final ResultSet resultSet; + + private long offset; + private long limit; + private long rowNumber; + + /** + * Constructor + * + * @param recordMapper a RecordMapper to map ResultSet rows to entities of type + * <T> + * @param criteria a Criteria + * @param statement the statement that created the ResultSet + * @param resultSet the JDBC result set + * @throws TorqueException + */ + public ResultsetSpliterator(RecordMapper<T> recordMapper, Criteria criteria, + Statement statement, ResultSet resultSet) throws TorqueException + { + super(Long.MAX_VALUE, Spliterator.ORDERED); + + this.recordMapper = recordMapper; + this.criteria = criteria; + this.statement = statement; + this.resultSet = resultSet; + this.offset = 0; //database takes care of offset + this.limit = -1; //database takes care of limit + this.rowNumber = 0; + + // Set offset and limit + if (criteria != null) + { + Database database = Torque.getDatabase(criteria.getDbName()); + if (!database.getAdapter().supportsNativeOffset()) + { + offset = criteria.getOffset(); + } + + if (!database.getAdapter().supportsNativeLimit()) + { + if (database.getAdapter().supportsNativeOffset()) + { + limit = criteria.getLimit(); + } + else if (criteria.getLimit() != -1) + { + limit = offset + criteria.getLimit(); + } + } + } + } + + /** + * Advance ResultSet and map row to entity <T> + * + * @see java.util.Spliterator#tryAdvance(java.util.function.Consumer) + */ + @Override + public boolean tryAdvance(Consumer<? super T> action) + { + try + { + while (resultSet.next()) + { + if (rowNumber < offset) + { + rowNumber++; + continue; + } + if (limit >= 0 && rowNumber >= limit) + { + return false; + } + + rowNumber++; + T result = recordMapper.processRow(resultSet, 0, criteria); + action.accept(result); + return true; + } + } + catch (SQLException e) + { + throw new TorqueRuntimeException(e); + } + + return false; + } + + /** + * Method to be run onClose() of associated stream + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() + { + try + { + resultSet.close(); + } + catch (SQLException e) + { + throw new TorqueRuntimeException(e); + } + try + { + statement.close(); + } + catch (SQLException e) + { + throw new TorqueRuntimeException(e); + } + } +} Propchange: db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java ------------------------------------------------------------------------------ svn:mime-type = text/plain --------------------------------------------------------------------- To unsubscribe, e-mail: torque-dev-unsubscr...@db.apache.org For additional commands, e-mail: torque-dev-h...@db.apache.org