Re: streaming output in just one files

2017-08-10 Thread Aljoscha Krettek
Hi,

I think Beam File Writing was recently changed to write one file per bundle. 
Currently, with the Flink Streaming Runner every element is considered to be 
one bundle, i.e. all bundles have size 1. This means that we write one file per 
output element.

@Reuven, could you please confirm? Also, won't this be a problem for other 
Runners that can have very small bundle sizes? IIRC the Dataflow Runner also 
has rather small bundles in streaming mode, I'm not sure, though.

Best,
Aljoscha
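
A minimal sketch of the windowed-write setup being discussed, assuming a
PCollection<String> of already-formatted lines like the one produced by the
Format step quoted below; the class name, window length, and shard count are
illustrative placeholders, not code from this thread:

    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PDone;
    import org.joda.time.Duration;

    public class WindowedCsvWrite {
      // Writes an unbounded PCollection of formatted lines as a fixed number of
      // CSV shards per five-minute window, rather than one file per bundle.
      public static PDone write(PCollection<String> formattedLines, String output) {
        return formattedLines
            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
            .apply(TextIO.write()
                .to(output)               // base name of the output files
                .withSuffix(".csv")
                .withWindowedWrites()     // required when the input is unbounded
                .withNumShards(1));       // one shard (file) per window
      }
    }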

> On 9. Aug 2017, at 19:14, Claire Yuan  wrote:
> 
> Hi Aljoscha,
>   I used the same sink as in the TfIdf example and set the streaming option 
> to true, and then I got one record per file. Here is the function that writes my 
> output; I call it in my pipeline's main method.
>   // SomeKey is a placeholder for the key type, which provides getName() and getNum().
>   public static class WriteOut extends PTransform<PCollection<KV<SomeKey, String>>, PDone> {
>     private String output;
>     public WriteOut(String output) {
>       this.output = output;
>     }
>     @Override
>     public PDone expand(PCollection<KV<SomeKey, String>> someInfos) {
>       return someInfos
>           .apply("Format", ParDo.of(new DoFn<KV<SomeKey, String>, String>() {
>             @ProcessElement
>             public void processElement(ProcessContext c) {
>               c.output(String.format("%s %s:\t%s",
>                   c.element().getKey().getName(),
>                   c.element().getKey().getNum(),
>                   c.element().getValue()));
>             }
>           }))
>           .apply(TextIO.write()
>               .to(output) /* base name of the output files */
>               .withSuffix(".csv"));
>     }
>   }
> 
> Claire
> 
> 
> On Wednesday, August 9, 2017 2:21 AM, Aljoscha Krettek  
> wrote:
> 
> 
> Hi,
> 
> I think Flink should not create one output file per record. Could you maybe 
> post a snippet or a minimal code example that shows how you're setting up your 
> sinks?
> 
> Best,
> Aljoscha
>> On 8. Aug 2017, at 19:08, Claire Yuan wrote:
>> 
>> Hi all,
>>   I am currently running some jobs coded in Beam in streaming mode on a Yarn 
>> session via Flink. My data sink was CSV files like the one in the TfIdf 
>> example. I noticed that Beam produces one output file for every record, plus 
>> temp files for them, which makes my space usage exceed the maximum. 
>>   I am not sure whether the problem is that I used the API incorrectly, but I 
>> am wondering whether there is any way I can put all those records into one 
>> file, keep updating that file, or delete those temp files by windowing or 
>> triggering?
>> 
>> Claire
> 

Re: Spark job hangs up at Evaluating ParMultiDo(ParseInput)

2017-08-10 Thread Aviem Zur
Hi Jayaraman,

Thanks for reaching out.
We run Beam with the Spark runner daily on a YARN cluster.

It appears from many of the logs you sent that the job hangs when connecting
to certain servers on certain ports. Could this be a network issue or an
issue with your Spark setup?

Could you please share which version of Beam you are running?
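
For context, a minimal sketch of how a Beam main class is typically pointed at
the Spark runner before being bundled into a jar and handed to spark-submit;
the class name and pipeline contents are placeholders, not taken from the job
described in this thread:

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.SparkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class SparkRunnerMain {
      public static void main(String[] args) {
        // Pipeline options can also be passed on the command line,
        // e.g. --runner=SparkRunner; here the runner is set explicitly.
        SparkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(SparkPipelineOptions.class);
        options.setRunner(SparkRunner.class);

        Pipeline pipeline = Pipeline.create(options);
        // ... build the pipeline here (e.g. the WordCount transforms) ...

        // Block until the job finishes so spark-submit does not exit early.
        pipeline.run().waitUntilFinish();
      }
    }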

On Thu, Aug 3, 2017 at 12:18 PM Sathish Jayaraman 
wrote:

> Hi,
>
> Thanks for trying it out.
>
> I was running the job on a local single-node setup. I also spawned an
> HDInsight cluster on the Azure platform just to test the WordCount program.
> It's the same result there too: stuck at the Evaluating ParMultiDo step. It
> runs fine with mvn compile exec, but when bundled into a jar and submitted via
> spark-submit there is no result. If there is no support in Beam for running on
> top of Spark, then I have to write native Spark code, which is what I am
> doing currently.
>
> Regards,
> Sathish. J
>
>
> On 03-Aug-2017, at 2:34 PM, Jean-Baptiste Onofré  wrote:
>
>
>
>