This is an automated email from the ASF dual-hosted git repository. leet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push: new 5e9cf70 METRON-2005 Batch Writer writes 0-byte files to HDFS on rotation (justinleet) closes apache/metron#1338 5e9cf70 is described below commit 5e9cf705ef5feb1d723f584c1b6134bcc1eda9cf Author: justinleet <justinjl...@gmail.com> AuthorDate: Tue Feb 19 14:09:02 2019 -0500 METRON-2005 Batch Writer writes 0-byte files to HDFS on rotation (justinleet) closes apache/metron#1338 --- metron-platform/metron-writer/README.md | 3 + metron-platform/metron-writer/pom.xml | 5 ++ .../writer/hdfs/ClonedSyncPolicyCreator.java | 3 + .../org/apache/metron/writer/hdfs/HdfsWriter.java | 74 +++++++++++++------ .../apache/metron/writer/hdfs/SourceHandler.java | 25 +++++-- .../metron/writer/hdfs/SourceHandlerCallback.java | 23 +++++- .../metron/writer/hdfs/SourceHandlerKey.java | 8 ++ .../apache/metron/writer/hdfs/HdfsWriterTest.java | 38 ---------- .../metron/writer/hdfs/SourceHandlerTest.java | 86 ++++++++++++++++++++++ 9 files changed, 194 insertions(+), 71 deletions(-) diff --git a/metron-platform/metron-writer/README.md b/metron-platform/metron-writer/README.md index bbec39b..ed4f053 100644 --- a/metron-platform/metron-writer/README.md +++ b/metron-platform/metron-writer/README.md @@ -65,6 +65,9 @@ To manage the output path, a base path argument is provided by the Flux file, wi This means that all output will land in `/apps/metron/`. With no further adjustment, it will be `/apps/metron/<sensor>/`. However, by modifying the sensor's JSON config, it is possible to provide additional pathing based on the message itself. +The output format of a file will be `{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}`. Notably, because of the way +file rotations are handled by the HdfsWriter, `rotationNum` will always be 0, but RotationActions still get executed normally. + E.g. 
``` { diff --git a/metron-platform/metron-writer/pom.xml b/metron-platform/metron-writer/pom.xml index 0002f7a..a11ce6e 100644 --- a/metron-platform/metron-writer/pom.xml +++ b/metron-platform/metron-writer/pom.xml @@ -213,6 +213,11 @@ <version>${project.parent.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>stellar-common</artifactId> + <version>${project.parent.version}</version> + </dependency> </dependencies> <build> diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java index 4d32fc9..1f908a9 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java @@ -37,6 +37,9 @@ public class ClonedSyncPolicyCreator implements SyncPolicyCreator { // SyncPolicy object does not implement Cloneable, so we'll need to clone it via serialization //to get a fresh policy object. Note: this would be expensive if it was in the critical path, // but should be called infrequently (once per sync). + + // Reset the SyncPolicy to ensure that the new count properly resets. 
+ syncPolicy.reset(); byte[] serializedForm = SerDeUtils.toBytes(syncPolicy); return SerDeUtils.fromBytes(serializedForm, SyncPolicy.class); } diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java index 1ba9a6b..aaa58fa 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java @@ -17,29 +17,37 @@ */ package org.apache.metron.writer.hdfs; +import java.io.IOException; +import java.io.Serializable; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.metron.common.configuration.IndexingConfigurations; +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.metron.common.writer.BulkMessageWriter; +import org.apache.metron.common.writer.BulkWriterResponse; +import org.apache.metron.stellar.common.StellarProcessor; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.MapVariableResolver; import org.apache.metron.stellar.dsl.StellarFunctions; import org.apache.metron.stellar.dsl.VariableResolver; -import org.apache.metron.stellar.common.StellarProcessor; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Tuple; -import org.apache.metron.common.configuration.writer.WriterConfiguration; -import org.apache.metron.common.writer.BulkMessageWriter; -import org.apache.metron.common.writer.BulkWriterResponse; import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy; import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; import 
org.apache.storm.hdfs.common.rotation.RotationAction; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; - -import java.io.*; -import java.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + List<RotationAction> rotationActions = new ArrayList<>(); FileRotationPolicy rotationPolicy = new NoRotationPolicy(); SyncPolicy syncPolicy; @@ -82,38 +90,48 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { this.fileNameFormat.prepare(stormConfig,topologyContext); if(syncPolicy != null) { //if the user has specified the sync policy, we don't want to override their wishes. + LOG.debug("Using user specified sync policy {}", syncPolicy.getClass().getSimpleName()); syncPolicyCreator = new ClonedSyncPolicyCreator(syncPolicy); } else { //if the user has not, then we want to have the sync policy depend on the batch size. + LOG.debug("No user specified sync policy, using CountSyncPolicy based on batch size"); syncPolicyCreator = (source, config) -> new CountSyncPolicy(config == null?1:config.getBatchSize(source)); } } - @Override public BulkWriterResponse write(String sourceType , WriterConfiguration configurations , Iterable<Tuple> tuples , List<JSONObject> messages - ) throws Exception - { + ) throws Exception { BulkWriterResponse response = new BulkWriterResponse(); // Currently treating all the messages in a group for pass/failure. 
- try { - // Messages can all result in different HDFS paths, because of Stellar Expressions, so we'll need to iterate through - for(JSONObject message : messages) { - String path = getHdfsPathExtension( - sourceType, - (String)configurations.getSensorConfig(sourceType).getOrDefault(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF, ""), - message - ); + // Messages can all result in different HDFS paths, because of Stellar Expressions, so we'll need to iterate through + for (JSONObject message : messages) { + String path = getHdfsPathExtension( + sourceType, + (String) configurations.getSensorConfig(sourceType) + .getOrDefault(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF, ""), + message + ); + + try { + LOG.trace("Writing message {} to path: {}", message.toJSONString(), path); SourceHandler handler = getSourceHandler(sourceType, path, configurations); handler.handle(message, sourceType, configurations, syncPolicyCreator); + } catch (Exception e) { + LOG.error( + "HdfsWriter encountered error writing. Source type: {}. # messages: {}. 
Output path: {}.", + sourceType, + messages.size(), + path, + e + ); + response.addAllErrors(e, tuples); } - } catch (Exception e) { - response.addAllErrors(e, tuples); } response.addAllSuccesses(tuples); @@ -123,6 +141,7 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { public String getHdfsPathExtension(String sourceType, String stellarFunction, JSONObject message) { // If no function is provided, just use the sourceType directly if(stellarFunction == null || stellarFunction.trim().isEmpty()) { + LOG.debug("No HDFS path extension provided; using source type {} directly", sourceType); return sourceType; } @@ -130,7 +149,9 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { VariableResolver resolver = new MapVariableResolver(message); Object objResult = stellarProcessor.parse(stellarFunction, resolver, StellarFunctions.FUNCTION_RESOLVER(), Context.EMPTY_CONTEXT()); if(objResult != null && !(objResult instanceof String)) { - throw new IllegalArgumentException("Stellar Function <" + stellarFunction + "> did not return a String value. Returned: " + objResult); + String errorMsg = "Stellar Function <" + stellarFunction + "> did not return a String value. Returned: " + objResult; + LOG.error(errorMsg); + throw new IllegalArgumentException(errorMsg); } return objResult == null ? 
"" : (String)objResult; } @@ -143,6 +164,7 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { @Override public void close() { for(SourceHandler handler : sourceHandlerMap.values()) { + LOG.debug("Closing SourceHandler {}", handler.toString()); handler.close(); } // Everything is closed, so just clear it @@ -154,13 +176,17 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { SourceHandler ret = sourceHandlerMap.get(key); if(ret == null) { if(sourceHandlerMap.size() >= maxOpenFiles) { - throw new IllegalStateException("Too many HDFS files open!"); + String errorMsg = "Too many HDFS files open! Maximum number of open files is: " + maxOpenFiles + + ". Current number of open files is: " + sourceHandlerMap.size(); + LOG.error(errorMsg); + throw new IllegalStateException(errorMsg); } ret = new SourceHandler(rotationActions, rotationPolicy, syncPolicyCreator.create(sourceType, config), new PathExtensionFileNameFormat(key.getStellarResult(), fileNameFormat), new SourceHandlerCallback(sourceHandlerMap, key)); + LOG.debug("Placing key in sourceHandlerMap: {}", key); sourceHandlerMap.put(key, ret); } return ret; diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java index b841249..d94b7cf 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java @@ -49,7 +49,6 @@ public class SourceHandler { FileNameFormat fileNameFormat; SourceHandlerCallback cleanupCallback; private long offset = 0; - private int rotation = 0; private transient FSDataOutputStream out; private transient final Object writeLock = new Object(); protected transient Timer rotationTimer; // only used for TimedRotationPolicy @@ -89,6 +88,7 @@ public class 
SourceHandler { this.offset += bytes.length; if (this.syncPolicy.mark(null, this.offset)) { + LOG.debug("Calling hsync per Sync Policy"); if (this.out instanceof HdfsDataOutputStream) { ((HdfsDataOutputStream) this.out) .hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); @@ -97,11 +97,13 @@ public class SourceHandler { } //recreate the sync policy for the next batch just in case something changed in the config //and the sync policy depends on the config. + LOG.debug("Recreating sync policy"); this.syncPolicy = syncPolicyCreator.create(sensor, config); } } if (this.rotationPolicy.mark(null, this.offset)) { + LOG.debug("Rotating due to rotationPolicy"); rotateOutputFile(); // synchronized this.offset = 0; this.rotationPolicy.reset(); @@ -109,8 +111,10 @@ public class SourceHandler { } private void initialize() throws IOException { + LOG.debug("Initializing Source Handler"); this.fs = FileSystem.get(new Configuration()); this.currentFile = createOutputFile(); + LOG.debug("Source Handler initialized with starting file: {}", currentFile); if(this.rotationPolicy instanceof TimedRotationPolicy){ long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval(); this.rotationTimer = new Timer(true); @@ -118,6 +122,7 @@ public class SourceHandler { @Override public void run() { try { + LOG.debug("Rotating output file from TimerTask"); rotateOutputFile(); } catch(IOException e){ LOG.warn("IOException during scheduled file rotation.", e); @@ -128,28 +133,30 @@ public class SourceHandler { } } + // Closes the output file, but ensures any RotationActions are performed. protected void rotateOutputFile() throws IOException { - LOG.info("Rotating output file..."); + LOG.debug("Rotating output file..."); long start = System.currentTimeMillis(); synchronized (this.writeLock) { closeOutputFile(); // Want to use the callback to make sure we have an accurate count of open files. 
cleanupCallback(); - this.rotation++; - Path newFile = createOutputFile(); - LOG.info("Performing {} file rotation actions.", this.rotationActions.size()); + LOG.debug("Performing {} file rotation actions.", this.rotationActions.size()); for (RotationAction action : this.rotationActions) { action.execute(this.fs, this.currentFile); } - this.currentFile = newFile; } long time = System.currentTimeMillis() - start; LOG.info("File rotation took {} ms", time); } private Path createOutputFile() throws IOException { - Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis())); + // The rotation is set to 0. With the way open files are tracked and managed with the callback, there will + // never be data that would go into a rotation > 0. Instead a new SourceHandler, and by extension file, will + // be created. + Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(0, System.currentTimeMillis())); + LOG.debug("Creating new output file: {}", path.getName()); if(fs.getScheme().equals("file")) { //in the situation where we're running this in a local filesystem, flushing doesn't work. 
fs.mkdirs(path.getParent()); @@ -172,6 +179,9 @@ public class SourceHandler { public void close() { try { closeOutputFile(); + if(rotationTimer != null) { + rotationTimer.cancel(); + } // Don't call cleanup, to avoid HashMap's ConcurrentModificationException while iterating } catch (IOException e) { throw new RuntimeException("Unable to close output file.", e); @@ -186,7 +196,6 @@ public class SourceHandler { ", syncPolicy=" + syncPolicy + ", fileNameFormat=" + fileNameFormat + ", offset=" + offset + - ", rotation=" + rotation + ", out=" + out + ", writeLock=" + writeLock + ", rotationTimer=" + rotationTimer + diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java index bfd1daf..89089f9 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java @@ -18,9 +18,18 @@ package org.apache.metron.writer.hdfs; +import java.lang.invoke.MethodHandles; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * Callback intended to be able to manage open files in {@link HdfsWriter}. This callback will close + * the associated {@link SourceHandler} and remove it from the map of open files. + */ public class SourceHandlerCallback { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + Map<SourceHandlerKey, SourceHandler> sourceHandlerMap; SourceHandlerKey key; SourceHandlerCallback(Map<SourceHandlerKey, SourceHandler> sourceHandlerMap, SourceHandlerKey key) { @@ -28,8 +37,20 @@ public class SourceHandlerCallback { this.key = key; } + /** + * Removes {@link SourceHandler} from the map of open files. Also closes it to ensure resources such as + * {@link java.util.Timer} is closed. 
+ */ public void removeKey() { - sourceHandlerMap.remove(key); + SourceHandler removed = sourceHandlerMap.remove(key); + if(removed != null) { + removed.close(); + } + LOG.debug("Removed {} -> {}. Current state of sourceHandlerMap: {}", + key, + removed, + sourceHandlerMap + ); } } diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java index 6bf0917..ce5f33a 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java @@ -58,5 +58,13 @@ class SourceHandlerKey { result = 31 * result + (stellarResult != null ? stellarResult.hashCode() : 0); return result; } + + @Override + public String toString() { + return "SourceHandlerKey{" + + "sourceType='" + sourceType + '\'' + + ", stellarResult='" + stellarResult + '\'' + + '}'; + } } diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java index 832f8bf..09ecafc 100644 --- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java +++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java @@ -453,44 +453,6 @@ public class HdfsWriterTest { } } - @Test - @SuppressWarnings("unchecked") - public void testHandleAttemptsRotateIfStreamClosed() throws Exception { - String function = "FORMAT('test-%s/%s', test.key, test.key)"; - WriterConfiguration config = buildWriterConfiguration(function); - HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); - writer.init(new HashMap<String, String>(), createTopologyContext(), config); - - JSONObject message = new JSONObject(); - message.put("test.key", 
"test.value"); - ArrayList<JSONObject> messages = new ArrayList<>(); - messages.add(message); - ArrayList<Tuple> tuples = new ArrayList<>(); - - CountSyncPolicy basePolicy = new CountSyncPolicy(5); - ClonedSyncPolicyCreator creator = new ClonedSyncPolicyCreator(basePolicy); - - writer.write(SENSOR_NAME, config, tuples, messages); - writer.getSourceHandler(SENSOR_NAME, "test-test.value/test.value", config).closeOutputFile(); - writer.getSourceHandler(SENSOR_NAME, "test-test.value/test.value", config).handle(message, SENSOR_NAME, config, creator); - writer.close(); - - File outputFolder = new File(folder.getAbsolutePath() + "/test-test.value/test.value/"); - - // The message should show up twice, once in each file - ArrayList<String> expected = new ArrayList<>(); - expected.add(message.toJSONString()); - - // Assert this went into a new file because it actually rotated - Assert.assertEquals(2, outputFolder.listFiles().length); - for (File file : outputFolder.listFiles()) { - List<String> lines = Files.readAllLines(file.toPath()); - // One line per file - Assert.assertEquals(1, lines.size()); - Assert.assertEquals(expected, lines); - } - } - protected WriterConfiguration buildWriterConfiguration(String function) { IndexingConfigurations indexingConfig = new IndexingConfigurations(); Map<String, Object> sensorIndexingConfig = new HashMap<>(); diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/SourceHandlerTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/SourceHandlerTest.java new file mode 100644 index 0000000..b4f3d46 --- /dev/null +++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/SourceHandlerTest.java @@ -0,0 +1,86 @@ +package org.apache.metron.writer.hdfs;/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; +import org.apache.storm.hdfs.bolt.format.FileNameFormat; +import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy; +import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units; +import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; +import org.apache.storm.hdfs.common.rotation.RotationAction; +import org.json.simple.JSONObject; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import static org.mockito.Mockito.*; + +public class SourceHandlerTest { + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + private static final String SENSOR_NAME = "sensor"; + private static final String WRITER_NAME = "writerName"; + + private File folder; + private FileNameFormat testFormat; + + RotationAction rotAction1 = mock(RotationAction.class); + RotationAction rotAction2 = mock(RotationAction.class); + List<RotationAction> rotActions; + + SourceHandlerCallback callback = mock(SourceHandlerCallback.class); + + @Before + public void 
setup() throws IOException { + // Ensure each test has a unique folder to work with. + folder = tempFolder.newFolder(); + testFormat = new DefaultFileNameFormat() + .withPath(folder.toString()) + .withExtension(".json") + .withPrefix("prefix-"); + + rotActions = new ArrayList<>(); + rotActions.add(rotAction1); + rotActions.add(rotAction2); + } + + @Test + public void testRotateOutputFile() throws IOException { + SourceHandler handler = new SourceHandler( + rotActions, + new FileSizeRotationPolicy(10000, Units.MB), // Don't actually care about the rotation + new CountSyncPolicy(1), + testFormat, + callback + ); + + handler.rotateOutputFile(); + + // Function should ensure rotation actions and callback are called. + verify(rotAction1).execute(any(), any()); + verify(rotAction2).execute(any(), any()); + verify(callback).removeKey(); + } +}