Re: how to write dataset in a file?
Hi Stephan, thank you for the further explanation and clarification. The current behavior now makes sense to me. The hard part is that this information is spread across several documents. I have opened a pull request to clarify the current behavior in the Javadoc:

https://github.com/apache/flink/pull/1392

--
-jun
Re: how to write dataset in a file?
You can configure the system to always create a directory (not just when parallelism > 1); see "fs.output.always-create-directory" under

https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#file-systems

The behavior we support right now is pretty much what people coming from the Hadoop world are used to; that's why it behaves the way it does.

Greetings,
Stephan
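For reference, the option Stephan mentions is a cluster-wide setting in flink-conf.yaml. A minimal fragment (to my understanding the default is false, i.e. a bare file is written when the sink's parallelism is 1):

```yaml
# Always create a directory for file-sink output, even when the sink's
# parallelism is 1 -- so you always get output1.txt/1, never a bare file.
fs.output.always-create-directory: true
```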
Re: how to write dataset in a file?
Additionally, since having multiple files under /output1.txt is standard in the Hadoop ecosystem, you can transparently read all the files back with env.readTextFile("/output1.txt").

You can also set the parallelism on individual operators (e.g. the file writer) if you really need a single output.
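The "transparently read all the files" behavior can be illustrated in plain Java (no Flink dependency; the part-file names "1", "2", ... follow the layout described in this thread, and this is a sketch of the idea, not Flink's actual input-format code):

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.*;
import java.util.stream.*;

public class ReadPartFiles {
    // Reads every regular file under a directory and concatenates their
    // lines, mimicking how env.readTextFile(...) treats a directory of
    // part files produced by a parallel sink.
    static List<String> readAllLines(Path dir) throws IOException {
        try (Stream<Path> parts = Files.list(dir)) {
            List<String> lines = new ArrayList<>();
            for (Path p : parts.filter(Files::isRegularFile)
                               .sorted() // deterministic order: "1", "2", ...
                               .collect(Collectors.toList())) {
                lines.addAll(Files.readAllLines(p));
            }
            return lines;
        }
    }

    public static void main(String[] args) throws IOException {
        // Simulate the layout from the thread: output1.txt/1, output1.txt/2
        Path dir = Files.createTempDirectory("output1.txt");
        Files.write(dir.resolve("1"), List.of("[ERROR] a"));
        Files.write(dir.resolve("2"), List.of("[ERROR] b"));
        System.out.println(readAllLines(dir));
    }
}
```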
Re: how to write dataset in a file?
I would not set

    ExecutionEnvironment env =
        ExecutionEnvironment.createLocalEnvironment().setParallelism(1);

because this changes the default parallelism of *all* operators to one. Instead, only set the parallelism of the **sink** to one (as described here:

https://stackoverflow.com/questions/32580970/writeascsv-and-writeastext-is-unexpected/32581813#32581813 )

    filteredData.writeAsText("file:///output1.txt").setParallelism(1);

-Matthias
Re: how to write dataset in a file?
Thank you guys for helping me understand! With your help I was able to control the behavior precisely in my research work.

Does anybody think, however, that the behavior is not straightforward? (At least there is another person on StackOverflow who misunderstood it the same way I did.)

I'd like to ask the community whether they like my suggestions:

1. Make the method signatures writeAsText(String directoryPath) and writeAsCsv(String directoryPath) (not filePath but directoryPath), and have them ALWAYS create a directory instead of sometimes a file and sometimes a directory depending on the sink's parallelism. This creates a directory containing a sole "1" file even when parallelism is set to 1. This is more consistent, and there is no confusion about what the method says it does.

2. Create additional methods called writeAsTextFile(String filePath) and writeAsCsvFile(String filePath) which ALWAYS create a single file and no directory. To make this happen, either the sink's parallelism is implicitly set to 1, or all data is collected from all workers into one dataset behind the scenes.

What do you guys think?

--
-jun
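Suggestion 2 above can be sketched in plain Java (no Flink dependency; writeAsTextFile is the hypothetical name from the proposal, not an existing Flink API): gather the per-worker results into one collection behind the scenes, then write exactly one file.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.*;

public class SingleFileSink {
    // Hypothetical "writeAsTextFile": merge the lines produced by every
    // worker into one list, then write exactly one file at filePath --
    // never a directory, regardless of how many workers there were.
    static void writeAsTextFile(List<List<String>> perWorkerLines, Path filePath)
            throws IOException {
        List<String> all = new ArrayList<>();
        for (List<String> worker : perWorkerLines) {
            all.addAll(worker); // merge, preserving per-worker order
        }
        Files.write(filePath, all); // a single plain file
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("demo").resolve("output1.txt");
        writeAsTextFile(List.of(List.of("[ERROR] a"), List.of("[ERROR] b")), out);
        System.out.println(Files.isRegularFile(out)); // a file, not a directory
    }
}
```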
Re: how to write dataset in a file?
You can write to a single output file by setting parallelism == 1:

    final ExecutionEnvironment env =
        ExecutionEnvironment.createLocalEnvironment().setParallelism(1);

The reason you see multiple output files is that each worker is writing to a different file.
how to write dataset in a file?
Hi Flink community,

I know I'm mistaken but could not find what I want.

    final ExecutionEnvironment env =
        ExecutionEnvironment.createLocalEnvironment();
    DataSet<String> data = env.readTextFile("file:///text1.txt");
    FilterFunction<String> filter = new MyFilterFunction(); // looks for a
        // line that starts with "[ERROR]"
    DataSet<String> filteredData = data.filter(filter);
    filteredData.writeAsText("file:///output1.txt");
    env.execute();

Then I expect to get a single file /output1.txt, but actually get /output1.txt/1, /output1.txt/2, /output1.txt/3...

I assumed I was getting a single file because the method signature says writeAsText(String filePath) <-- filePath instead of directoryPath. Also the Javadoc comment sounds like I assumed right.

https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java#L1354

Can anyone tell me if the method signature and documentation should be fixed? Or if I am missing some configuration?

--
-jun
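The layout described in this thread can be mimicked in plain Java (no Flink dependency; this is a sketch of the observed behavior, not Flink's actual sink code): with sink parallelism 1 the path is a plain file, while with parallelism n > 1 it becomes a directory holding one numbered file per parallel writer.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.*;

public class SinkLayoutDemo {
    // Mimics DataSet.writeAsText(path) as described in this thread:
    // parallelism 1  -> a single file at `path`;
    // parallelism n  -> a directory `path` containing files "1".."n".
    static void writeAsText(List<String> lines, Path path, int parallelism)
            throws IOException {
        if (parallelism == 1) {
            Files.write(path, lines);
            return;
        }
        Files.createDirectories(path);
        for (int i = 1; i <= parallelism; i++) {
            // Each "worker" writes its own partition; a simple round-robin
            // split stands in for Flink's actual data partitioning.
            List<String> part = new ArrayList<>();
            for (int j = i - 1; j < lines.size(); j += parallelism) {
                part.add(lines.get(j));
            }
            Files.write(path.resolve(String.valueOf(i)), part);
        }
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("demo").resolve("output1.txt");
        writeAsText(List.of("[ERROR] a", "[ERROR] b", "[ERROR] c"), out, 3);
        System.out.println(Files.isDirectory(out)); // the "file" path is a directory
    }
}
```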