For the moment I'm giving up on streaming: too many features are missing or
unclear compared to batch.
For example:

   - checkpointing: it's not clear which checkpointing system to use, how
   to tune and monitor it, or how to avoid OOM exceptions. Moreover, is it
   really necessary? For example, if I read a file from HDFS without
   checkpointing, it could be acceptable to re-run the job on all the data
   in case of errors (i.e. the stream is managed like a batch)
   - cleanup: BucketingSink doesn't always move files to the final state
   - missing output formats: writing generic Rows to Parquet is not well
   supported (at least out of the box) [1]
   - progress monitoring: for example, in the ES connector there's no way
   (apart from using accumulators) to monitor the progress of the indexing

[1]
https://stackoverflow.com/questions/41144659/flink-avro-parquet-writer-in-rollingsink
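
To make the cleanup point concrete, here is a toy model in plain Java of the
file lifecycle as I understand it from the quoted replies below. It is not
Flink code: the class BucketLifecycleModel and its methods are hypothetical
names, and the model is deliberately simplified (it treats rolling a part file
and close() the same way, and a completed checkpoint confirms every pending
file). It only shows why files stay pending when the job ends before a
checkpoint completes.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (NOT Flink code) of the in-progress / pending / finished states. */
final class BucketLifecycleModel {

    enum State { IN_PROGRESS, PENDING, FINISHED }

    private final List<State> files = new ArrayList<>();

    /** Starts a new part file and returns its index. */
    int openFile() {
        files.add(State.IN_PROGRESS);
        return files.size() - 1;
    }

    /**
     * close() cannot tell a failure from a normal shutdown, so the most it
     * can safely do is move in-progress files to PENDING.
     */
    void close() {
        for (int i = 0; i < files.size(); i++) {
            if (files.get(i) == State.IN_PROGRESS) {
                files.set(i, State.PENDING);
            }
        }
    }

    /** Only a completed checkpoint "confirms" pending files. */
    void checkpointCompleted() {
        for (int i = 0; i < files.size(); i++) {
            if (files.get(i) == State.PENDING) {
                files.set(i, State.FINISHED);
            }
        }
    }

    State stateOf(int file) {
        return files.get(file);
    }

    public static void main(String[] args) {
        // Short test job: the job ends before any checkpoint completes.
        BucketLifecycleModel shortJob = new BucketLifecycleModel();
        int a = shortJob.openFile();
        shortJob.close();
        System.out.println(shortJob.stateOf(a)); // prints PENDING

        // Longer job: a checkpoint completes after the file was closed.
        BucketLifecycleModel longJob = new BucketLifecycleModel();
        int b = longJob.openFile();
        longJob.close();
        longJob.checkpointCompleted();
        System.out.println(longJob.stateOf(b)); // prints FINISHED
    }
}
```

In my test the five elements are consumed well before the 5000 ms checkpoint
interval elapses, which corresponds to the first scenario.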

Maybe I'm wrong on these points, but my attempt to replace my current
batch system with a streaming one failed because of them.

Best,
Flavio

On Fri, Sep 8, 2017 at 5:29 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> Expanding a bit on Kostas' answer: yes, your analysis is correct. The
> problem is that the job is shutting down before a last checkpoint can
> "confirm" the written bucket data by moving it to the final state. The
> problem, as Kostas noted, is that a user function (and thus also
> BucketingSink) does not know whether close() is being called because of a
> failure or because of a normal job shutdown. Therefore, we cannot move data
> to the final state there.
>
> Once the issue that Kostas posted is resolved, we can also resolve this
> problem for the BucketingSink.
>
> Best,
> Aljoscha
>
> On 8. Sep 2017, at 16:48, Kostas Kloudas <k.klou...@data-artisans.com>
> wrote:
>
> Hi Flavio,
>
> If I understand correctly, I think you bumped into this issue:
> https://issues.apache.org/jira/browse/FLINK-2646
>
> There is also a similar discussion on the BucketingSink here:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468
>
> Kostas
>
> On Sep 8, 2017, at 4:27 PM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
> Hi to all,
> I'm trying to test a streaming job but the files written by
> the BucketingSink are never finalized (they remain in the pending state).
> Is this caused by the fact that the job finishes before the checkpoint?
> Shouldn't the sink properly close anyway?
>
> This is my code:
>
>   @Test
>   public void testBucketingSink() throws Exception {
>     final StreamExecutionEnvironment senv =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     final StreamTableEnvironment tEnv =
>         TableEnvironment.getTableEnvironment(senv);
>     senv.enableCheckpointing(5000);
>     DataStream<String> testStream = senv.fromElements(//
>         "1,aaa,white", //
>         "2,bbb,gray", //
>         "3,ccc,white", //
>         "4,bbb,gray", //
>         "5,bbb,gray" //
>     );
>     final RowTypeInfo rtf = new RowTypeInfo(
>         BasicTypeInfo.STRING_TYPE_INFO,
>         BasicTypeInfo.STRING_TYPE_INFO,
>         BasicTypeInfo.STRING_TYPE_INFO);
>     DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {
>
>       private static final long serialVersionUID = 1L;
>
>       @Override
>       public Row map(String str) throws Exception {
>         String[] split = str.split(Pattern.quote(","));
>         Row ret = new Row(3);
>         ret.setField(0, split[0]);
>         ret.setField(1, split[1]);
>         ret.setField(2, split[2]);
>         return ret;
>       }
>     }).returns(rtf);
>
>     String columnNames = "id,value,state";
>     final String dsName = "test";
>     tEnv.registerDataStream(dsName, rows, columnNames);
>     final String whiteAreaFilter = "state = 'white'";
>     DataStream<Row> grayArea = rows;
>     DataStream<Row> whiteArea = null;
>     if (whiteAreaFilter != null) {
>       String sql = "SELECT *, (%s) as _WHITE FROM %s";
>       sql = String.format(sql, whiteAreaFilter, dsName);
>       Table table = tEnv.sql(sql);
>       grayArea = tEnv.toDataStream(
>           table.where("!_WHITE").select(columnNames), rtf);
>       DataStream<Row> nw = tEnv.toDataStream(
>           table.where("_WHITE").select(columnNames), rtf);
>       whiteArea = whiteArea == null ? nw : whiteArea.union(nw);
>     }
>     Writer<Row> bucketSinkwriter = new RowCsvWriter("UTF-8", "\t", "\n");
>
>     String datasetWhiteDir = "/tmp/bucket/white";
>     BucketingSink<Row> whiteAreaSink =
>         new BucketingSink<>(datasetWhiteDir.toString());
>     whiteAreaSink.setWriter(bucketSinkwriter);
>     whiteAreaSink.setBatchSize(10);
>     whiteArea.addSink(whiteAreaSink);
>
>     String datasetGrayDir = "/tmp/bucket/gray";
>     BucketingSink<Row> grayAreaSink =
>         new BucketingSink<>(datasetGrayDir.toString());
>     grayAreaSink.setWriter(bucketSinkwriter);
>     grayAreaSink.setBatchSize(10);
>     grayArea.addSink(grayAreaSink);
>
>     JobExecutionResult jobInfo = senv.execute("Bucketing sink test");
>     System.out.printf("Job took %s minutes",
>         jobInfo.getNetRuntime(TimeUnit.MINUTES));
>   }
>
>
>
>
>
>
>
> public class RowCsvWriter extends StreamWriterBase<Row> {
>   private static final long serialVersionUID = 1L;
>
>   private final String charsetName;
>   private transient Charset charset;
>   private String fieldDelimiter;
>   private String recordDelimiter;
>   private boolean allowNullValues = true;
>   private boolean quoteStrings = false;
>
>   /**
>    * Creates a new {@code RowCsvWriter} that uses the {@code "UTF-8"} charset
>    * to convert strings to bytes.
>    */
>   public RowCsvWriter() {
>     this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER,
>         CsvOutputFormat.DEFAULT_LINE_DELIMITER);
>   }
>
>   /**
>    * Creates a new {@code RowCsvWriter} that uses the given charset to
>    * convert strings to bytes.
>    *
>    * @param charsetName Name of the charset to be used, must be valid input
>    *        for {@code Charset.forName(charsetName)}
>    * @param fieldDelimiter String written between fields
>    * @param recordDelimiter String written after each record
>    */
>   public RowCsvWriter(String charsetName, String fieldDelimiter,
>       String recordDelimiter) {
>     this.charsetName = charsetName;
>     this.fieldDelimiter = fieldDelimiter;
>     this.recordDelimiter = recordDelimiter;
>   }
>
>   @Override
>   public void open(FileSystem fs, Path path) throws IOException {
>     super.open(fs, path);
>
>     try {
>       this.charset = Charset.forName(charsetName);
>     } catch (IllegalCharsetNameException ex) {
>       throw new IOException(
>           "The charset " + charsetName + " is not valid.", ex);
>     } catch (UnsupportedCharsetException ex) {
>       throw new IOException(
>           "The charset " + charsetName + " is not supported.", ex);
>     }
>   }
>
>   @Override
>   public void write(Row element) throws IOException {
>     FSDataOutputStream outputStream = getStream();
>     writeRow(element, outputStream);
>   }
>
>   private void writeRow(Row element, FSDataOutputStream out)
>       throws IOException {
>     int numFields = element.getArity();
>
>     for (int i = 0; i < numFields; i++) {
>       Object obj = element.getField(i);
>       if (obj != null) {
>         if (i != 0) {
>           out.write(this.fieldDelimiter.getBytes(charset));
>         }
>
>         if (quoteStrings) {
>           if (obj instanceof String || obj instanceof StringValue) {
>             out.write('"');
>             out.write(obj.toString().getBytes(charset));
>             out.write('"');
>           } else {
>             out.write(obj.toString().getBytes(charset));
>           }
>         } else {
>           out.write(obj.toString().getBytes(charset));
>         }
>       } else {
>         if (this.allowNullValues) {
>           if (i != 0) {
>             out.write(this.fieldDelimiter.getBytes(charset));
>           }
>         } else {
>           throw new RuntimeException(
>               "Cannot write tuple with <null> value at position: " + i);
>         }
>       }
>     }
>
>     // add the record delimiter
>     out.write(this.recordDelimiter.getBytes(charset));
>   }
>
>   @Override
>   public Writer<Row> duplicate() {
>     return new RowCsvWriter(charsetName, fieldDelimiter, recordDelimiter);
>   }
> }
>
>
>
> Any help is appreciated,
> Flavio
>
>
>
>
