I think you have to call 'addRow' on the writer: https://hive.apache.org/javadocs/r0.12.0/api/org/apache/hadoop/hive/ql/io/orc/Writer.html
That's just based on the javadoc, I don't have any experience doing this. On Tue, Apr 7, 2015 at 8:43 AM, Grant Overby (groverby) <grove...@cisco.com> wrote: > I have a Storm Trident Bolt for writing ORC File. The files are > created; however, they are always zero length. This code eventually causes > an OOME. I suspect I am missing some sort of flushing action, but don’t see > anything like that in the api. > > My bolt follows. Any thoughts as to what I’m doing wrong or links to > reference uses of org.apache.hadoop.hive.ql.io.orc.Writer ? > > package com.cisco.tinderbox.burner.trident.functions; > > import storm.trident.operation.BaseFunction; > import storm.trident.operation.TridentCollector; > import storm.trident.tuple.TridentTuple; > > import com.cisco.tinderbox.burner.io.system.CurrentUnixTime; > import com.cisco.tinderbox.burner.trident.Topology; > import com.cisco.tinderbox.model.ConnectionEvent; > import com.google.common.base.Throwables; > > import java.io.IOException; > import java.util.List; > import java.util.UUID; > > import org.apache.hadoop.conf.Configuration; > import org.apache.hadoop.fs.FileSystem; > import org.apache.hadoop.fs.Path; > import org.apache.hadoop.fs.RawLocalFileSystem; > import org.apache.hadoop.hive.ql.io.orc.OrcFile; > import org.apache.hadoop.hive.ql.io.orc.Writer; > import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; > import org.apache.hive.hcatalog.streaming.FlatTableColumn; > import org.apache.hive.hcatalog.streaming.FlatTableObjectInspector; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > import static org.apache.hadoop.hive.ql.io.orc.CompressionKind.*; > > public class OrcSink extends BaseFunction { > private static final Logger logger = > LoggerFactory.getLogger(OrcSink.class); > private static final CurrentUnixTime currentUnixTime = > CurrentUnixTime.getInstance(); > private static final long serialVersionUID = 7435558912956446385L; > private final String dbName; > private final String 
tableName; > private final List<FlatTableColumn<?>> fields; > private final String hdfsUrl; > private transient volatile int partition; > private transient volatile Writer writer; > private transient volatile Path path; > > public OrcSink(String hdfsUrl, String dbName, String tableName, > List<FlatTableColumn<?>> fields) { > this.hdfsUrl = hdfsUrl; > this.dbName = dbName; > this.tableName = tableName; > this.fields = fields; > } > > @Override > public void cleanup() { > closeWriter(); > } > > @Override > public synchronized void execute(TridentTuple tuple, TridentCollector > collector) { > try { > refreshWriterIfNeeded(); > ConnectionEvent connectionEvent = (ConnectionEvent) > tuple.getValueByField(Topology.FIELD_CORRELATED); > writer.addRow(connectionEvent); > } catch (IOException e) { > logger.error("could not write to orc", e); > } > } > > private void closeWriter() { > if (writer != null) { > try { > writer.close(); > } catch (IOException e) { > Throwables.propagate(e); > } finally { > writer = null; > } > } > } > > private void createWriter() { > try { > Configuration fsConf = new Configuration(); > fsConf.set("fs.defaultFS", hdfsUrl); > FileSystem fs = new RawLocalFileSystem(); > //FileSystem.get(fsConf); > String fileName = System.currentTimeMillis() + "-" + > UUID.randomUUID().toString() + ".orc"; > path = new Path("/data/diska/orc/" + dbName + "/" + tableName + > "/" + partition + "/" + fileName); > Configuration writerConf = new Configuration(); > ObjectInspector oi = new FlatTableObjectInspector(dbName + "." 
+ > tableName, fields); > int stripeSize = 250 * 1024 * 1024; > int compressBufferSize = 256 * 1024; > int rowIndexStride = 10000; > writer = OrcFile.createWriter(fs, path, writerConf, oi, > stripeSize, SNAPPY, compressBufferSize, rowIndexStride); > } catch (IOException e) { > throw Throwables.propagate(e); > } > } > > private void refreshWriter() { > partition = currentUnixTime.getQuarterHour(); > closeWriter(); > createWriter(); > } > > private void refreshWriterIfNeeded() { > if (writer == null || partition != currentUnixTime.getQuarterHour()) { > refreshWriter(); > } > } > } > > > > *Grant Overby* > Software Engineer > Cisco.com <http://www.cisco.com/> > grove...@cisco.com > Mobile: *865 724 4910 <865%20724%204910>* > > > > Think before you print. > > This email may contain confidential and privileged material for the sole > use of the intended recipient. Any review, use, distribution or disclosure > by others is strictly prohibited. If you are not the intended recipient (or > authorized to receive for the recipient), please contact the sender by > reply email and delete all copies of this message. > > Please click here > <http://www.cisco.com/web/about/doing_business/legal/cri/index.html> for > Company Registration Information. > > > >