This is an automated email from the ASF dual-hosted git repository.

leet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e9cf70  METRON-2005 Batch Writer writes 0-byte files to HDFS on 
rotation (justinleet) closes apache/metron#1338
5e9cf70 is described below

commit 5e9cf705ef5feb1d723f584c1b6134bcc1eda9cf
Author: justinleet <justinjl...@gmail.com>
AuthorDate: Tue Feb 19 14:09:02 2019 -0500

    METRON-2005 Batch Writer writes 0-byte files to HDFS on rotation 
(justinleet) closes apache/metron#1338
---
 metron-platform/metron-writer/README.md            |  3 +
 metron-platform/metron-writer/pom.xml              |  5 ++
 .../writer/hdfs/ClonedSyncPolicyCreator.java       |  3 +
 .../org/apache/metron/writer/hdfs/HdfsWriter.java  | 74 +++++++++++++------
 .../apache/metron/writer/hdfs/SourceHandler.java   | 25 +++++--
 .../metron/writer/hdfs/SourceHandlerCallback.java  | 23 +++++-
 .../metron/writer/hdfs/SourceHandlerKey.java       |  8 ++
 .../apache/metron/writer/hdfs/HdfsWriterTest.java  | 38 ----------
 .../metron/writer/hdfs/SourceHandlerTest.java      | 86 ++++++++++++++++++++++
 9 files changed, 194 insertions(+), 71 deletions(-)

diff --git a/metron-platform/metron-writer/README.md 
b/metron-platform/metron-writer/README.md
index bbec39b..ed4f053 100644
--- a/metron-platform/metron-writer/README.md
+++ b/metron-platform/metron-writer/README.md
@@ -65,6 +65,9 @@ To manage the output path, a base path argument is provided 
by the Flux file, wi
 This means that all output will land in `/apps/metron/`.  With no further 
adjustment, it will be `/apps/metron/<sensor>/`.
 However, by modifying the sensor's JSON config, it is possible to provide 
additional pathing based on the the message itself.
 
