Re: how to write dataset in a file?

2015-11-22 Thread jun aoki
Hi Stephan, thank you for the further explanation and clarification.
The current behavior now makes sense to me.
The hard part is that this information is spread across the documentation.
I have made a pull request to clarify the current behavior in the Javadoc.
https://github.com/apache/flink/pull/1392


-- 
-jun


Re: how to write dataset in a file?

2015-11-22 Thread Stephan Ewen
You can configure the system to always create a directory (not just when
parallelism > 1); see "fs.output.always-create-directory" under
https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#file-systems
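
For example, with the default configuration layout this is a single entry in
conf/flink-conf.yaml (a minimal sketch; the key is the one documented above):

# Always write into a directory of partition files, even for parallelism 1.
fs.output.always-create-directory: true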

The behavior we support right now is pretty much what people coming from
the Hadoop world are used to; that's why it behaves the way it does.

Greetings,
Stephan




Re: how to write dataset in a file?

2015-11-21 Thread Márton Balassi
Additionally, since having multiple files under /output1.txt is standard in
the Hadoop ecosystem, you can transparently read all of those files back with
env.readTextFile("/output1.txt").

You can also set the parallelism on individual operators (e.g. the file
writer) if you really need a single output.
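
For example, a follow-up job could read the whole output directory back like
this (a minimal sketch, assuming the /output1.txt directory from the example
below):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ReadBackExample {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        // Pointing readTextFile at the output directory transparently picks
        // up all the per-worker files (1, 2, 3, ...) inside it.
        DataSet<String> lines = env.readTextFile("file:///output1.txt");
        System.out.println("lines read back: " + lines.count());  // count() triggers execution
    }
}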



Re: how to write dataset in a file?

2015-11-21 Thread Matthias J. Sax
I would not set

> ExecutionEnvironment env = 
> ExecutionEnvironment.createLocalEnvironment().setParallelism(1);

because this changes the default parallelism of *all* operators to one.
Instead, only set the parallelism of the **sink** to one (as described
here:
https://stackoverflow.com/questions/32580970/writeascsv-and-writeastext-is-unexpected/32581813#32581813)

filteredData.writeAsText("file:///output1.txt").setParallelism(1);

-Matthias






Re: how to write dataset in a file?

2015-11-21 Thread jun aoki
Thank you guys for helping me understand!
With your help I was able to get exactly the behavior I wanted in my
research work.

Does anybody think, however, that the behavior is not straightforward? (At
least one other person on StackOverflow misunderstood it the same way I did.)

I'd like to ask the community what they think of my suggestions:
1. Change the method signatures to writeAsText(String directoryPath) and
writeAsCsv(String directoryPath) (not filePath but directoryPath), and have
them ALWAYS create a directory instead of sometimes a file and sometimes a
directory, depending on the sink's parallelism.
With this change, a directory with a single "1" file in it is created even
when the parallelism is set to 1.
This is more consistent, and there is no confusion about what the method
does.

2. Add new methods, writeAsTextFile(String filePath) and
writeAsCsvFile(String filePath), which ALWAYS create a single file and never
a directory. To make this happen, either the sink's parallelism is
implicitly set to 1, or all data is collected from all workers into one
dataset behind the scenes. (A rough sketch of what I mean follows below.)
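
To make the second suggestion concrete: the semantics I have in mind could
be as simple as pinning the sink's parallelism to 1. Here is a rough sketch
of a hypothetical helper (writeAsTextFile does not exist in Flink today;
this is only the proposed behavior):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSink;

public final class SingleFileSinks {

    // Hypothetical helper: always writes exactly one file at filePath by
    // forcing the sink (and only the sink) to run with parallelism 1.
    // (Assumes fs.output.always-create-directory is left at its default.)
    public static <T> DataSink<T> writeAsTextFile(DataSet<T> data, String filePath) {
        return data.writeAsText(filePath).setParallelism(1);
    }
}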

What do you guys think?




-- 
-jun


Re: how to write dataset in a file?

2015-11-20 Thread Suneel Marthi
You can write to a single output file by setting parallelism == 1

So:

final ExecutionEnvironment env =
    ExecutionEnvironment.createLocalEnvironment().setParallelism(1);

The reason you see multiple output files is that each worker is writing
to a different file.



how to write dataset in a file?

2015-11-20 Thread jun aoki
Hi Flink community

I know I must be doing something wrong, but I could not find what I was
looking for.

final ExecutionEnvironment env =
    ExecutionEnvironment.createLocalEnvironment();
DataSet<String> data = env.readTextFile("file:///text1.txt");
FilterFunction<String> filter = new MyFilterFunction();  // keeps lines that start with "[ERROR]"
DataSet<String> filteredData = data.filter(filter);
filteredData.writeAsText("file:///output1.txt");
env.execute();
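
(For completeness, MyFilterFunction is nothing special; it is roughly the
following, assuming the obvious implementation:)

import org.apache.flink.api.common.functions.FilterFunction;

public class MyFilterFunction implements FilterFunction<String> {

    @Override
    public boolean filter(String line) {
        // Keep only the lines that start with "[ERROR]".
        return line.startsWith("[ERROR]");
    }
}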

Then I expect to get a single file, /output1.txt, but I actually get
/output1.txt/1, /output1.txt/2, /output1.txt/3, ...
I assumed I would get a single file because the method signature says
writeAsText(String filePath).  <-- filePath, not directoryPath
The Javadoc comment also reads as if my assumption were right:
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java#L1354

Can anyone tell me whether the method signature and documentation should be
fixed, or whether I am missing some configuration?

-- 
-jun