[GitHub] flink pull request #3012: [FLINK-2186] Add readCsvAsRow methods to CsvReader...
Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3012#discussion_r10033

--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---
@@ -348,6 +349,47 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     wrap(new DataSource[T](javaEnv, inputFormat, typeInfo, getCallLocationName()))
   }

+  def readCsvFileAsRow[T : ClassTag : TypeInformation](
+    filePath: String,
+    rowSize: Int,
+    additionalTypes: Map[Int, Class[_]] = null,
+    lineDelimiter: String = "\n",
+    fieldDelimiter: String = ",",
+    quoteCharacter: Character = null,
+    ignoreFirstLine: Boolean = false,
+    ignoreComments: String = null,
+    lenient: Boolean = false,
+    includedFields: Array[Int] = null): DataSet[Row] = {
--- End diff --

Or 3 tabs :)

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3012#discussion_r100337562

--- Diff: flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java ---
@@ -122,6 +127,80 @@ public void testValueTypes() throws Exception {
     compareResultAsTuples(result, expected);
   }

+  private int fullRowSize = 29;
+  private String including = "011";
+  private String fileContent =
+      "1,2,3," + 4 + "," + 5.0d + "," + true +
+      ",7,8,9,11,22,33,44,55,66,77,88,99,00," +
+      "111,222,333,444,555,666,777,888,999,000\n" +
+      "a,b,c," + 40 + "," + 50.0d + "," + false +
+      ",g,h,i,aa,bb,cc,dd,ee,ff,gg,hh,ii,mm," +
+      "aaa,bbb,ccc,ddd,eee,fff,ggg,hhh,iii,mmm\n";
--- End diff --

Yes, but it's simpler to see the values this way. I'll make it static so it is created only once.
Github user ex00 commented on a diff in the pull request: https://github.com/apache/flink/pull/3012#discussion_r100247845

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java ---
@@ -351,6 +356,32 @@ public CsvReader ignoreInvalidLines(){
     return new DataSource(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
   }

+  public DataSource<Row> rowType(Class<?> mainTargetType, int size, Map<Integer, Class<?>> additionalTypes) {
--- End diff --

Please add javadoc.
Github user ex00 commented on a diff in the pull request: https://github.com/apache/flink/pull/3012#discussion_r100247856

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java ---
@@ -351,6 +356,32 @@ public CsvReader ignoreInvalidLines(){
     return new DataSource(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
   }

+  public DataSource<Row> rowType(Class<?> mainTargetType, int size, Map<Integer, Class<?>> additionalTypes) {
+    Preconditions.checkNotNull(mainTargetType, "The main type class must not be null.");
+
+    TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(mainTargetType);
+    RowTypeInfo rowTypeInfo;
+
+    if (additionalTypes != null) {
+      Map<Integer, TypeInformation<?>> addTypeMap = new HashMap<>(additionalTypes.size());
+      for (Map.Entry<Integer, Class<?>> e : additionalTypes.entrySet()) {
+        addTypeMap.put(e.getKey(), TypeExtractor.createTypeInfo(e.getValue()));
+      }
+      rowTypeInfo = new RowTypeInfo(typeInfo, size, addTypeMap);
+    } else {
+      rowTypeInfo = new RowTypeInfo(typeInfo, size);
+    }
+
+    CsvInputFormat<Row> inputFormat = new RowCsvInputFormat(path, rowTypeInfo, this.includedMask);
+    configureInputFormat(inputFormat);
+
+    return new DataSource<>(executionContext, inputFormat, rowTypeInfo, Utils.getCallLocationName());
+  }
+
+  public DataSource<Row> rowType(Class<?> mainTargetType, int size) {
--- End diff --

Please add javadoc.
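The `rowType` method above defaults every column to the main target type and overrides only the positions listed in `additionalTypes`. A minimal sketch of that expansion, using plain `Class` objects as stand-ins for Flink's `TypeInformation` (the class and helper names here are hypothetical, not Flink API):

```java
import java.util.HashMap;
import java.util.Map;

public class RowTypesSketch {
    // Expand a main (default) column type plus sparse per-index overrides
    // into a full array of 'size' column types.
    static Class<?>[] configureTypes(Class<?> mainType, int size, Map<Integer, Class<?>> additionalTypes) {
        Class<?>[] types = new Class<?>[size];
        for (int i = 0; i < size; i++) {
            types[i] = mainType;              // default every column to the main type
        }
        for (Map.Entry<Integer, Class<?>> e : additionalTypes.entrySet()) {
            types[e.getKey()] = e.getValue(); // override the listed positions
        }
        return types;
    }

    public static void main(String[] args) {
        Map<Integer, Class<?>> overrides = new HashMap<>();
        overrides.put(3, Integer.class);
        overrides.put(4, Double.class);
        overrides.put(5, Boolean.class);
        Class<?>[] types = configureTypes(String.class, 7, overrides);
        System.out.println(types[0].getSimpleName()); // prints "String"
        System.out.println(types[4].getSimpleName()); // prints "Double"
    }
}
```

This mirrors the shape of the CSV fixture in the tests below, where columns 3, 4, and 5 hold an int, a double, and a boolean while the rest are strings.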
Github user ex00 commented on a diff in the pull request: https://github.com/apache/flink/pull/3012#discussion_r100247899

--- Diff: flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java ---
@@ -122,6 +127,80 @@ public void testValueTypes() throws Exception {
     compareResultAsTuples(result, expected);
  }

+  private int fullRowSize = 29;
+  private String including = "011";
+  private String fileContent =
+      "1,2,3," + 4 + "," + 5.0d + "," + true +
+      ",7,8,9,11,22,33,44,55,66,77,88,99,00," +
+      "111,222,333,444,555,666,777,888,999,000\n" +
+      "a,b,c," + 40 + "," + 50.0d + "," + false +
+      ",g,h,i,aa,bb,cc,dd,ee,ff,gg,hh,ii,mm," +
+      "aaa,bbb,ccc,ddd,eee,fff,ggg,hhh,iii,mmm\n";
--- End diff --

A lot of concatenations; the fileContent equals
```
"1,2,3,4,5.0,true" +
",7,8,9,11,22,33,44,55,66,77,88,99,00," +
"111,222,333,444,555,666,777,888,999,000\n" +
"a,b,c,40,50.0,false," +
"g,h,i,aa,bb,cc,dd,ee,ff,gg,hh,ii,mm," +
"aaa,bbb,ccc,ddd,eee,fff,ggg,hhh,iii,mmm\n"
```
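The reviewer's point can be checked directly: Java string concatenation converts `int`, `double`, and `boolean` operands via their `toString`, so the mixed-concatenation fixture and the plain-literal form produce the same string. A small self-contained snippet (the class name is made up for illustration), using only the first row:

```java
public class FileContentSketch {
    // Original test fixture style: mixes literals with int/double/boolean concatenation.
    static final String CONCATENATED =
        "1,2,3," + 4 + "," + 5.0d + "," + true +
        ",7,8,9,11,22,33,44,55,66,77,88,99,00," +
        "111,222,333,444,555,666,777,888,999,000\n";

    // Reviewer's suggestion: the same row written as plain string literals.
    static final String LITERAL =
        "1,2,3,4,5.0,true" +
        ",7,8,9,11,22,33,44,55,66,77,88,99,00," +
        "111,222,333,444,555,666,777,888,999,000\n";

    public static void main(String[] args) {
        System.out.println(CONCATENATED.equals(LITERAL)); // prints "true"
    }
}
```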
Github user ex00 commented on a diff in the pull request: https://github.com/apache/flink/pull/3012#discussion_r100247879

--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---
@@ -348,6 +349,47 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     wrap(new DataSource[T](javaEnv, inputFormat, typeInfo, getCallLocationName()))
   }

+  def readCsvFileAsRow[T : ClassTag : TypeInformation](
+    filePath: String,
+    rowSize: Int,
+    additionalTypes: Map[Int, Class[_]] = null,
+    lineDelimiter: String = "\n",
+    fieldDelimiter: String = ",",
+    quoteCharacter: Character = null,
+    ignoreFirstLine: Boolean = false,
+    ignoreComments: String = null,
+    lenient: Boolean = false,
+    includedFields: Array[Int] = null): DataSet[Row] = {
--- End diff --

Please indent the parameters two more spaces, as in the other methods of this class; the indentation should be 6 spaces.
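The `includedFields` parameter of the signature above lists which columns to keep; internally, the Java `CsvReader` path works with a boolean column mask (`this.includedMask` in the earlier diff). A minimal sketch of that index-to-mask conversion, assuming 0-based field indices; the class and `toMask` helper are hypothetical, not Flink API:

```java
public class IncludedMaskSketch {
    // Expand 0-based field indices into a boolean column mask of length rowSize.
    static boolean[] toMask(int[] includedFields, int rowSize) {
        boolean[] mask = new boolean[rowSize];
        for (int f : includedFields) {
            mask[f] = true; // keep this column
        }
        return mask;
    }

    public static void main(String[] args) {
        boolean[] mask = toMask(new int[]{1, 2}, 3);
        StringBuilder sb = new StringBuilder();
        for (boolean b : mask) {
            sb.append(b ? '1' : '0');
        }
        System.out.println(sb); // prints "011"
    }
}
```

The "011" output matches the `including = "011"` fixture in the test diff above, which selects the second and third of three columns.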
Github user ex00 commented on a diff in the pull request: https://github.com/apache/flink/pull/3012#discussion_r100247870

--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---
@@ -348,6 +349,47 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     wrap(new DataSource[T](javaEnv, inputFormat, typeInfo, getCallLocationName()))
   }

+  def readCsvFileAsRow[T : ClassTag : TypeInformation](
--- End diff --

Could you add scaladoc for the method? One may not understand what ```additionalTypes``` is.
Github user ex00 commented on a diff in the pull request: https://github.com/apache/flink/pull/3012#discussion_r100247839

--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java ---
@@ -192,6 +193,28 @@ public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {

+  public RowTypeInfo(TypeInformation<?> mainType, int size, Map<Integer, TypeInformation<?>> additionalTypes) {
+    this(configureTypes(mainType, size, additionalTypes));
+  }
+
+  public RowTypeInfo(TypeInformation<?> mainType, int size) {
+    this(configureTypes(mainType, size, Collections.<Integer, TypeInformation<?>>emptyMap()));
+  }
+
+  private static TypeInformation<?>[] configureTypes(TypeInformation<?> mainType, int size, Map<Integer, TypeInformation<?>> additionalTypes) {
--- End diff --

Could you format the arguments like ```RowTypeInfo#createComparator``` (line 219)? For example:
```
private static TypeInformation<?>[] configureTypes(
    TypeInformation<?> mainType,
    int size,
    Map<Integer, TypeInformation<?>> additionalTypes) {
```
GitHub user tonycox opened a pull request: https://github.com/apache/flink/pull/3012

[FLINK-2186] Add readCsvAsRow methods to CsvReader and scala ExecutionEnv

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

Rework CSV import to support very wide files.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tonycox/flink FLINK-2186

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3012.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3012

commit 905e1fe5f530bcec92af3d4e3ebc8f2c0e26cdf9
Author: tonycox
Date: 2016-12-12T11:51:56Z

    [FLINK-2186] Add readCsvAsRow methods to CsvReader and scala ExecutionEnv.