+The output format of a file will be 
`{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}`. Notably, 
because of the way
+file rotations are handled by the HdfsWriter, `rotationNum` will always be 0, 
but RotationActions still get executed normally.
+
 E.g.
 ```
 {
diff --git a/metron-platform/metron-writer/pom.xml 
b/metron-platform/metron-writer/pom.xml
index 0002f7a..a11ce6e 100644
--- a/metron-platform/metron-writer/pom.xml
+++ b/metron-platform/metron-writer/pom.xml
@@ -213,6 +213,11 @@
             <version>${project.parent.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>stellar-common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java
index 4d32fc9..1f908a9 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java
@@ -37,6 +37,9 @@ public class ClonedSyncPolicyCreator implements 
SyncPolicyCreator {
       // SyncPolicy object does not implement Cloneable, so we'll need to 
clone it via serialization
       //to get a fresh policy object.  Note: this would be expensive if it was 
in the critical path,
       // but should be called infrequently (once per sync).
+
+      // Reset the SyncPolicy to ensure that the new count properly resets.
+      syncPolicy.reset();
       byte[] serializedForm = SerDeUtils.toBytes(syncPolicy);
       return SerDeUtils.fromBytes(serializedForm, SyncPolicy.class);
     }
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
index 1ba9a6b..aaa58fa 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -17,29 +17,37 @@
  */
 package org.apache.metron.writer.hdfs;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.stellar.common.StellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.MapVariableResolver;
 import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.apache.metron.stellar.dsl.VariableResolver;
-import org.apache.metron.stellar.common.StellarProcessor;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.writer.BulkMessageWriter;
-import org.apache.metron.common.writer.BulkWriterResponse;
 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
 import org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy;
 import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
-
-import java.io.*;
-import java.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable 
{
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   List<RotationAction> rotationActions = new ArrayList<>();
   FileRotationPolicy rotationPolicy = new NoRotationPolicy();
   SyncPolicy syncPolicy;
@@ -82,38 +90,48 @@ public class HdfsWriter implements 
BulkMessageWriter<JSONObject>, Serializable {
     this.fileNameFormat.prepare(stormConfig,topologyContext);
     if(syncPolicy != null) {
       //if the user has specified the sync policy, we don't want to override 
their wishes.
+      LOG.debug("Using user specified sync policy {}", 
syncPolicy.getClass().getSimpleName());
       syncPolicyCreator = new ClonedSyncPolicyCreator(syncPolicy);
     }
     else {
       //if the user has not, then we want to have the sync policy depend on 
the batch size.
+      LOG.debug("No user specified sync policy, using CountSyncPolicy based on 
batch size");
       syncPolicyCreator = (source, config) -> new CountSyncPolicy(config == 
null?1:config.getBatchSize(source));
     }
   }
 
-
   @Override
   public BulkWriterResponse write(String sourceType
                    , WriterConfiguration configurations
                    , Iterable<Tuple> tuples
                    , List<JSONObject> messages
-                   ) throws Exception
-  {
+                   ) throws Exception {
     BulkWriterResponse response = new BulkWriterResponse();
 
     // Currently treating all the messages in a group for pass/failure.
-    try {
-      // Messages can all result in different HDFS paths, because of Stellar 
Expressions, so we'll need to iterate through
-      for(JSONObject message : messages) {
-        String path = getHdfsPathExtension(
-                sourceType,
-                
(String)configurations.getSensorConfig(sourceType).getOrDefault(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF,
 ""),
-                message
-        );
+    // Messages can all result in different HDFS paths, because of Stellar 
Expressions, so we'll need to iterate through
+    for (JSONObject message : messages) {
+      String path = getHdfsPathExtension(
+          sourceType,
+          (String) configurations.getSensorConfig(sourceType)
+              .getOrDefault(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF, 
""),
+          message
+      );
+
+      try {
+        LOG.trace("Writing message {} to path: {}", message.toJSONString(), 
path);
         SourceHandler handler = getSourceHandler(sourceType, path, 
configurations);
         handler.handle(message, sourceType, configurations, syncPolicyCreator);
+      } catch (Exception e) {
+        LOG.error(
+            "HdfsWriter encountered error writing. Source type: {}. # 
messages: {}. Output path: {}.",
+            sourceType,
+            messages.size(),
+            path,
+            e
+        );
+        response.addAllErrors(e, tuples);
       }
-    } catch (Exception e) {
-      response.addAllErrors(e, tuples);
     }
 
     response.addAllSuccesses(tuples);
@@ -123,6 +141,7 @@ public class HdfsWriter implements 
BulkMessageWriter<JSONObject>, Serializable {
   public String getHdfsPathExtension(String sourceType, String 
stellarFunction, JSONObject message) {
     // If no function is provided, just use the sourceType directly
     if(stellarFunction == null || stellarFunction.trim().isEmpty()) {
+      LOG.debug("No HDFS path extension provided; using source type {} 
directly", sourceType);
       return sourceType;
     }
 
@@ -130,7 +149,9 @@ public class HdfsWriter implements 
BulkMessageWriter<JSONObject>, Serializable {
     VariableResolver resolver = new MapVariableResolver(message);
     Object objResult = stellarProcessor.parse(stellarFunction, resolver, 
StellarFunctions.FUNCTION_RESOLVER(), Context.EMPTY_CONTEXT());
     if(objResult != null && !(objResult instanceof String)) {
-      throw new IllegalArgumentException("Stellar Function <" + 
stellarFunction + "> did not return a String value. Returned: " + objResult);
+      String errorMsg = "Stellar Function <" + stellarFunction + "> did not 
return a String value. Returned: " + objResult;
+      LOG.error(errorMsg);
+      throw new IllegalArgumentException(errorMsg);
     }
     return objResult == null ? "" : (String)objResult;
   }
@@ -143,6 +164,7 @@ public class HdfsWriter implements 
BulkMessageWriter<JSONObject>, Serializable {
   @Override
   public void close() {
     for(SourceHandler handler : sourceHandlerMap.values()) {
+      LOG.debug("Closing SourceHandler {}", handler.toString());
       handler.close();
     }
     // Everything is closed, so just clear it
@@ -154,13 +176,17 @@ public class HdfsWriter implements 
BulkMessageWriter<JSONObject>, Serializable {
     SourceHandler ret = sourceHandlerMap.get(key);
     if(ret == null) {
       if(sourceHandlerMap.size() >= maxOpenFiles) {
-        throw new IllegalStateException("Too many HDFS files open!");
+        String errorMsg = "Too many HDFS files open! Maximum number of open 
files is: " + maxOpenFiles +
+            ". Current number of open files is: " + sourceHandlerMap.size();
+        LOG.error(errorMsg);
+        throw new IllegalStateException(errorMsg);
       }
       ret = new SourceHandler(rotationActions,
                               rotationPolicy,
                               syncPolicyCreator.create(sourceType, config),
                               new 
PathExtensionFileNameFormat(key.getStellarResult(), fileNameFormat),
                               new SourceHandlerCallback(sourceHandlerMap, 
key));
+      LOG.debug("Placing key in sourceHandlerMap: {}", key);
       sourceHandlerMap.put(key, ret);
     }
     return ret;
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
index b841249..d94b7cf 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
@@ -49,7 +49,6 @@ public class SourceHandler {
   FileNameFormat fileNameFormat;
   SourceHandlerCallback cleanupCallback;
   private long offset = 0;
-  private int rotation = 0;
   private transient FSDataOutputStream out;
   private transient final Object writeLock = new Object();
   protected transient Timer rotationTimer; // only used for TimedRotationPolicy
@@ -89,6 +88,7 @@ public class SourceHandler {
       this.offset += bytes.length;
 
       if (this.syncPolicy.mark(null, this.offset)) {
+        LOG.debug("Calling hsync per Sync Policy");
         if (this.out instanceof HdfsDataOutputStream) {
           ((HdfsDataOutputStream) this.out)
               .hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
@@ -97,11 +97,13 @@ public class SourceHandler {
         }
         //recreate the sync policy for the next batch just in case something 
changed in the config
         //and the sync policy depends on the config.
+        LOG.debug("Recreating sync policy");
         this.syncPolicy = syncPolicyCreator.create(sensor, config);
       }
     }
 
     if (this.rotationPolicy.mark(null, this.offset)) {
+      LOG.debug("Rotating due to rotationPolicy");
       rotateOutputFile(); // synchronized
       this.offset = 0;
       this.rotationPolicy.reset();
@@ -109,8 +111,10 @@ public class SourceHandler {
   }
 
   private void initialize() throws IOException {
+    LOG.debug("Initializing Source Handler");
     this.fs = FileSystem.get(new Configuration());
     this.currentFile = createOutputFile();
+    LOG.debug("Source Handler initialized with starting file: {}", 
currentFile);
     if(this.rotationPolicy instanceof TimedRotationPolicy){
       long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
       this.rotationTimer = new Timer(true);
@@ -118,6 +122,7 @@ public class SourceHandler {
         @Override
         public void run() {
           try {
+            LOG.debug("Rotating output file from TimerTask");
             rotateOutputFile();
           } catch(IOException e){
             LOG.warn("IOException during scheduled file rotation.", e);
@@ -128,28 +133,30 @@ public class SourceHandler {
     }
   }
 
+  // Closes the output file, but ensures any RotationActions are performed.
   protected void rotateOutputFile() throws IOException {
-    LOG.info("Rotating output file...");
+    LOG.debug("Rotating output file...");
     long start = System.currentTimeMillis();
     synchronized (this.writeLock) {
       closeOutputFile();
       // Want to use the callback to make sure we have an accurate count of 
open files.
       cleanupCallback();
-      this.rotation++;
 
-      Path newFile = createOutputFile();
-      LOG.info("Performing {} file rotation actions.", 
this.rotationActions.size());
+      LOG.debug("Performing {} file rotation actions.", 
this.rotationActions.size());
       for (RotationAction action : this.rotationActions) {
         action.execute(this.fs, this.currentFile);
       }
-      this.currentFile = newFile;
     }
     long time = System.currentTimeMillis() - start;
     LOG.info("File rotation took {} ms", time);
   }
 
   private Path createOutputFile() throws IOException {
-    Path path = new Path(this.fileNameFormat.getPath(), 
this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
+    // The rotation is set to 0. With the way open files are tracked and 
managed with the callback, there will
+    // never be data that would go into a rotation > 0. Instead a new 
SourceHandler, and by extension file, will
+    // be created.
+    Path path = new Path(this.fileNameFormat.getPath(), 
this.fileNameFormat.getName(0, System.currentTimeMillis()));
+    LOG.debug("Creating new output file: {}", path.getName());
     if(fs.getScheme().equals("file")) {
       //in the situation where we're running this in a local filesystem, 
flushing doesn't work.
       fs.mkdirs(path.getParent());
@@ -172,6 +179,9 @@ public class SourceHandler {
   public void close() {
     try {
       closeOutputFile();
+      if(rotationTimer != null) {
+        rotationTimer.cancel();
+      }
       // Don't call cleanup, to avoid HashMap's 
ConcurrentModificationException while iterating
     } catch (IOException e) {
       throw new RuntimeException("Unable to close output file.", e);
@@ -186,7 +196,6 @@ public class SourceHandler {
             ", syncPolicy=" + syncPolicy +
             ", fileNameFormat=" + fileNameFormat +
             ", offset=" + offset +
-            ", rotation=" + rotation +
             ", out=" + out +
             ", writeLock=" + writeLock +
             ", rotationTimer=" + rotationTimer +
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java
index bfd1daf..89089f9 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java
@@ -18,9 +18,18 @@
 
 package org.apache.metron.writer.hdfs;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Callback intended to be able to manage open files in {@link HdfsWriter}. 
This callback will close
+ * the associated {@link SourceHandler} and remove it from the map of open 
files.
+ */
 public class SourceHandlerCallback {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   Map<SourceHandlerKey, SourceHandler> sourceHandlerMap;
   SourceHandlerKey key;
   SourceHandlerCallback(Map<SourceHandlerKey, SourceHandler> sourceHandlerMap, 
SourceHandlerKey key) {
@@ -28,8 +37,20 @@ public class SourceHandlerCallback {
     this.key = key;
   }
 
+  /**
+   * Removes {@link SourceHandler} from the map of open files. Also closes it 
to ensure resources such as
+   * {@link java.util.Timer} is closed.
+   */
   public void removeKey() {
-    sourceHandlerMap.remove(key);
+    SourceHandler removed = sourceHandlerMap.remove(key);
+    if(removed != null) {
+      removed.close();
+    }
+    LOG.debug("Removed {} -> {}. Current state of sourceHandlerMap: {}",
+        key,
+        removed,
+        sourceHandlerMap
+    );
   }
 }
 
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java
index 6bf0917..ce5f33a 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java
@@ -58,5 +58,13 @@ class SourceHandlerKey {
     result = 31 * result + (stellarResult != null ? stellarResult.hashCode() : 
0);
     return result;
   }
+
+  @Override
+  public String toString() {
+    return "SourceHandlerKey{" +
+        "sourceType='" + sourceType + '\'' +
+        ", stellarResult='" + stellarResult + '\'' +
+        '}';
+  }
 }
 
diff --git 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
index 832f8bf..09ecafc 100644
--- 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
+++ 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
@@ -453,44 +453,6 @@ public class HdfsWriterTest {
     }
   }
 
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testHandleAttemptsRotateIfStreamClosed() throws Exception {
-    String function = "FORMAT('test-%s/%s', test.key, test.key)";
-    WriterConfiguration config = buildWriterConfiguration(function);
-    HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
-
-    JSONObject message = new JSONObject();
-    message.put("test.key", "test.value");
-    ArrayList<JSONObject> messages = new ArrayList<>();
-    messages.add(message);
-    ArrayList<Tuple> tuples = new ArrayList<>();
-
-    CountSyncPolicy basePolicy = new CountSyncPolicy(5);
-    ClonedSyncPolicyCreator creator = new ClonedSyncPolicyCreator(basePolicy);
-
-    writer.write(SENSOR_NAME, config, tuples, messages);
-    writer.getSourceHandler(SENSOR_NAME, "test-test.value/test.value", 
config).closeOutputFile();
-    writer.getSourceHandler(SENSOR_NAME, "test-test.value/test.value", 
config).handle(message, SENSOR_NAME, config, creator);
-    writer.close();
-
-    File outputFolder = new File(folder.getAbsolutePath() + 
"/test-test.value/test.value/");
-
-    // The message should show up twice, once in each file
-    ArrayList<String> expected = new ArrayList<>();
-    expected.add(message.toJSONString());
-
-    // Assert this went into a new file because it actually rotated
-    Assert.assertEquals(2, outputFolder.listFiles().length);
-    for (File file : outputFolder.listFiles()) {
-      List<String> lines = Files.readAllLines(file.toPath());
-      // One line per file
-      Assert.assertEquals(1, lines.size());
-      Assert.assertEquals(expected, lines);
-    }
-  }
-
   protected WriterConfiguration buildWriterConfiguration(String function) {
     IndexingConfigurations indexingConfig = new IndexingConfigurations();
     Map<String, Object> sensorIndexingConfig = new HashMap<>();
diff --git 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/SourceHandlerTest.java
 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/SourceHandlerTest.java
new file mode 100644
index 0000000..b4f3d46
--- /dev/null
+++ 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/SourceHandlerTest.java
@@ -0,0 +1,86 @@
+package org.apache.metron.writer.hdfs;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.common.rotation.RotationAction;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import static org.mockito.Mockito.*;
+
+public class SourceHandlerTest {
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  private static final String SENSOR_NAME = "sensor";
+  private static final String WRITER_NAME = "writerName";
+
+  private File folder;
+  private FileNameFormat testFormat;
+
+  RotationAction rotAction1 = mock(RotationAction.class);
+  RotationAction rotAction2 = mock(RotationAction.class);
+  List<RotationAction> rotActions;
+
+  SourceHandlerCallback callback = mock(SourceHandlerCallback.class);
+
+  @Before
+  public void setup() throws IOException {
+    // Ensure each test has a unique folder to work with.
+    folder = tempFolder.newFolder();
+    testFormat = new DefaultFileNameFormat()
+        .withPath(folder.toString())
+        .withExtension(".json")
+        .withPrefix("prefix-");
+
+    rotActions = new ArrayList<>();
+    rotActions.add(rotAction1);
+    rotActions.add(rotAction2);
+  }
+
+  @Test
+  public void testRotateOutputFile() throws IOException {
+    SourceHandler handler = new SourceHandler(
+        rotActions,
+        new FileSizeRotationPolicy(10000, Units.MB), // Don't actually care 
about the rotation
+        new CountSyncPolicy(1),
+        testFormat,
+        callback
+    );
+
+    handler.rotateOutputFile();
+
+    // Function should ensure rotation actions and callback are called.
+    verify(rotAction1).execute(any(), any());
+    verify(rotAction2).execute(any(), any());
+    verify(callback).removeKey();
+  }
+}

Reply via email to