For the moment I'm giving up on streaming... too many missing or unclear features compared to batch. For example:
- checkpointing: it's not clear which checkpointing backend to use, how to tune and monitor it, and how to avoid OOM exceptions. Moreover, is it really necessary? For example, if I read a file from HDFS and have no checkpoint, it could be acceptable to re-run the job on all the data in case of errors (i.e. the stream is managed like a batch). See the first sketch below for the kind of configuration I mean.
- cleanup: BucketingSink doesn't always move files to the final state (see the second sketch and the thread below).
- missing output formats: Parquet support for writing generic Rows is not well supported, at least out of the box [1].
- progress monitoring: for example, in the ES connector there's no way (apart from using accumulators, as in the last sketch below) to monitor the progress of the indexing.

[1] https://stackoverflow.com/questions/41144659/flink-avro-parquet-writer-in-rollingsink

Maybe I'm wrong on those points, but my attempt to replace the current batch system with a streaming one failed because of them.
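To be concrete about the checkpointing point, this is roughly the kind of configuration I ended up experimenting with. It's only a minimal sketch: the intervals, the HDFS path and the choice of RocksDB are my own assumptions, not a recommendation.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSetup {

  public static StreamExecutionEnvironment createEnv() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoint every 60s with exactly-once guarantees (values are placeholders)
    env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

    CheckpointConfig cfg = env.getCheckpointConfig();
    cfg.setMinPauseBetweenCheckpoints(30_000L); // leave some room between two checkpoints
    cfg.setCheckpointTimeout(600_000L);         // give up on checkpoints slower than 10 minutes
    cfg.setMaxConcurrentCheckpoints(1);         // never let checkpoints pile up

    // Keep state off the JVM heap to reduce the OOM risk; the HDFS path is made up
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
    return env;
  }
}

Even with something like this, it's still unclear to me how to systematically monitor checkpoint sizes and durations, which is the core of my complaint.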
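On the cleanup point, these are the only BucketingSink knobs I found to play with (again just a sketch with invented values). As the thread below explains, pending files are only moved to the final state when a checkpoint completes, so this alone doesn't solve the problem.

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.types.Row;

public class WhiteAreaSink {

  /** Builds the white-area sink from my test, with explicit rolling/inactivity settings. */
  public static BucketingSink<Row> create() {
    BucketingSink<Row> sink = new BucketingSink<>("/tmp/bucket/white"); // same path as in the test below
    // the RowCsvWriter from the test below would be set here via setWriter(...)
    sink.setBatchSize(64L * 1024L * 1024L);       // roll a new part file after ~64 MB
    sink.setInactiveBucketCheckInterval(60_000L); // look for idle buckets every minute
    sink.setInactiveBucketThreshold(60_000L);     // mark a bucket inactive after one idle minute
    sink.setPendingSuffix(".pending");            // make the not-yet-finalized files easy to spot
    return sink;
  }
}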
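And for the progress-monitoring point, the accumulator workaround I mentioned looks more or less like this. It's a sketch: the accumulator name and the idea of putting a pass-through map right before the ES sink are mine.

import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;

/** Pass-through map that counts the records flowing towards the sink. */
public class IndexingProgressCounter extends RichMapFunction<Row, Row> {

  private static final long serialVersionUID = 1L;

  private final LongCounter indexedRecords = new LongCounter();

  @Override
  public void open(Configuration parameters) throws Exception {
    // the accumulator shows up in the web UI / job result under this (made-up) name
    getRuntimeContext().addAccumulator("indexed-records", indexedRecords);
  }

  @Override
  public Row map(Row value) throws Exception {
    indexedRecords.add(1L);
    return value; // forward the row unchanged to the sink
  }
}

That is, something like rows.map(new IndexingProgressCounter()).addSink(esSink), which only tells me what entered the sink, not what Elasticsearch actually acknowledged.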
Best,
Flavio

On Fri, Sep 8, 2017 at 5:29 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
>
> Expanding a bit on Kostas' answer: yes, your analysis is correct. The
> problem is that the job shuts down before a last checkpoint can "confirm"
> the written bucket data by moving it to the final state. The problem, as
> Kostas noted, is that a user function (and thus also the BucketingSink)
> does not know whether close() is being called because of a failure or
> because of a normal job shutdown. Therefore, we cannot move data to the
> final state there.
>
> Once the issue that Kostas posted is resolved, we can also resolve this
> problem for the BucketingSink.
>
> Best,
> Aljoscha
>
> On 8. Sep 2017, at 16:48, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
> Hi Flavio,
>
> If I understand correctly, I think you bumped into this issue:
> https://issues.apache.org/jira/browse/FLINK-2646
>
> There is also a similar discussion on the BucketingSink here:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468
>
> Kostas
>
> On Sep 8, 2017, at 4:27 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> Hi to all,
> I'm trying to test a streaming job, but the files written by the
> BucketingSink are never finalized (they remain in the pending state).
> Is this caused by the fact that the job finishes before the checkpoint?
> Shouldn't the sink close properly anyway?
>
> This is my code:
>
> @Test
> public void testBucketingSink() throws Exception {
>   final StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
>   final StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(senv);
>   senv.enableCheckpointing(5000);
>   DataStream<String> testStream = senv.fromElements(//
>       "1,aaa,white", //
>       "2,bbb,gray", //
>       "3,ccc,white", //
>       "4,bbb,gray", //
>       "5,bbb,gray" //
>   );
>   final RowTypeInfo rtf = new RowTypeInfo(
>       BasicTypeInfo.STRING_TYPE_INFO,
>       BasicTypeInfo.STRING_TYPE_INFO,
>       BasicTypeInfo.STRING_TYPE_INFO);
>   DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {
>
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public Row map(String str) throws Exception {
>       String[] split = str.split(Pattern.quote(","));
>       Row ret = new Row(3);
>       ret.setField(0, split[0]);
>       ret.setField(1, split[1]);
>       ret.setField(2, split[2]);
>       return ret;
>     }
>   }).returns(rtf);
>
>   String columnNames = "id,value,state";
>   final String dsName = "test";
>   tEnv.registerDataStream(dsName, rows, columnNames);
>   final String whiteAreaFilter = "state = 'white'";
>   DataStream<Row> grayArea = rows;
>   DataStream<Row> whiteArea = null;
>   if (whiteAreaFilter != null) {
>     String sql = "SELECT *, (%s) as _WHITE FROM %s";
>     sql = String.format(sql, whiteAreaFilter, dsName);
>     Table table = tEnv.sql(sql);
>     grayArea = tEnv.toDataStream(table.where("!_WHITE").select(columnNames), rtf);
>     DataStream<Row> nw = tEnv.toDataStream(table.where("_WHITE").select(columnNames), rtf);
>     whiteArea = whiteArea == null ? nw : whiteArea.union(nw);
>   }
>   Writer<Row> bucketSinkwriter = new RowCsvWriter("UTF-8", "\t", "\n");
>
>   String datasetWhiteDir = "/tmp/bucket/white";
>   BucketingSink<Row> whiteAreaSink = new BucketingSink<>(datasetWhiteDir);
>   whiteAreaSink.setWriter(bucketSinkwriter);
>   whiteAreaSink.setBatchSize(10);
>   whiteArea.addSink(whiteAreaSink);
>
>   String datasetGrayDir = "/tmp/bucket/gray";
>   BucketingSink<Row> grayAreaSink = new BucketingSink<>(datasetGrayDir);
>   grayAreaSink.setWriter(bucketSinkwriter);
>   grayAreaSink.setBatchSize(10);
>   grayArea.addSink(grayAreaSink);
>
>   JobExecutionResult jobInfo = senv.execute("Bucketing sink test");
>   System.out.printf("Job took %s minutes", jobInfo.getNetRuntime(TimeUnit.MINUTES));
> }
>
> public class RowCsvWriter extends StreamWriterBase<Row> {
>   private static final long serialVersionUID = 1L;
>
>   private final String charsetName;
>   private transient Charset charset;
>   private String fieldDelimiter;
>   private String recordDelimiter;
>   private boolean allowNullValues = true;
>   private boolean quoteStrings = false;
>
>   /**
>    * Creates a new {@code RowCsvWriter} that uses the {@code "UTF-8"} charset to convert
>    * strings to bytes.
>    */
>   public RowCsvWriter() {
>     this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER, CsvOutputFormat.DEFAULT_LINE_DELIMITER);
>   }
>
>   /**
>    * Creates a new {@code RowCsvWriter} that uses the given charset to convert strings to bytes.
>    *
>    * @param charsetName Name of the charset to be used, must be valid input for
>    *        {@code Charset.forName(charsetName)}
>    */
>   public RowCsvWriter(String charsetName, String fieldDelimiter, String recordDelimiter) {
>     this.charsetName = charsetName;
>     this.fieldDelimiter = fieldDelimiter;
>     this.recordDelimiter = recordDelimiter;
>   }
>
>   @Override
>   public void open(FileSystem fs, Path path) throws IOException {
>     super.open(fs, path);
>
>     try {
>       this.charset = Charset.forName(charsetName);
>     } catch (IllegalCharsetNameException ex) {
>       throw new IOException("The charset " + charsetName + " is not valid.", ex);
>     } catch (UnsupportedCharsetException ex) {
>       throw new IOException("The charset " + charsetName + " is not supported.", ex);
>     }
>   }
>
>   @Override
>   public void write(Row element) throws IOException {
>     FSDataOutputStream outputStream = getStream();
>     writeRow(element, outputStream);
>   }
>
>   private void writeRow(Row element, FSDataOutputStream out) throws IOException {
>     int numFields = element.getArity();
>
>     for (int i = 0; i < numFields; i++) {
>       Object obj = element.getField(i);
>       if (obj != null) {
>         if (i != 0) {
>           out.write(this.fieldDelimiter.getBytes(charset));
>         }
>
>         if (quoteStrings && (obj instanceof String || obj instanceof StringValue)) {
>           out.write('"');
>           out.write(obj.toString().getBytes(charset));
>           out.write('"');
>         } else {
>           out.write(obj.toString().getBytes(charset));
>         }
>       } else {
>         if (this.allowNullValues) {
>           if (i != 0) {
>             out.write(this.fieldDelimiter.getBytes(charset));
>           }
>         } else {
>           throw new RuntimeException("Cannot write tuple with <null> value at position: " + i);
>         }
>       }
>     }
>
>     // add the record delimiter
>     out.write(this.recordDelimiter.getBytes(charset));
>   }
>
>   @Override
>   public Writer<Row> duplicate() {
>     return new RowCsvWriter(charsetName, fieldDelimiter, recordDelimiter);
>   }
> }
>
> Any help is appreciated,
> Flavio