keith-turner commented on a change in pull request #140: URL: https://github.com/apache/accumulo/pull/140#discussion_r432139013
########## File path: core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorFactory.java ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.file.rfile.bcfile.codec; + +import java.io.IOException; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; + +/** + * Compressor Factory is an interface to create compressors and decompressors based on the supplied + * algorithm. Extensions may allow for alternative factory methods, such as object pooling. + */ +public interface CompressorFactory { + + /** + * Provides the caller a compressor object. + * + * @param compressionAlgorithm + * compressor's algorithm. + * @return compressor. + * @throws IOException + * I/O Exception during factory implementation + */ + public Compressor getCompressor(Algorithm compressionAlgorithm) throws IOException; + + /** + * Method to release a compressor. This implementation will call end on the compressor. + * + * @param algorithm + * Supplied compressor's Algorithm. 
+ * @param compressor + * Compressor object + */ + public void releaseCompressor(Algorithm algorithm, Compressor compressor); + + /** + * Method to release the decompressor. This implementation will call end on the decompressor. + * + * @param algorithm + * Supplied decompressor's Algorithm. + * @param decompressor + * decompressor object. + */ + public void releaseDecompressor(Algorithm algorithm, Decompressor decompressor); + + /** + * Provides the caller a decompressor object. + * + * @param compressionAlgorithm + * decompressor's algorithm. + * @return decompressor. + */ + public Decompressor getDecompressor(Algorithm compressionAlgorithm); + + /** + * Implementations may choose to have a close call implemented. + */ + public void close(); + + /** + * Provides the capability to update the compression factory + * + * @param acuConf + * accumulo configuration + */ + public void update(final AccumuloConfiguration acuConf);

Review comment: AccumuloConfiguration is not a stable type for a plugin interface. It's constantly changing with internal refactoring. There are alternatives meant for plugins in the SPI package. If a pluggable component is desired, I would recommend moving the pluggable interfaces to the [spi](https://github.com/apache/accumulo/blob/master/core/src/main/java/org/apache/accumulo/core/spi/package-info.java) packages. Code in the spi package can only use SPI and API types, and this is analyzed at build time. Current pluggable types outside of the SPI package have fared very poorly over time in terms of stability; people accidentally break them without realizing it. Thinking of this reminded me to open #1617. A really nice internal change to clean up tech debt was made in #1519, but it broke the plugin interface mentioned in #1617. There are multiple other examples of this, where a plugin is accidentally broken and it's hard to know it even happens before release. With SPI this will be a simple check using git diff of the package.
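To illustrate the stable-type point, here is a rough sketch of what a plugin contract could look like if `update(AccumuloConfiguration)` were replaced by an `init` method that receives only JDK types (a `Map<String,String>` of plugin options). The names `CompressorFactorySpi`, `init`, and `ExampleCompressorFactory` are hypothetical and are not part of Accumulo's SPI; the compressor type is left generic and algorithms are identified by name so the sketch compiles without Hadoop. The `maxIdle` default of 25 mirrors the `tserver.compressor.pool.max.idle` default proposed in this PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical SPI-style contract: it references only JDK types, so internal
// refactoring of server classes (e.g. AccumuloConfiguration) cannot break it.
// C stands in for the compressor type (Hadoop's Compressor in the real code);
// algorithms are identified by name here instead of the Algorithm enum.
interface CompressorFactorySpi<C> {
  // Called once with only this plugin's options (property prefix already stripped).
  void init(Map<String, String> options);

  C getCompressor(String algorithmName);

  void releaseCompressor(String algorithmName, C compressor);
}

// Hypothetical implementation that reads one option, falling back to a default.
class ExampleCompressorFactory implements CompressorFactorySpi<Object> {
  private Map<String, String> options = Collections.emptyMap();

  @Override
  public void init(Map<String, String> options) {
    // Keep a read-only copy; the plugin never holds on to server-side config objects.
    this.options = Collections.unmodifiableMap(new HashMap<>(options));
  }

  int maxIdle() {
    return Integer.parseInt(options.getOrDefault("maxIdle", "25"));
  }

  @Override
  public Object getCompressor(String algorithmName) {
    return new Object(); // placeholder for a real codec compressor
  }

  @Override
  public void releaseCompressor(String algorithmName, Object compressor) {
    // A pooling implementation would return the compressor to its pool here.
  }
}
```

The design choice in this sketch is that the server, not the plugin, is responsible for turning its internal configuration into the options map, so internal config refactoring stays invisible to plugin authors.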
########## File path: core/src/main/java/org/apache/accumulo/core/conf/Property.java ########## @@ -516,7 +516,23 @@ TSERV_SUMMARY_RETRIEVAL_THREADS("tserver.summary.retrieval.threads", "10", PropertyType.COUNT, "The number of threads on each tablet server available to retrieve" + " summary data, that is not currently in cache, from RFiles."), - + TSERV_COMPRESSOR_FACTORY("tserver.compressor.factory.class", + "org.apache.accumulo.core.file.rfile.bcfile.codec.CompressorFactory", PropertyType.CLASSNAME, + "Tablet Server configuration for the compressor factory that will be used when requesting compressors."), + TSERV_COMPRESSOR_IN_BUFFER("tserver.compressor.factory.input.buffer.size", "1K", + PropertyType.MEMORY, + "Tablet Server configuration for the compressor factory that adjusts the input buffer size. Zero uses the full compression block size."), + TSERV_COMPRESSOR_OUT_BUFFER("tserver.compressor.factory.output.buffer.size", "1K", + PropertyType.MEMORY, + "Tablet Server configuration for the compressor factory that adjusts the output buffer size. Default uses the full compression block size."), + TSERV_COMPRESSOR_POOL_IDLE("tserver.compressor.pool.max.idle", "25", PropertyType.COUNT, + "Tablet Server configuration to contrain the maximum number of idle compressors within the pool"), + TSERV_COMPRESSOR_POOL_IDLE_SWEEP_TIME("tserver.compressor.pool.max.idle.sweep.time", "0ms", Review comment: I think these are options for a specific plugin. If someone configures another plugin class, then these options may not be relevant. 
Could possibly structure the props like:

```
tserver.compressor.factory.class=<class>
tserver.compressor.factory.opts.<key1>=<value1>
tserver.compressor.factory.opts.<key2>=<value2>
```

And the plugin would be given a map containing `<key1>=<value1>,<key2>=<value2>`.

########## File path: core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorPool.java ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.rfile.bcfile.codec; + +import java.io.IOException; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm; +import org.apache.commons.pool.impl.GenericKeyedObjectPool; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.log4j.Logger; + +import com.google.common.base.Preconditions; + +/** + * Compressor factory extension that enables object pooling using Commons Pool. The design will have a keyed compressor pool and decompressor pool. The key of + * which will be the Algorithm itself.
+ * + */ +public class CompressorPool extends CompressorFactory { + + private static final Logger LOG = Logger.getLogger(CompressorObjectFactory.class); + + /** + * Compressor pool. + */ + GenericKeyedObjectPool<Algorithm,Compressor> compressorPool; + + /** + * Decompressor pool + */ + GenericKeyedObjectPool<Algorithm,Decompressor> decompressorPool; + + public CompressorPool(AccumuloConfiguration acuConf) { + + super(acuConf); + + compressorPool = new GenericKeyedObjectPool<Algorithm,Compressor>(new CompressorObjectFactory()); + // ensure that the pool grows when needed + compressorPool.setWhenExhaustedAction(GenericKeyedObjectPool.WHEN_EXHAUSTED_GROW); + + decompressorPool = new GenericKeyedObjectPool<Algorithm,Decompressor>(new DecompressorObjectFactory()); + // ensure that the pool grows when needed. + decompressorPool.setWhenExhaustedAction(GenericKeyedObjectPool.WHEN_EXHAUSTED_GROW); + + // perform the initial update. + update(acuConf); + + } + + public void setMaxIdle(final int size) { + // check that we are changing the value. + // this will avoid synchronization within the pool + if (size != compressorPool.getMaxIdle()) + compressorPool.setMaxIdle(size); + if (size != decompressorPool.getMaxIdle()) + decompressorPool.setMaxIdle(size); + } + + @Override + public Compressor getCompressor(Algorithm compressionAlgorithm) throws IOException { + Preconditions.checkNotNull(compressionAlgorithm, "Algorithm cannot be null"); + try { + return compressorPool.borrowObject(compressionAlgorithm); + } catch (Exception e) { Review comment: In [this javadoc](https://commons.apache.org/proper/commons-pool/api-2.8.0/org/apache/commons/pool2/impl/GenericKeyedObjectPool.html#borrowObject-K-) I saw the following. 
```
IllegalStateException - after close has been called on this pool
Exception - when makeObject throws an exception
NoSuchElementException - when the pool is exhausted and cannot or will not return another instance
```

########## File path: core/src/main/java/org/apache/accumulo/core/conf/Property.java ########## @@ -516,7 +516,23 @@ TSERV_SUMMARY_RETRIEVAL_THREADS("tserver.summary.retrieval.threads", "10", PropertyType.COUNT, "The number of threads on each tablet server available to retrieve" + " summary data, that is not currently in cache, from RFiles."), - + TSERV_COMPRESSOR_FACTORY("tserver.compressor.factory.class", + "org.apache.accumulo.core.file.rfile.bcfile.codec.CompressorFactory", PropertyType.CLASSNAME,

Review comment: Was CompressorPool intended to be the default?

```suggestion
"org.apache.accumulo.core.file.rfile.bcfile.codec.CompressorPool", PropertyType.CLASSNAME,
```

########## File path: core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorPool.java ########## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.accumulo.core.file.rfile.bcfile.codec; + +import java.io.IOException; +import java.util.Objects; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm; +import org.apache.commons.pool2.impl.GenericKeyedObjectPool; Review comment: I see this is in an impl package of commons.pool2. I wonder if that implies it may not be stable. ########## File path: core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorPool.java ########## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.file.rfile.bcfile.codec; + +import java.io.IOException; +import java.util.Objects; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm; +import org.apache.commons.pool2.impl.GenericKeyedObjectPool; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Compressor factory extension that enables object pooling using Commons Pool. The design will have + * a keyed compressor pool and decompressor pool. The key of which will be the Algorithm itself. + * + */ +public class CompressorPool extends DefaultCompressorFactory { + + private static final Logger LOG = LoggerFactory.getLogger(CompressorPoolFactory.class); + + /** + * Compressor pool. + */ + GenericKeyedObjectPool<Algorithm,Compressor> compressorPool; + + /** + * Decompressor pool + */ + GenericKeyedObjectPool<Algorithm,Decompressor> decompressorPool; + + public CompressorPool(AccumuloConfiguration acuConf) { + + super(acuConf); + + compressorPool = new GenericKeyedObjectPool<Algorithm,Compressor>(new CompressorPoolFactory());

Review comment: I looked at the commons pool javadoc a bit and noticed it has a GenericKeyedObjectPoolConfig object. Not sure if this is workable, but I was wondering about making the config pass through, where a prop could be set in the Accumulo config for any setter on GenericKeyedObjectPoolConfig. For example, GenericKeyedObjectPoolConfig has a setMaxTotal method, so the Accumulo config could have something like:

```
tserver.compressor.factory.class=...CompressorPool
tserver.compressor.factory.opts.maxTotal=42
```

and here, when it sees maxTotal, it calls the setMaxTotal method on GenericKeyedObjectPoolConfig.
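A rough sketch of the pass-through idea, combined with the `opts.` prefix layout suggested earlier in this review: properties under `tserver.compressor.factory.opts.` are collected with the prefix stripped, and each key `foo` is mapped via reflection to a public one-argument `setFoo(...)` on a config bean. `PoolOptions` and `FakePoolConfig` are hypothetical names; `FakePoolConfig` is a stand-in for commons-pool2's `GenericKeyedObjectPoolConfig` so the sketch runs without that dependency, and only `int`, `long`, `boolean`, and `String` setters are handled.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not part of Accumulo or commons-pool2.
class PoolOptions {
  static final String PREFIX = "tserver.compressor.factory.opts.";

  // Keeps only keys under PREFIX and strips it: "...opts.maxTotal" -> "maxTotal".
  static Map<String, String> pluginOptions(Map<String, String> conf) {
    Map<String, String> opts = new HashMap<>();
    for (Map.Entry<String, String> e : conf.entrySet()) {
      if (e.getKey().startsWith(PREFIX)) {
        opts.put(e.getKey().substring(PREFIX.length()), e.getValue());
      }
    }
    return opts;
  }

  // For each option "foo", invokes a public one-argument setFoo(...) on the target.
  static void applySetters(Object target, Map<String, String> opts) {
    for (Map.Entry<String, String> e : opts.entrySet()) {
      String key = e.getKey();
      if (key.isEmpty()) {
        throw new IllegalArgumentException("Empty pool option name");
      }
      String name = "set" + Character.toUpperCase(key.charAt(0)) + key.substring(1);
      Method setter = null;
      for (Method m : target.getClass().getMethods()) {
        if (m.getName().equals(name) && m.getParameterCount() == 1
            && isSupported(m.getParameterTypes()[0])) {
          setter = m;
          break;
        }
      }
      if (setter == null) {
        // Fail fast on typos instead of silently ignoring the option.
        throw new IllegalArgumentException("Unknown pool option: " + key);
      }
      try {
        setter.invoke(target, convert(setter.getParameterTypes()[0], e.getValue()));
      } catch (ReflectiveOperationException ex) {
        throw new IllegalArgumentException("Cannot apply pool option: " + key, ex);
      }
    }
  }

  private static boolean isSupported(Class<?> t) {
    return t == int.class || t == long.class || t == boolean.class || t == String.class;
  }

  // Converts the string property value to the setter's parameter type.
  private static Object convert(Class<?> t, String v) {
    if (t == int.class) return Integer.parseInt(v.trim());
    if (t == long.class) return Long.parseLong(v.trim());
    if (t == boolean.class) return Boolean.parseBoolean(v.trim());
    return v; // String parameter
  }
}

// Stand-in with two setters shaped like GenericKeyedObjectPoolConfig's.
class FakePoolConfig {
  private int maxTotal = -1;
  private boolean blockWhenExhausted = true;

  public void setMaxTotal(int maxTotal) { this.maxTotal = maxTotal; }
  public int getMaxTotal() { return maxTotal; }
  public void setBlockWhenExhausted(boolean b) { this.blockWhenExhausted = b; }
  public boolean getBlockWhenExhausted() { return blockWhenExhausted; }
}
```

In the actual pool, the target would presumably be a `GenericKeyedObjectPoolConfig` instance handed to the `GenericKeyedObjectPool(factory, config)` constructor. Rejecting unknown keys is an assumed policy here; logging a warning and skipping the key would be the gentler alternative.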
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
