[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15401320#comment-15401320 ]

ASF GitHub Bot commented on APEXMALHAR-2116:
--------------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/apex-malhar/pull/326

> File Record reader module
> -------------------------
>
>                 Key: APEXMALHAR-2116
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2116
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Yogi Devendra
>            Assignee: Yogi Devendra
>
> This will be useful for use cases that involve reading files "line by line" in parallel and emitting each line as a separate tuple.
> The proposal is to add a new Module that lets users monitor directories, read files, and emit data records (tuples). Records are based on either a record separator (e.g. newline) or a fixed size (number of bytes).
> The plan is as follows:
> 1. A new operator, FileRecordReader, which extends BlockReader.
> 2. This operator has a configuration option to select the mode: FIXED_LENGTH or SEPARATOR_BASED records.
> 3. The appropriate ReaderContext is used based on the mode.
> 4. A new module, FileRecordReaderModule, which wraps the existing FileSplitter plus the FileRecordReader operator.
> The reason for a separate operator rather than reusing BlockReader is that the output port signature differs from BlockReader's.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
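The two record-split modes named in the plan above (separator-based and fixed-length) can be illustrated with a minimal, self-contained sketch. This is plain Java, independent of the Apex operators; the class and method names here are illustrative, not part of the Malhar API:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Standalone illustration of the two record-split strategies described in
// the proposal: separator-based (e.g. newline) and fixed-length (byte count).
// This mirrors the idea only; it is not the FileRecordReader operator itself.
public class RecordSplitModes
{
  // Split a buffer into records on a single-byte separator (e.g. '\n').
  public static List<String> delimited(byte[] data, byte separator)
  {
    List<String> records = new ArrayList<>();
    int start = 0;
    for (int i = 0; i < data.length; i++) {
      if (data[i] == separator) {
        records.add(new String(data, start, i - start, StandardCharsets.UTF_8));
        start = i + 1;
      }
    }
    if (start < data.length) {  // trailing record without a separator
      records.add(new String(data, start, data.length - start, StandardCharsets.UTF_8));
    }
    return records;
  }

  // Split a buffer into consecutive records of recordLength bytes each.
  public static List<String> fixedLength(byte[] data, int recordLength)
  {
    List<String> records = new ArrayList<>();
    for (int start = 0; start + recordLength <= data.length; start += recordLength) {
      records.add(new String(data, start, recordLength, StandardCharsets.UTF_8));
    }
    return records;
  }
}
```

In the actual module, the same choice is driven by a mode setting, and the splitting is applied per block so partitions can read different blocks in parallel.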
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15379410#comment-15379410 ]

ASF GitHub Bot commented on APEXMALHAR-2116:
--------------------------------------------

Github user amberarrow commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/326#discussion_r70977817

    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java ---
    @@ -0,0 +1,344 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.apex.malhar.lib.fs;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
    +import com.datatorrent.lib.io.block.BlockMetadata;
    +import com.datatorrent.lib.io.block.FSSliceReader;
    +import com.datatorrent.lib.io.fs.FileSplitterInput;
    +
    +/**
    + * This module is used for reading records/tuples from FileSystem. Records can
    + * be read in parallel using multiple partitions of record reader operator.
    + * (Ordering is not guaranteed when records are read in parallel)
    + *
    + * Input directory is scanned at specified interval to poll for new data.
    + *
    + * The module reads data in parallel, following parameters can be configured
    + *
    + * 1. files: list of file(s)/directories to read
    + * 2. filePatternRegularExp: Files with names matching given regex will be read
    + * 3. scanIntervalMillis: interval between two scans to discover new files in
    + *    input directory
    + * 4. recursive: if true, scan input directories recursively
    + * 5. blockSize: block size used to read input blocks of file
    + * 6. readersCount: count of readers to read input file
    + * 7. sequentialFileRead: if true, then each reader partition will read different file.
    + *    instead of reading different offsets of the same file.
    + *    (File level parallelism instead of block level parallelism)
    + * 8. blocksThreshold: number of blocks emitted per window
    + */
    +@org.apache.hadoop.classification.InterfaceStability.Evolving
    +public class FSRecordReaderModule implements Module
    +{
    +  @NotNull
    +  @Size(min = 1)
    +  private String files;
    +  private String filePatternRegularExp;
    +  @Min(1)
    +  private long scanIntervalMillis = 5000;
    +  private boolean recursive = true;
    +  private boolean sequentialFileRead = false;
    +  @Min(1)
    +  private int readersCount = 1;
    +  @Min(1)
    +  protected int blocksThreshold = 1;
    +
    +  public final transient ProxyOutputPort<byte[]> records = new ProxyOutputPort<byte[]>();
    +
    +  /**
    +   * Criteria for record split
    +   */
    +  private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;
    +
    +  /**
    +   * Length for fixed width record
    +   */
    +  @Min(1)
    +  private int recordLength;
    --- End diff --

    Adding another class seems like overkill; how about removing the annotation and adding a check in the code to ensure that the value is positive if mode is FIXED_WIDTH_RECORD?
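The check amberarrow suggests above (drop the @Min annotation, validate recordLength only when the mode requires it) could look like the standalone sketch below. The Mode enum here stands in for FSRecordReader's RECORD_READER_MODE; the class and method names are assumptions for illustration, not the committed code:

```java
// A sketch of mode-dependent validation of recordLength, as suggested in
// the review comment above. DELIMITED_RECORD ignores recordLength;
// FIXED_WIDTH_RECORD requires a positive value.
public class RecordLengthCheck
{
  public enum Mode { DELIMITED_RECORD, FIXED_WIDTH_RECORD }

  // Returns true when the (mode, recordLength) combination is usable.
  public static boolean isValid(Mode mode, int recordLength)
  {
    if (mode == Mode.FIXED_WIDTH_RECORD) {
      return recordLength > 0;  // compulsory for fixed-width records
    }
    return true;  // recordLength is ignored for delimited records
  }

  // Fail fast (e.g. from a setup hook) on an invalid combination.
  public static void validate(Mode mode, int recordLength)
  {
    if (!isValid(mode, recordLength)) {
      throw new IllegalArgumentException(
          "recordLength must be positive when mode is FIXED_WIDTH_RECORD, got " + recordLength);
    }
  }
}
```

This keeps recordLength optional for delimited records while still rejecting a missing or non-positive value for fixed-width records, which is the trade-off discussed later in this thread.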
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15378938#comment-15378938 ]

ASF GitHub Bot commented on APEXMALHAR-2116:
--------------------------------------------

Github user yogidevendra commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/326#discussion_r70930388

    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java ---
    @@ -0,0 +1,344 @@
    +  /**
    +   * Length for fixed width record
    +   */
    +  @Min(1)
    +  private int recordLength;
    --- End diff --

    Oh. Good point. Should we separate this into two different classes, FixedWidthRecordReader and DelimitedRecordReader? That would make the configuration clean. @amberarrow Any thoughts?
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15378933#comment-15378933 ]

ASF GitHub Bot commented on APEXMALHAR-2116:
--------------------------------------------

Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/326#discussion_r70930124

    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java ---
    @@ -0,0 +1,344 @@
    +  /**
    +   * Length for fixed width record
    +   */
    +  @Min(1)
    +  private int recordLength;
    --- End diff --

    Record length will be optional in the case of DELIMITED_RECORD and compulsory in the case of FIXED_LENGTH records, so keeping the field either optional or compulsory has pros and cons either way.
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367309#comment-15367309 ]

ASF GitHub Bot commented on APEXMALHAR-2116:
--------------------------------------------

Github user yogidevendra commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/326#discussion_r70032545

    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java ---
    @@ -0,0 +1,334 @@
    +@org.apache.hadoop.classification.InterfaceStability.Evolving
    +public class FSRecordReaderModule implements Module
    +{
    +  @NotNull
    +  @Size(min = 1)
    +  private String files;
    +  private String filePatternRegularExp;
    +  @Min(0)
    +  private long scanIntervalMillis;
    +  private boolean recursive = true;
    +  private boolean sequentialFileRead = false;
    +  private int readersCount;
    +  @Min(1)
    +  protected int blocksThreshold;
    +
    +  public final transient ProxyOutputPort<byte[]> records = new ProxyOutputPort<byte[]>();
    +
    +  /**
    +   * Criteria for record split
    +   */
    +  private RECORD_READER_MODE mode;
    +
    +  /**
    +   * Length for fixed width record
    +   */
    +  private int recordLength;
    +
    +  public FileSplitterInput createFileSplitter()
    +  {
    +    return new FileSplitterInput();
    +  }
    +
    +  public FSRecordReader createBlockReader()
    +  {
    +    FSRecordReader recordReader = new FSRecordReader();
    +    recordReader.setMode(mode);
    +    recordReader.setRecordLength(recordLength);
    +
    +    return recordReader;
    +  }
    +
    +  @Override
    +  public void populateDAG(DAG dag, Configuration configuration)
    +  {
    +    FileSplitterInput fileSplitter = dag.addOperator("FileSplitter", createFileSplitter());
    +    FSRecordReader recordReader = dag.addOperator("BlockReader", createBlockReader());
    +
    +    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, recordReader.blocksMetadataInput);
    +
    +    if (sequentialFileRead) {
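The sequentialFileRead branch in the diff above installs a stream codec so that all blocks of one file land on the same reader partition (file-level parallelism instead of block-level parallelism). The standalone sketch below shows the core idea: derive the partition key from the block's file path rather than the block id. The simple Block class is a stand-in for BlockMetadata.FileBlockMetadata, and the names here are illustrative assumptions:

```java
// Standalone illustration of file-affinity routing for sequential file read:
// blocks of the same file hash to the same partition, so one partition reads
// that file's blocks in order.
public class FileAffinityRouting
{
  public static class Block
  {
    public final String filePath;
    public final long blockId;

    public Block(String filePath, long blockId)
    {
      this.filePath = filePath;
      this.blockId = blockId;
    }
  }

  // Partition key based on file path: same file => same partition.
  // floorMod keeps the result non-negative for negative hash codes.
  public static int partitionFor(Block block, int partitionCount)
  {
    return Math.floorMod(block.filePath.hashCode(), partitionCount);
  }
}
```

When sequentialFileRead is false, blocks can instead be routed by block id, letting several partitions read different offsets of the same file concurrently.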
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366023#comment-15366023 ]

ASF GitHub Bot commented on APEXMALHAR-2116:
--------------------------------------------

Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/326#discussion_r69897823

    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java ---
    @@ -0,0 +1,334 @@
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366020#comment-15366020 ]

ASF GitHub Bot commented on APEXMALHAR-2116:
--------------------------------------------

Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/326#discussion_r69897612

    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java ---
    @@ -0,0 +1,334 @@
    +  public FSRecordReader createBlockReader()
    --- End diff --

    we can rename it to createRecordReader
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361186#comment-15361186 ]

ASF GitHub Bot commented on APEXMALHAR-2116:
--------------------------------------------

Github user yogidevendra closed the pull request at:

    https://github.com/apache/apex-malhar/pull/326
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361187#comment-15361187 ]

ASF GitHub Bot commented on APEXMALHAR-2116:
--------------------------------------------

GitHub user yogidevendra reopened a pull request:

    https://github.com/apache/apex-malhar/pull/326

    APEXMALHAR-2116 Added FS record reader operator, module, test

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yogidevendra/apex-malhar APEXMALHAR-2116-record-reader

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/326.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #326

----
commit 506ab58d382f6f6b338486a49bad0838d77272f9
Author: yogidevendra
Date:   2016-06-20T05:47:08Z

    Added FS record reader operator, module, test
    2. incorporated review comments
    3. javadoc improvements.
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355935#comment-15355935 ] ASF GitHub Bot commented on APEXMALHAR-2116: Github user amberarrow commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/326#discussion_r69042882 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java --- @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.malhar.lib.fs; + +import javax.validation.constraints.Min; + +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; + +import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Module; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.io.block.FSSliceReader; +import com.datatorrent.lib.io.fs.FileSplitterInput; + +/** + * This module is used for reading records/tuples from FileSystem. Records can + * be read in parallel using multiple partitions of record reader operator. + * (Ordering is not guaranteed when records are read in parallel) + * + * Input directory is scanned at specified interval to poll for new data. + * + * The module reads data in parallel, following parameters can be configured + * + * 1. files: list of file(s)/directories to read + * 2. filePatternRegularExp: Files names matching given regex will be read + * 3. scanIntervalMillis: interval between two scans to discover new files in + * input directory + * 4. recursive: if scan recursively input directories + * 5. blockSize: block size used to read input blocks of file + * 6. readersCount: count of readers to read input file + * 7. sequentialFileRead: If emit file blocks in sequence? + * 8. 
blocksThreshold: number of blocks emitted per window + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class FSRecordReaderModule implements Module +{ + @NotNull + @Size(min = 1) + private String files; + private String filePatternRegularExp; + @Min(0) + private long scanIntervalMillis; + private boolean recursive = true; + private boolean sequentialFileRead = false; + private int readersCount; + @Min(1) + protected int blocksThreshold; + + public final transient ProxyOutputPort<byte[]> records = new ProxyOutputPort<byte[]>(); + + /** + * Criteria for record split + */ + private RECORD_READER_MODE mode; + + /** + * Length for fixed width record + */ + private int recordLength; + + public FileSplitterInput createFileSplitter() + { +return new FileSplitterInput(); + } + + public FSRecordReader createBlockReader() + { +FSRecordReader recordReader = new FSRecordReader(); +recordReader.setMode(mode); +recordReader.setRecordLength(recordLength); + +return recordReader; + } + + @Override + public void populateDAG(DAG dag, Configuration configuration) + { +FileSplitterInput fileSplitter = dag.addOperator("FileSplitter", createFileSplitter()); +FSRecordReader recordReader = dag.addOperator("BlockReader", createBlockReader()); + +dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, recordReader.blocksMetadataInput); + +if (sequentialFileRead) { + dag.setInputPortAttribute(recordReader.blocksMetadataInput, Context.PortContext.STREAM_CODEC, + new SequentialFileBlockMetadataCodec()); +} + +
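The module above splits input into records in one of two modes (a separator-based mode, e.g. newline, and a fixed-width mode of recordLength bytes). The splitting semantics can be sketched in plain Java; this is an illustration only, not the Malhar ReaderContext implementation, and the trailing-partial-record handling here is an assumption of the sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the two record-split modes described above; this
// illustrates the semantics only and is NOT the Malhar ReaderContext code.
public class RecordSplitDemo
{
  // DELIMITED_RECORD: split on a separator byte (newline here); the
  // separator itself is not part of the emitted record.
  public static List<byte[]> splitDelimited(byte[] data)
  {
    List<byte[]> records = new ArrayList<>();
    int start = 0;
    for (int i = 0; i < data.length; i++) {
      if (data[i] == '\n') {
        byte[] rec = new byte[i - start];
        System.arraycopy(data, start, rec, 0, rec.length);
        records.add(rec);
        start = i + 1;
      }
    }
    if (start < data.length) {
      // trailing record with no separator after it
      byte[] rec = new byte[data.length - start];
      System.arraycopy(data, start, rec, 0, rec.length);
      records.add(rec);
    }
    return records;
  }

  // FIXED_WIDTH_RECORD: consecutive records of exactly recordLength bytes;
  // a trailing partial record is dropped in this sketch.
  public static List<byte[]> splitFixed(byte[] data, int recordLength)
  {
    List<byte[]> records = new ArrayList<>();
    for (int start = 0; start + recordLength <= data.length; start += recordLength) {
      byte[] rec = new byte[recordLength];
      System.arraycopy(data, start, rec, 0, recordLength);
      records.add(rec);
    }
    return records;
  }

  public static void main(String[] args)
  {
    byte[] data = "ab\ncd\nef".getBytes(StandardCharsets.UTF_8);
    System.out.println(splitDelimited(data).size()); // 3 records: ab, cd, ef
    System.out.println(splitFixed(data, 4).size());  // 2 records of 4 bytes
  }
}
```

The same 8 input bytes yield three delimited records but two fixed-width ones, which is why the operator exposes mode and recordLength as separate configuration.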
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355930#comment-15355930 ] ASF GitHub Bot commented on APEXMALHAR-2116: Github user amberarrow commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/326#discussion_r69042391 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java --- @@ -0,0 +1,332 @@ +/** + * (Apache License 2.0 header)
+ */ + +package org.apache.apex.malhar.lib.fs; + +import javax.validation.constraints.Min; + +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; + +import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Module; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.io.block.FSSliceReader; +import com.datatorrent.lib.io.fs.FileSplitterInput; + +/** + * This module is used for reading records/tuples from FileSystem. Records can + * be read in parallel using multiple partitions of record reader operator. + * (Ordering is not guaranteed when records are read in parallel) + * + * Input directory is scanned at specified interval to poll for new data. + * + * The module reads data in parallel, following parameters can be configured + * + * 1. files: list of file(s)/directories to read + * 2. filePatternRegularExp: Files names matching given regex will be read + * 3. scanIntervalMillis: interval between two scans to discover new files in + * input directory + * 4. recursive: if scan recursively input directories + * 5. blockSize: block size used to read input blocks of file + * 6. readersCount: count of readers to read input file + * 7. sequentialFileRead: If emit file blocks in sequence? --- End diff -- This description is not clear, please rewrite.
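The reviewer asks for a clearer description of the sequentialFileRead flag. Judging from the module code, which installs a SequentialFileBlockMetadataCodec on the block-metadata input port when the flag is set, the intent appears to be that a reader handles each file's blocks consecutively and in offset order rather than interleaved across files. A minimal plain-Java sketch of that ordering rule, where Block is a hypothetical stand-in for the block metadata type:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of the ordering implied by "sequentialFileRead": blocks of one file
// are kept consecutive and in offset order. Block is a simplified stand-in
// for BlockMetadata.FileBlockMetadata, not the real Malhar type.
public class SequentialBlocksDemo
{
  public static final class Block
  {
    public final String filePath;
    public final long offset;

    public Block(String filePath, long offset)
    {
      this.filePath = filePath;
      this.offset = offset;
    }
  }

  // Order blocks by (filePath, offset) so no file's blocks are interleaved
  // with another file's, and each file is read front to back.
  public static List<Block> sequentialOrder(List<Block> blocks)
  {
    List<Block> sorted = new ArrayList<>(blocks);
    sorted.sort(Comparator.comparing((Block b) -> b.filePath)
        .thenComparingLong(b -> b.offset));
    return sorted;
  }
}
```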
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355929#comment-15355929 ] ASF GitHub Bot commented on APEXMALHAR-2116: Github user amberarrow commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/326#discussion_r69042316 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java --- @@ -0,0 +1,332 @@ +/** + * (Apache License 2.0 header)
+ */ + +package org.apache.apex.malhar.lib.fs; + +import javax.validation.constraints.Min; + +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; + +import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Module; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.io.block.FSSliceReader; +import com.datatorrent.lib.io.fs.FileSplitterInput; + +/** + * This module is used for reading records/tuples from FileSystem. Records can + * be read in parallel using multiple partitions of record reader operator. + * (Ordering is not guaranteed when records are read in parallel) + * + * Input directory is scanned at specified interval to poll for new data. + * + * The module reads data in parallel, following parameters can be configured + * + * 1. files: list of file(s)/directories to read + * 2. filePatternRegularExp: Files names matching given regex will be read + * 3. scanIntervalMillis: interval between two scans to discover new files in + * input directory + * 4. recursive: if scan recursively input directories --- End diff -- if scan recursively input directories => if true, scan input directories recursively
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355202#comment-15355202 ] ASF GitHub Bot commented on APEXMALHAR-2116: GitHub user yogidevendra reopened a pull request: https://github.com/apache/apex-malhar/pull/326 APEXMALHAR-2116 Added FS record reader operator, module, test You can merge this pull request into a Git repository by running: $ git pull https://github.com/yogidevendra/apex-malhar APEXMALHAR-2116-record-reader Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/326.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #326 commit e5dba39f1aa5390275faf5c5938d89dd3cd78598 Author: yogidevendra Date: 2016-06-20T05:47:08Z Added FS record reader operator, module, test 2. incorporated review comments
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355201#comment-15355201 ] ASF GitHub Bot commented on APEXMALHAR-2116: Github user yogidevendra closed the pull request at: https://github.com/apache/apex-malhar/pull/326
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355113#comment-15355113 ] ASF GitHub Bot commented on APEXMALHAR-2116: Github user yogidevendra closed the pull request at: https://github.com/apache/apex-malhar/pull/326
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355114#comment-15355114 ] ASF GitHub Bot commented on APEXMALHAR-2116: GitHub user yogidevendra reopened a pull request: https://github.com/apache/apex-malhar/pull/326 APEXMALHAR-2116 Added FS record reader operator, module, test You can merge this pull request into a Git repository by running: $ git pull https://github.com/yogidevendra/apex-malhar APEXMALHAR-2116-record-reader Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/326.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #326 commit c4a6d552c5b0c6b8ede672086f3548847311ab70 Author: yogidevendra Date: 2016-06-20T05:47:08Z Added FS record reader operator, module, test 2. incorporated review comments
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352095#comment-15352095 ] ASF GitHub Bot commented on APEXMALHAR-2116: Github user amberarrow commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/326#discussion_r68678627 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java --- @@ -0,0 +1,180 @@ +/** + * (Apache License 2.0 header) + */ + +package org.apache.apex.malhar.lib.fs; + +import java.io.IOException; + +import org.apache.commons.beanutils.ConvertUtils; +import org.apache.commons.beanutils.converters.AbstractConverter; +import org.apache.hadoop.fs.FSDataInputStream; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.io.block.FSSliceReader; +import com.datatorrent.lib.io.block.ReaderContext; + +/** + * This operator can be used for reading records/tuples from Filesystem in + * parallel (without ordering guarantees between tuples). Records can be + * delimited (e.g. newline) or fixed width records. Output tuples are byte[]. 
+ * + * Typically, this operator will be connected to output of FileSplitterInput to + * read records in parallel. + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class FSRecordReader extends FSSliceReader +{ + /** + * Record reader mode decides how to split the records. + */ + public static enum RECORD_READER_MODE + { +DELIMITED_RECORD, FIXED_WIDTH_RECORD; + } + + /** + * Criteria for record split + */ + private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD; + + /** + * Length for fixed width record + */ + private int recordLength; + + /** + * Port to emit individual records/tuples as byte[] + */ + public final transient DefaultOutputPort<byte[]> records = new DefaultOutputPort<byte[]>(); + + /** + * Initialize appropriate reader context based on mode selection + */ + @Override + public void setup(OperatorContext context) + { +super.setup(context); +if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD) { + ReaderContext.FixedBytesReaderContext fixedBytesReaderContext = new ReaderContext.FixedBytesReaderContext(); + fixedBytesReaderContext.setLength(recordLength); + readerContext = fixedBytesReaderContext; +} else { + readerContext = new ReaderContext.ReadAheadLineReaderContext(); +} + } + + /** + * Read the block data and emit records based on reader context + * + * @param blockMetadata + * block + * @throws IOException + */ + protected void readBlock(BlockMetadata blockMetadata) throws IOException + { +readerContext.initialize(stream, blockMetadata, consecutiveBlock); +ReaderContext.Entity entity; +while ((entity = readerContext.next()) != null) { + + counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes()); + + byte[] record = entity.getRecord(); + + if (record != null) { +counters.getCounter(ReaderCounterKeys.RECORDS).increment(); +records.emit(record); + } +} + } + + /** + * Criteria for record split + * + * @param mode + * Mode + */ + public void setMode(RECORD_READER_MODE mode) + { +this.mode = mode; + } + + /** + * 
Criteria for record split + * + * @return mode + */ + public RECORD_READER_MODE getMode() + { +return mode; + } + + /** + * Length for fixed width record + * + * @param
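The DELIMITED_RECORD branch of setup() above installs a ReadAheadLineReaderContext. The key idea behind read-ahead readers is that each block reader emits only the records that start inside its block, reading past the block end to finish the last one, so a record straddling a block boundary is emitted exactly once. A self-contained sketch of one way to realize that rule (plain Java, not the Malhar implementation; empty records are skipped here):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Sketch of the read-ahead idea behind ReadAheadLineReaderContext: a reader
// emits only the records that START inside its block [blockStart, blockEnd),
// reading past the block end to finish the last one. Simplified illustration,
// not the Malhar code.
public class ReadAheadDemo
{
  public static List<String> recordsForBlock(byte[] data, int blockStart, int blockEnd)
  {
    List<String> out = new ArrayList<>();
    int recStart = 0; // records begin at offset 0 or right after a '\n'
    for (int i = 0; i <= data.length; i++) {
      boolean atEnd = (i == data.length);
      if (atEnd || data[i] == '\n') {
        // Emit only if this record starts inside the block (and is non-empty).
        if (recStart >= blockStart && recStart < blockEnd && i > recStart) {
          out.add(new String(data, recStart, i - recStart, StandardCharsets.UTF_8));
        }
        recStart = i + 1;
      }
    }
    return out;
  }

  public static void main(String[] args)
  {
    byte[] data = "hello\nworld\n".getBytes(StandardCharsets.UTF_8);
    // The boundary at byte 4 falls inside "hello": block [0,4) still emits it
    // (read-ahead), and block [4,12) does not repeat it.
    System.out.println(recordsForBlock(data, 0, 4));  // [hello]
    System.out.println(recordsForBlock(data, 4, 12)); // [world]
  }
}
```

This is also why parallel reading gives no ordering guarantee between tuples: each block's records are emitted independently by whichever reader partition owns the block.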
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346391#comment-15346391 ] ASF GitHub Bot commented on APEXMALHAR-2116: Github user amberarrow commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/326#discussion_r68229454 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java --- @@ -0,0 +1,331 @@ +/** + * (Apache License 2.0 header)
+ */ + +package org.apache.apex.malhar.lib.fs; + +import javax.validation.constraints.Min; + +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; + +import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Module; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.io.block.FSSliceReader; +import com.datatorrent.lib.io.fs.FileSplitterInput; + +/** + * This module is used for reading records/tuples from FileSystem. Records can + * be read in parallel using multiple partitions of record reader operator. + * (Ordering is not guaranteed when records are read in parallel) + * + * Input directory is scanned at specified interval to poll for new data. + * + * The module reads data in parallel, following parameters can be configured + * + * 1. files: list of file(s)/directories to read + * 2. filePatternRegularExp: Files names matching given regex will be read + * 3. scanIntervalMillis: interval between two scans to discover new files in + * input directory + * 4. recursive: if scan recursively input directories + * 5. blockSize: block size used to read input blocks of file + * 6. readersCount: count of readers to read input file + * 7. sequencialFileRead: If emit file blocks in sequence? + * 8. 
blocksThreshold: number of blocks emitted per window + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class FSRecordReaderModule implements Module +{ + @NotNull + @Size(min = 1) + private String files; + private String filePatternRegularExp; + @Min(0) + private long scanIntervalMillis; + private boolean recursive = true; + private boolean sequencialFileRead = false; --- End diff -- sequencial => sequential
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346390#comment-15346390 ] ASF GitHub Bot commented on APEXMALHAR-2116: Github user amberarrow commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/326#discussion_r68229363 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java --- @@ -0,0 +1,331 @@ +/** + * (Apache License 2.0 header)
+ */ + +package org.apache.apex.malhar.lib.fs; + +import javax.validation.constraints.Min; + +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; + +import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Module; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.io.block.FSSliceReader; +import com.datatorrent.lib.io.fs.FileSplitterInput; + +/** + * This module is used for reading records/tuples from FileSystem. Records can + * be read in parallel using multiple partitions of record reader operator. + * (Ordering is not guaranteed when records are read in parallel) + * + * Input directory is scanned at specified interval to poll for new data. + * + * The module reads data in parallel, following parameters can be configured + * + * 1. files: list of file(s)/directories to read + * 2. filePatternRegularExp: Files names matching given regex will be read + * 3. scanIntervalMillis: interval between two scans to discover new files in + * input directory + * 4. recursive: if scan recursively input directories + * 5. blockSize: block size used to read input blocks of file + * 6. readersCount: count of readers to read input file + * 7. sequencialFileRead: If emit file blocks in sequence? --- End diff -- sequencial => sequential
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15343027#comment-15343027 ] ASF GitHub Bot commented on APEXMALHAR-2116: Github user amberarrow commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/326#discussion_r67971098 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java --- @@ -0,0 +1,317 @@ +/** + * (Apache License 2.0 header)
+ */ + +package org.apache.apex.malhar.lib.fs; + +import javax.validation.constraints.Min; + +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; + +import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Module; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.io.block.FSSliceReader; +import com.datatorrent.lib.io.fs.FileSplitterInput; + +/** + * This module is used for reading records/tuples from FileSystem. + * Records can be read in parallel using multiple partitions of record reader operator. + * (Ordering is not guaranteed when records are read in parallel) + * + * Input directory is scanned at specified interval to poll for new data. + * + * The module reads data in parallel, following parameters can be configured + * 1. files: list of file(s)/directories to read + * 2. filePatternRegularExp: Files names matching given regex will be read + * 3. scanIntervalMillis: interval between two scans to discover new files in input directory + * 4. recursive: if scan recursively input directories + * 5. blockSize: block size used to read input blocks of file + * 6. readersCount: count of readers to read input file + * 7. sequencialFileRead: If emit file blocks in sequence? + * 8. 
blocksThreshold: number of blocks emitted per window + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class FSRecordReaderModule implements Module +{ + @NotNull + @Size(min = 1) + private String files; + private String filePatternRegularExp; + @Min(0) + private long scanIntervalMillis; + private boolean recursive = true; + private boolean sequencialFileRead = false; + private int readersCount; + @Min(1) + protected int blocksThreshold; + + public final transient ProxyOutputPort<byte[]> records = new ProxyOutputPort<byte[]>(); + + /** + * Criteria for record split + */ + private RECORD_READER_MODE mode; + + /** + * Length for fixed width record + */ + private int recordLength; + + public FileSplitterInput createFileSplitter() + { +return new FileSplitterInput(); + } + + public FSRecordReader createBlockReader() + { +FSRecordReader recordReader = new FSRecordReader(); +recordReader.setMode(mode); +recordReader.setRecordLength(recordLength); + +return recordReader; + } + + @Override + public void populateDAG(DAG dag, Configuration configuration) + { +FileSplitterInput fileSplitter = dag.addOperator("FileSplitter", createFileSplitter()); +FSRecordReader recordReader = dag.addOperator("BlockReader", createBlockReader()); + +dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, recordReader.blocksMetadataInput); + +if (sequencialFileRead) { + dag.setInputPortAttribute(recordReader.blocksMetadataInput, Context.PortContext.STREAM_CODEC, + new SequentialFileBlockMetadataCodec()); +} + +
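The sequencialFileRead branch above installs a stream codec so that all blocks of one file reach the same reader partition (otherwise block order across partitions is not guaranteed). The sketch below shows the routing idea only; the class name and hashing scheme are assumptions for illustration, not the actual SequentialFileBlockMetadataCodec implementation:

```java
// Illustrative sketch (hypothetical, not the Malhar codec): route a block by its
// file path rather than its block id, so every block of one file lands on the
// same reader partition and can be consumed in sequence.
public class FileAffinityRouting
{
  // Map a block to a partition using the hash of the file path it belongs to.
  public static int partitionFor(String filePath, int partitionCount)
  {
    // Math.abs guards against negative hashCode values (sketch-level handling).
    return Math.abs(filePath.hashCode() % partitionCount);
  }

  public static void main(String[] args)
  {
    int partitions = 4;
    // Two blocks of the same file always land on the same partition.
    int p1 = partitionFor("/data/in/a.txt", partitions);
    int p2 = partitionFor("/data/in/a.txt", partitions);
    System.out.println(p1 == p2);  // true
  }
}
```

With per-file affinity, a single partition sees a file's blocks in metadata order, which is what makes sequential reading of one file possible even while many files are read in parallel.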
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15343016#comment-15343016 ] ASF GitHub Bot commented on APEXMALHAR-2116: Github user amberarrow commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/326#discussion_r67970457 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java --- @@ -0,0 +1,175 @@ +package org.apache.apex.malhar.lib.fs; + +import java.io.IOException; + +import org.apache.commons.beanutils.ConvertUtils; +import org.apache.commons.beanutils.converters.AbstractConverter; +import org.apache.hadoop.fs.FSDataInputStream; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.io.block.FSSliceReader; +import com.datatorrent.lib.io.block.ReaderContext; + +/** + * This operator can be used for reading records/tuples from Filesystem + * in parallel (without ordering guarantees between tuples). + * Records can be delimited (e.g. newline) or fixed with records. + * Output tuples are byte[].
+ * + * Typically, this operator will be connected to output of FileSplitterInput + * to read records in parallel. + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class FSRecordReader extends FSSliceReader +{ + /** + * Record reader mode decides how to split the records. + */ + public static enum RECORD_READER_MODE + { +DELIMITED_RECORD, FIXED_WIDTH_RECORD; + } + + /** + * Criteria for record split + */ + private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD; + + /** + * Length for fixed width record + */ + private int recordLength; + + /** + * Port to emit individual records/tuples as byte[] + */ + public final transient DefaultOutputPort<byte[]> records = new DefaultOutputPort<byte[]>(); + + /** + * Initialize appropriate reader context based on mode selection + */ + @Override + public void setup(OperatorContext context) + { +super.setup(context); +if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD) { + ReaderContext.FixedBytesReaderContext fixedBytesReaderContext = new ReaderContext.FixedBytesReaderContext(); + fixedBytesReaderContext.setLength(recordLength); + readerContext = fixedBytesReaderContext; +} else { + readerContext = new ReaderContext.ReadAheadLineReaderContext(); +} + } + + /** + * Read the block data and emit records based on reader context + * + * @param blockMetadata block + * @throws IOException + */ + protected void readBlock(BlockMetadata blockMetadata) throws IOException + { +readerContext.initialize(stream, blockMetadata, consecutiveBlock); +ReaderContext.Entity entity; +while ((entity = readerContext.next()) != null) { + + counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes()); + + byte[] record = entity.getRecord(); + + //If the record is partial then ignore the record. --- End diff -- why 'partial' rather than 'null' ?
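The "partial record" question above comes from how delimited readers handle records that straddle block boundaries: the reader of a block is responsible for records that *start* in its block (reading ahead past the boundary to finish the last one), and skips leading bytes belonging to a record begun in the previous block. A simplified, self-contained sketch of that convention (hypothetical names, not the actual ReadAheadLineReaderContext API):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Sketch of the read-ahead rule a line reader applies at block boundaries.
public class BlockBoundarySketch
{
  /**
   * Returns the records whose first byte lies in [blockStart, blockEnd);
   * the last record may be completed by reading ahead past blockEnd.
   */
  public static List<String> recordsForBlock(byte[] file, int blockStart, int blockEnd)
  {
    List<String> out = new ArrayList<>();
    int pos = blockStart;
    // Skip the tail of a record begun in the previous block.
    if (blockStart > 0) {
      while (pos < blockEnd && file[pos - 1] != '\n') {
        pos++;
      }
    }
    while (pos < blockEnd) {            // record starts inside this block
      int end = pos;
      while (end < file.length && file[end] != '\n') {
        end++;                          // may read ahead past blockEnd
      }
      out.add(new String(file, pos, end - pos, StandardCharsets.UTF_8));
      pos = end + 1;
    }
    return out;
  }

  public static void main(String[] args)
  {
    byte[] file = "one\ntwo\nthree\n".getBytes(StandardCharsets.UTF_8);
    // Two 7-byte blocks: the record "two" straddles the boundary.
    System.out.println(recordsForBlock(file, 0, 7));   // [one, two]
    System.out.println(recordsForBlock(file, 7, 14));  // [three]
  }
}
```

Under this convention every record is emitted exactly once, by the reader of the block where it starts, which is why the block reader must recognize and skip a "partial" leading fragment rather than emit it.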
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342995#comment-15342995 ] ASF GitHub Bot commented on APEXMALHAR-2116: Github user amberarrow commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/326#discussion_r67969125 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java --- @@ -0,0 +1,175 @@ +/** + * This operator can be used for reading records/tuples from Filesystem + * in parallel (without ordering guarantees between tuples). + * Records can be delimited (e.g. newline) or fixed with records.
--- End diff -- fixed with => fixed width
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342992#comment-15342992 ] ASF GitHub Bot commented on APEXMALHAR-2116: Github user amberarrow commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/326#discussion_r67968963 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java --- @@ -0,0 +1,175 @@
+/** + * This operator can be used for reading records/tuples from Filesystem --- End diff -- This line and several others have trailing whitespace; this is generally considered not a good thing -- see for instance: http://programmers.stackexchange.com/questions/121555/why-is-trailing-whitespace-a-big-deal Suggest removing all trailing white space.
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341888#comment-15341888 ] ASF GitHub Bot commented on APEXMALHAR-2116: GitHub user yogidevendra opened a pull request: https://github.com/apache/apex-malhar/pull/326 APEXMALHAR-2116 Added FS record reader operator, module, test You can merge this pull request into a Git repository by running: $ git pull https://github.com/yogidevendra/apex-malhar APEXMALHAR-2116-record-reader Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/326.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #326 commit aaeb7fe34dc69de71dae120ea01452149389854f Author: yogidevendra Date: 2016-06-20T05:47:08Z Added FS record reader operator, module, test