keith-turner commented on a change in pull request #140:
URL: https://github.com/apache/accumulo/pull/140#discussion_r432139013



##########
File path: 
core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorFactory.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile.codec;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * Compressor Factory is an interface to create compressors and decompressors 
based on the supplied
+ * algorithm. Extensions may allow for alternative factory methods, such as 
object pooling.
+ */
+public interface CompressorFactory {
+
+  /**
+   * Provides the caller a compressor object.
+   *
+   * @param compressionAlgorithm
+   *          compressor's algorithm.
+   * @return compressor.
+   * @throws IOException
+   *           I/O Exception during factory implementation
+   */
+  public Compressor getCompressor(Algorithm compressionAlgorithm) throws 
IOException;
+
+  /**
+   * Method to release a compressor. This implementation will call end on the 
compressor.
+   *
+   * @param algorithm
+   *          Supplied compressor's Algorithm.
+   * @param compressor
+   *          Compressor object
+   */
+  public void releaseCompressor(Algorithm algorithm, Compressor compressor);
+
+  /**
+   * Method to release the decompressor. This implementation will call end on 
the decompressor.
+   *
+   * @param algorithm
+   *          Supplied decompressor's Algorithm.
+   * @param decompressor
+   *          decompressor object.
+   */
+  public void releaseDecompressor(Algorithm algorithm, Decompressor 
decompressor);
+
+  /**
+   * Provides the caller a decompressor object.
+   *
+   * @param compressionAlgorithm
+   *          decompressor's algorithm.
+   * @return decompressor.
+   */
+  public Decompressor getDecompressor(Algorithm compressionAlgorithm);
+
+  /**
+   * Implementations may choose to have a close call implemented.
+   */
+  public void close();
+
+  /**
+   * Provides the capability to update the compression factory
+   *
+   * @param acuConf
+   *          accumulo configuration
+   */
+  public void update(final AccumuloConfiguration acuConf);

Review comment:
       AccumuloConfiguration is not a stable type for a plugin interface.  It's 
constantly changing w/ internal refactoring. There are alternatives meant for 
plugins in the SPI package.
   
   If a pluggable component is desired, I would recommend moving the pluggable 
interfaces to the 
[spi](https://github.com/apache/accumulo/blob/master/core/src/main/java/org/apache/accumulo/core/spi/package-info.java)
 packages.  Code in the spi package can only use SPI and API types, and this 
is analyzed at build time.  Current pluggable types outside of the SPI package 
have fared very poorly over time in terms of stability; people accidentally 
break them w/o realizing it.  Thinking of this reminded me to open #1617.  A 
really nice internal change to clean up tech debt was made in #1519, but this 
broke the plugin interface mentioned in #1617.  There are multiple other 
examples of this where a plugin is accidentally broken and it's hard to know it 
even happened before release.  With SPI this will be a simple check using git 
diff of the package.
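
As a rough illustration of the point above, here is a minimal sketch of a plugin interface that depends only on JDK types plus one small environment type, rather than on AccumuloConfiguration. All names in it (`SpiSketch`, `FactoryEnvironment`, `CompressorFactorySketch`, `PooledFactory`) are hypothetical and are not existing Accumulo SPI types; the real SPI package may offer something different.

```java
import java.util.Map;

// Hypothetical sketch: none of these types exist in Accumulo.
public class SpiSketch {

  /** Stand-in for a stable, SPI-style view of configuration handed to a plugin. */
  public interface FactoryEnvironment {
    Map<String,String> getOptions();
  }

  /** Plugin interface that references only JDK types and the stand-in above. */
  public interface CompressorFactorySketch {
    void init(FactoryEnvironment env);

    String describe();
  }

  /** Example plugin that reads one option and falls back to a default. */
  public static class PooledFactory implements CompressorFactorySketch {
    private int maxIdle = 25;

    @Override
    public void init(FactoryEnvironment env) {
      String value = env.getOptions().get("maxIdle");
      if (value != null) {
        maxIdle = Integer.parseInt(value);
      }
    }

    @Override
    public String describe() {
      return "pooled(maxIdle=" + maxIdle + ")";
    }
  }

  public static void main(String[] args) {
    CompressorFactorySketch factory = new PooledFactory();
    factory.init(() -> Map.of("maxIdle", "10"));
    System.out.println(factory.describe());
  }
}
```

Because the plugin never sees an internal configuration class, refactoring that class cannot break the plugin's compile-time contract.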
   
    

##########
File path: core/src/main/java/org/apache/accumulo/core/conf/Property.java
##########
@@ -516,7 +516,23 @@
   TSERV_SUMMARY_RETRIEVAL_THREADS("tserver.summary.retrieval.threads", "10", 
PropertyType.COUNT,
       "The number of threads on each tablet server available to retrieve"
           + " summary data, that is not currently in cache, from RFiles."),
-
+  TSERV_COMPRESSOR_FACTORY("tserver.compressor.factory.class",
+      "org.apache.accumulo.core.file.rfile.bcfile.codec.CompressorFactory", 
PropertyType.CLASSNAME,
+      "Tablet Server configuration for the compressor factory that will be 
used when requesting compressors."),
+  TSERV_COMPRESSOR_IN_BUFFER("tserver.compressor.factory.input.buffer.size", 
"1K",
+      PropertyType.MEMORY,
+      "Tablet Server configuration for the compressor factory that adjusts the 
input buffer size. Zero uses the full compression block size."),
+  TSERV_COMPRESSOR_OUT_BUFFER("tserver.compressor.factory.output.buffer.size", 
"1K",
+      PropertyType.MEMORY,
+      "Tablet Server configuration for the compressor factory that adjusts the 
output buffer size. Default uses the full compression block size."),
+  TSERV_COMPRESSOR_POOL_IDLE("tserver.compressor.pool.max.idle", "25", 
PropertyType.COUNT,
+      "Tablet Server configuration to contrain the maximum number of idle 
compressors within the pool"),
+  TSERV_COMPRESSOR_POOL_IDLE_SWEEP_TIME("tserver.compressor.pool.max.idle.sweep.time", "0ms",

Review comment:
       I think these are options for a specific plugin.  If someone configures 
another plugin class, then these options may not be relevant.  The props could 
possibly be structured like:
   
   ```
   tserver.compressor.factory.class=<class>
   tserver.compressor.factory.opts.<key1>=<value1>
   tserver.compressor.factory.opts.<key2>=<value2>
   ```
   
   And the plugin would be given a map containing 
`<key1>=<value1>,<key2>=<value2>`
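
A minimal sketch of how the option map could be built, assuming the configuration is available as a plain `Map<String,String>`. The class name `FactoryOptsSketch` and the method `extractOpts` are hypothetical; only the `tserver.compressor.factory.opts.` prefix comes from the layout suggested above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not part of Accumulo.
public class FactoryOptsSketch {

  /** Prefix taken from the property layout suggested in the comment. */
  public static final String OPTS_PREFIX = "tserver.compressor.factory.opts.";

  /** Returns the entries under OPTS_PREFIX, with the prefix stripped from each key. */
  public static Map<String,String> extractOpts(Map<String,String> allProps) {
    Map<String,String> opts = new HashMap<>();
    for (Map.Entry<String,String> entry : allProps.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith(OPTS_PREFIX)) {
        opts.put(key.substring(OPTS_PREFIX.length()), entry.getValue());
      }
    }
    return opts;
  }

  public static void main(String[] args) {
    Map<String,String> props = new HashMap<>();
    props.put("tserver.compressor.factory.class", "com.example.MyFactory");
    props.put(OPTS_PREFIX + "key1", "value1");
    props.put(OPTS_PREFIX + "key2", "value2");
    // Only key1 and key2 are passed on to the plugin.
    System.out.println(extractOpts(props));
  }
}
```

With this shape, each plugin class interprets its own keys, and options for one plugin are simply ignored by another.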

##########
File path: 
core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorPool.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile.codec;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Compressor factory extension that enables object pooling using Commons 
Pool. The design will have a keyed compressor pool and decompressor pool. The 
key of
+ * which will be the Algorithm itself.
+ *
+ */
+public class CompressorPool extends CompressorFactory {
+
+  private static final Logger LOG = 
Logger.getLogger(CompressorObjectFactory.class);
+
+  /**
+   * Compressor pool.
+   */
+  GenericKeyedObjectPool<Algorithm,Compressor> compressorPool;
+
+  /**
+   * Decompressor pool
+   */
+  GenericKeyedObjectPool<Algorithm,Decompressor> decompressorPool;
+
+  public CompressorPool(AccumuloConfiguration acuConf) {
+
+    super(acuConf);
+
+    compressorPool = new GenericKeyedObjectPool<Algorithm,Compressor>(new 
CompressorObjectFactory());
+    // ensure that the pool grows when needed
+    compressorPool.setWhenExhaustedAction(GenericKeyedObjectPool.WHEN_EXHAUSTED_GROW);
+
+    decompressorPool = new GenericKeyedObjectPool<Algorithm,Decompressor>(new 
DecompressorObjectFactory());
+    // ensure that the pool grows when needed.
+    decompressorPool.setWhenExhaustedAction(GenericKeyedObjectPool.WHEN_EXHAUSTED_GROW);
+
+    // perform the initial update.
+    update(acuConf);
+
+  }
+
+  public void setMaxIdle(final int size) {
+    // check that we are changing the value.
+    // this will avoid synchronization within the pool
+    if (size != compressorPool.getMaxIdle())
+      compressorPool.setMaxIdle(size);
+    if (size != decompressorPool.getMaxIdle())
+      decompressorPool.setMaxIdle(size);
+  }
+
+  @Override
+  public Compressor getCompressor(Algorithm compressionAlgorithm) throws 
IOException {
+    Preconditions.checkNotNull(compressionAlgorithm, "Algorithm cannot be 
null");
+    try {
+      return compressorPool.borrowObject(compressionAlgorithm);
+    } catch (Exception e) {

Review comment:
       In [this 
javadoc](https://commons.apache.org/proper/commons-pool/api-2.8.0/org/apache/commons/pool2/impl/GenericKeyedObjectPool.html#borrowObject-K-)
 I saw the following.
   
   ```
   IllegalStateException - after close has been called on this pool
   Exception - when makeObject throws an exception
   NoSuchElementException - when the pool is exhausted and cannot or will not 
return another instance
   ```
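
One possible reading of the list above is that the three cases could be handled separately instead of in a single `catch (Exception e)`. The sketch below shows that idea under stated assumptions: `Borrower` is a hypothetical stand-in for the pool's borrow call (so the example compiles without commons-pool), and mapping each case to an `IOException` with a distinct message is an illustrative choice, not what the PR does.

```java
import java.io.IOException;
import java.util.NoSuchElementException;

// Hypothetical sketch; Borrower stands in for a pool's borrow call.
public class BorrowSketch {

  /** Mirrors a borrow method that is declared to throw Exception. */
  public interface Borrower<T> {
    T borrow() throws Exception;
  }

  /** Translates each documented failure mode into an IOException with its own message. */
  public static <T> T borrowOrIOException(Borrower<T> pool) throws IOException {
    try {
      return pool.borrow();
    } catch (IllegalStateException e) {
      // Documented case: close has already been called on the pool.
      throw new IOException("Pool was already closed", e);
    } catch (NoSuchElementException e) {
      // Documented case: pool is exhausted and will not return another instance.
      throw new IOException("Pool exhausted", e);
    } catch (Exception e) {
      // Documented case: object creation failed.
      throw new IOException("Failed to create pooled object", e);
    }
  }

  public static void main(String[] args) throws IOException {
    String value = BorrowSketch.<String>borrowOrIOException(() -> "compressor");
    System.out.println(value);
  }
}
```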
   

##########
File path: core/src/main/java/org/apache/accumulo/core/conf/Property.java
##########
@@ -516,7 +516,23 @@
   TSERV_SUMMARY_RETRIEVAL_THREADS("tserver.summary.retrieval.threads", "10", 
PropertyType.COUNT,
       "The number of threads on each tablet server available to retrieve"
           + " summary data, that is not currently in cache, from RFiles."),
-
+  TSERV_COMPRESSOR_FACTORY("tserver.compressor.factory.class",
+      "org.apache.accumulo.core.file.rfile.bcfile.codec.CompressorFactory", 
PropertyType.CLASSNAME,

Review comment:
       Was CompressorPool intended to be the default?
   
   ```suggestion
         "org.apache.accumulo.core.file.rfile.bcfile.codec.CompressorPool", 
PropertyType.CLASSNAME,
   ```

##########
File path: 
core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorPool.java
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile.codec;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;

Review comment:
       I see this is in an impl package of commons.pool2. I wonder if that 
implies it may not be stable.

##########
File path: 
core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/codec/CompressorPool.java
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile.codec;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compressor factory extension that enables object pooling using Commons 
Pool. The design will have
+ * a keyed compressor pool and decompressor pool. The key of which will be the 
Algorithm itself.
+ *
+ */
+public class CompressorPool extends DefaultCompressorFactory {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompressorPoolFactory.class);
+
+  /**
+   * Compressor pool.
+   */
+  GenericKeyedObjectPool<Algorithm,Compressor> compressorPool;
+
+  /**
+   * Decompressor pool
+   */
+  GenericKeyedObjectPool<Algorithm,Decompressor> decompressorPool;
+
+  public CompressorPool(AccumuloConfiguration acuConf) {
+
+    super(acuConf);
+
+    compressorPool = new GenericKeyedObjectPool<Algorithm,Compressor>(new 
CompressorPoolFactory());

Review comment:
       I looked at the commons pool javadoc a bit and noticed it has a 
GenericKeyedObjectPoolConfig object.  Not sure if this is workable, but I was 
wondering about making the config pass through, so that a prop could be set in 
the Accumulo config for any setter on GenericKeyedObjectPoolConfig.  For 
example, GenericKeyedObjectPoolConfig has a setMaxTotal method, so the Accumulo 
config could have something like:
   
   ```
   tserver.compressor.factory.class=...CompressorPool
   tserver.compressor.factory.opts.maxTotal=42
   ```
   
   Then, when this code sees maxTotal, it would call the setMaxTotal method on 
GenericKeyedObjectPoolConfig.
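
A minimal sketch of such a pass-through, assuming reflection is acceptable here. `PoolConfigStandIn` is a hypothetical stand-in bean used so the example is self-contained; it is not the real GenericKeyedObjectPoolConfig. The helper `apply` and its supported parameter types (int, long, boolean, String) are also assumptions.

```java
import java.lang.reflect.Method;
import java.util.Map;

// Hypothetical sketch of mapping option keys to bean setters.
public class SetterPassThroughSketch {

  /** Stand-in config bean; not the real commons-pool class. */
  public static class PoolConfigStandIn {
    private int maxTotal = -1;

    public void setMaxTotal(int maxTotal) {
      this.maxTotal = maxTotal;
    }

    public int getMaxTotal() {
      return maxTotal;
    }
  }

  /** For each option "name", finds a one-argument setName method and invokes it. */
  public static void apply(Object target, Map<String,String> opts) {
    for (Map.Entry<String,String> entry : opts.entrySet()) {
      String key = entry.getKey();
      String setter = "set" + Character.toUpperCase(key.charAt(0)) + key.substring(1);
      boolean applied = false;
      for (Method method : target.getClass().getMethods()) {
        if (!method.getName().equals(setter) || method.getParameterCount() != 1) {
          continue;
        }
        Class<?> type = method.getParameterTypes()[0];
        Object arg;
        if (type == int.class) {
          arg = Integer.parseInt(entry.getValue());
        } else if (type == long.class) {
          arg = Long.parseLong(entry.getValue());
        } else if (type == boolean.class) {
          arg = Boolean.parseBoolean(entry.getValue());
        } else if (type == String.class) {
          arg = entry.getValue();
        } else {
          continue; // unsupported parameter type in this sketch
        }
        try {
          method.invoke(target, arg);
        } catch (ReflectiveOperationException e) {
          throw new IllegalArgumentException("Could not apply option: " + key, e);
        }
        applied = true;
        break;
      }
      if (!applied) {
        throw new IllegalArgumentException("No supported setter for option: " + key);
      }
    }
  }

  public static void main(String[] args) {
    PoolConfigStandIn config = new PoolConfigStandIn();
    apply(config, Map.of("maxTotal", "42"));
    System.out.println(config.getMaxTotal());
  }
}
```

Rejecting unknown keys, as done here, is one design choice; silently ignoring them would be more lenient but would hide typos in property names.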



