Hi,

I think Beam File Writing was recently changed to write one file per bundle. 
Currently, with the Flink Streaming Runner every element is considered to be 
one bundle, i.e. all bundles have size 1. This means that we write one file per 
output element.
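
To make the arithmetic concrete, here is a rough sketch in plain Java. It is only a toy model and does not use Beam: it assumes exactly one file per bundle, as described above, and the names BundleFileCount and filesWritten are made up for illustration.

```java
// Toy model only: it does not use Beam. It counts output files under the
// assumption that exactly one file is written per bundle.
final class BundleFileCount {

    /** Number of files written when `elements` records are split into bundles of `bundleSize`. */
    static long filesWritten(long elements, long bundleSize) {
        if (elements < 0 || bundleSize <= 0) {
            throw new IllegalArgumentException("elements must be >= 0 and bundleSize > 0");
        }
        // Ceiling division: a final, partially filled bundle still produces a file.
        return (elements + bundleSize - 1) / bundleSize;
    }

    public static void main(String[] args) {
        long elements = 1_000;
        System.out.println(filesWritten(elements, 1));   // bundle size 1: prints 1000
        System.out.println(filesWritten(elements, 100)); // bundle size 100: prints 10
    }
}
```

With bundles of size 1 the file count equals the element count, which matches the one-file-per-record behaviour reported below.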

@Reuven, could you please confirm? Also, won't this be a problem for other 
Runners that can have very small bundle sizes? IIRC the Dataflow Runner also 
has rather small bundles in streaming mode, though I'm not sure.

Best,
Aljoscha

> On 9. Aug 2017, at 19:14, Claire Yuan <clairey...@yahoo-inc.com> wrote:
> 
> Hi Aljoscha,
>   I used the same sink as in the TfIdf example and set the streaming option 
> to true, and I got one record per file. Here is the transform that writes my 
> output. I apply it in the pipeline in my main method.
>   public static class WriteOut extends PTransform<PCollection<KV<String, 
> String>>, PDone> {
>     private String output;
>     public WriteOut(String output) {
>       this.output = output;
>     }
>     @Override
>     public PDone expand(PCollection<KV<String, String>> someInfos) {
>       return someInfos
>           .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() {
>             @ProcessElement
>             public void processElement(ProcessContext c) {
>               // The key is declared as a plain String, so it is formatted directly.
>               c.output(String.format("%s:\t%s",
>                   c.element().getKey(),
>                   c.element().getValue()
>                   ));
>             }
>           }))
>           .apply(TextIO.write()
>               .to(output) /* name of the output files */
>               .withSuffix(".csv"));
>     }
>   }
> 
> Claire
> 
> 
> On Wednesday, August 9, 2017 2:21 AM, Aljoscha Krettek <aljos...@apache.org> 
> wrote:
> 
> 
> Hi,
> 
> I think Flink should not create one output file per record. Could you maybe 
> post a snippet or minimal code example that shows how you're setting up your 
> sinks?
> 
> Best,
> Aljoscha
>> On 8. Aug 2017, at 19:08, Claire Yuan <clairey...@yahoo-inc.com> wrote:
>> 
>> Hi all,
>>   I am currently running some Beam jobs in streaming mode on Flink in a YARN 
>> session. My data sink writes CSV files, like the one in the TfIdf example. I 
>> noticed that Beam produces one output file for every record, plus temp files 
>> for them. As a result, my disk usage exceeds the maximum allowed. 
>>   I am not sure whether the problem is that I used the API incorrectly, but I 
>> am wondering if there is any way I can put all those records into one file, 
>> keep appending to that file, or delete those temp files by windowing or 
>> triggering?
>> 
>> Claire
> 
> 
> 
