[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-07-31 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401320#comment-15401320 ]

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user asfgit closed the pull request at:

https://github.com/apache/apex-malhar/pull/326


> File Record reader module
> -
>
> Key: APEXMALHAR-2116
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2116
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Yogi Devendra
>Assignee: Yogi Devendra
>
> This will be useful for use cases that involve reading files "line by line"
> in parallel and emitting each line as a separate tuple.
> The proposal is to have a new Module which would allow users to monitor
> directories, read files, and emit data records (tuples). Records are based on
> a record separator (e.g. newline) or a fixed size (number of bytes).
> The plan is as follows:
> 1. New operator FileRecordReader which will extend BlockReader.
> 2. This operator will have a configuration option to select the mode:
> FIXED_LENGTH or SEPARATOR_BASED records.
> 3. Use the appropriate ReaderContext based on the mode.
> 4. New module FileRecordReaderModule which wraps FileSplitter (existing) +
> the FileRecordReader operator.
> The reason for having an operator separate from BlockReader is that its
> output port signature differs from BlockReader's.
>  
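The mode switch in steps 2 and 3 above can be sketched as follows. This is an illustrative stand-in, not the merged code: the enum and chooser class are hypothetical, though the context names FixedBytesReaderContext and ReadAheadLineReaderContext mirror the reader contexts in Malhar's block reader library.

```java
// Hypothetical sketch of steps 2-3: select a ReaderContext based on the
// configured record mode. Names mirror the proposal; this is not Malhar code.
enum RecordReaderMode { FIXED_LENGTH, SEPARATOR_BASED }

class ReaderContextChooser {
  // Return the name of the reader context matching the configured mode.
  static String contextFor(RecordReaderMode mode) {
    switch (mode) {
      case FIXED_LENGTH:
        // Slices each block into records of a fixed byte length.
        return "FixedBytesReaderContext";
      case SEPARATOR_BASED:
        // Splits records on a separator such as newline.
        return "ReadAheadLineReaderContext";
      default:
        throw new IllegalArgumentException("Unknown mode: " + mode);
    }
  }
}
```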



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-07-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15379410#comment-15379410 ]

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r70977817
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java 
---
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import javax.validation.constraints.Min;
+
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.fs.FileSplitterInput;
+
+/**
+ * This module is used for reading records/tuples from FileSystem. Records can
+ * be read in parallel using multiple partitions of record reader operator.
+ * (Ordering is not guaranteed when records are read in parallel)
+ *
+ * Input directory is scanned at specified interval to poll for new data.
+ * 
+ * The module reads data in parallel; the following parameters can be configured:
+ * 
+ * 1. files: list of file(s)/directories to read
+ * 2. filePatternRegularExp: Files with names matching given regex will be read
+ * 3. scanIntervalMillis: interval between two scans to discover new files in
+ * input directory
+ * 4. recursive: if true, scan input directories recursively
+ * 5. blockSize: block size used to read input blocks of file
+ * 6. readersCount: count of readers to read input file
+ * 7. sequentialFileRead: if true, then each reader partition will read a different file
+ *    instead of reading different offsets of the same file
+ *    (file-level parallelism instead of block-level parallelism)
+ * 8. blocksThreshold: number of blocks emitted per window
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordReaderModule implements Module
+{
+  @NotNull
+  @Size(min = 1)
+  private String files;
+  private String filePatternRegularExp;
+  @Min(1)
+  private long scanIntervalMillis = 5000;
+  private boolean recursive = true;
+  private boolean sequentialFileRead = false;
+  @Min(1)
+  private int readersCount = 1;
+  @Min(1)
+  protected int blocksThreshold = 1;
+
+  public final transient ProxyOutputPort records = new ProxyOutputPort();
+
+  /**
+   * Criteria for record split
+   */
+  private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;
+
+  /**
+   * Length for fixed width record
+   */
+  @Min(1)
+  private int recordLength;
--- End diff --

Adding another class seems like overkill; how about removing the annotation
and adding a check in the code to ensure that the value is positive if mode is
FIXED_WIDTH_RECORD?
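That suggestion, dropping the @Min annotation in favor of a mode-aware runtime check, could look like the sketch below. The enum values follow the diff; the validate helper and its message are hypothetical.

```java
// Sketch of the suggested runtime check: recordLength is only constrained
// when the mode actually uses it. The helper below is illustrative.
enum Mode { DELIMITED_RECORD, FIXED_WIDTH_RECORD }

class RecordLengthCheck {
  static void validate(Mode mode, int recordLength) {
    // Fixed-width parsing needs a positive record length.
    if (mode == Mode.FIXED_WIDTH_RECORD && recordLength <= 0) {
      throw new IllegalArgumentException(
          "recordLength must be positive for FIXED_WIDTH_RECORD, got " + recordLength);
    }
    // DELIMITED_RECORD ignores recordLength, so any value is accepted.
  }
}
```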



[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-07-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378938#comment-15378938 ]

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user yogidevendra commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r70930388
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java ---
@@ -0,0 +1,344 @@
+  /**
+   * Length for fixed width record
+   */
+  @Min(1)
+  private int recordLength;
--- End diff --

Oh, good point.
Should we separate this into two different classes, FixedWidthRecordReader and
DelimitedRecordReader? That would make configuration clean.

@amberarrow Any thoughts?



[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-07-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378933#comment-15378933 ]

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r70930124
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java ---
@@ -0,0 +1,344 @@
+  /**
+   * Length for fixed width record
+   */
+  @Min(1)
+  private int recordLength;
--- End diff --

Record length will be optional in the case of DELIMITED_RECORD and compulsory
in the case of FIXED_LENGTH records, so keeping the field either optional or
compulsory has pros and cons either way.



[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367309#comment-15367309 ]

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user yogidevendra commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r70032545
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java ---
@@ -0,0 +1,334 @@
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordReaderModule implements Module
+{
+  @NotNull
+  @Size(min = 1)
+  private String files;
+  private String filePatternRegularExp;
+  @Min(0)
+  private long scanIntervalMillis;
+  private boolean recursive = true;
+  private boolean sequentialFileRead = false;
+  private int readersCount;
+  @Min(1)
+  protected int blocksThreshold;
+
+  public final transient ProxyOutputPort records = new ProxyOutputPort();
+
+  /**
+   * Criteria for record split
+   */
+  private RECORD_READER_MODE mode;
+
+  /**
+   * Length for fixed width record
+   */
+  private int recordLength;
+
+  public FileSplitterInput createFileSplitter()
+  {
+    return new FileSplitterInput();
+  }
+
+  public FSRecordReader createBlockReader()
+  {
+    FSRecordReader recordReader = new FSRecordReader();
+    recordReader.setMode(mode);
+    recordReader.setRecordLength(recordLength);
+
+    return recordReader;
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    FileSplitterInput fileSplitter = dag.addOperator("FileSplitter", createFileSplitter());
+    FSRecordReader recordReader = dag.addOperator("BlockReader", createBlockReader());
+
+    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, recordReader.blocksMetadataInput);
+
+    if (sequentialFileRead) {
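The quoted diff is truncated at the sequentialFileRead branch. Judging from the imports in the quoted file (KryoSerializableStreamCodec, BlockMetadata, StatelessPartitioner), that branch most likely installs a stream codec that keys block metadata on the file path, so that one reader partition handles one file at a time. A self-contained illustration of that idea, with stand-in class names (FileBlockMetadata here is a simplified stand-in, not Malhar's BlockMetadata.FileBlockMetadata):

```java
// Hypothetical illustration of file-level partitioning: all blocks of a file
// map to the same reader partition. Both classes below are simplified
// stand-ins for illustration only.
class FileBlockMetadata {
  final String filePath;
  final long blockId;

  FileBlockMetadata(String filePath, long blockId) {
    this.filePath = filePath;
    this.blockId = blockId;
  }
}

class SequentialReadCodec {
  // Derive the partition from the file path instead of the block id:
  // file-level parallelism rather than block-level parallelism.
  static int getPartition(FileBlockMetadata block, int partitionCount) {
    return (block.filePath.hashCode() & Integer.MAX_VALUE) % partitionCount;
  }
}
```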

[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-07-07 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15366023#comment-15366023 ]

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r69897823
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java ---
@@ -0,0 +1,334 @@

[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-07-07 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15366020#comment-15366020 ]

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r69897612
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java ---
@@ -0,0 +1,334 @@
+  public FSRecordReader createBlockReader()
--- End diff --

We can rename it to createRecordReader.



[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15361186#comment-15361186 ]

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user yogidevendra closed the pull request at:

https://github.com/apache/apex-malhar/pull/326




[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15361187#comment-15361187 ]

ASF GitHub Bot commented on APEXMALHAR-2116:


GitHub user yogidevendra reopened a pull request:

https://github.com/apache/apex-malhar/pull/326

APEXMALHAR-2116 Added FS record reader operator, module, test



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yogidevendra/apex-malhar APEXMALHAR-2116-record-reader

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/326.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #326


commit 506ab58d382f6f6b338486a49bad0838d77272f9
Author: yogidevendra 
Date:   2016-06-20T05:47:08Z

Added FS record reader operator, module, test

2. incorporated review comments

3. javadoc improvements.




> File Record reader module
> -
>
> Key: APEXMALHAR-2116
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2116
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Yogi Devendra
>Assignee: Yogi Devendra
>
> This will be useful for use cases which involve reading from files "line 
> by line" in parallel and emitting each line as a separate tuple.
> The proposal is to have a new Module which would allow users to monitor 
> directories, read files and emit data records (tuples). Records are based on a 
> record separator (e.g. newline) or a fixed size (number of bytes). 
> The plan is as follows:
> 1. New operator FileRecordReader which will extend BlockReader.
> 2. This operator will have a configuration option to select the mode: 
> FIXED_LENGTH or SEPARATOR_BASED records. 
> 3. Use the appropriate ReaderContext based on the mode.
> 4. New module FileRecordReaderModule which wraps (FileSplitter (existing) + 
> FileRecordReader operator).
> The reason for having a different operator than BlockReader is that its output 
> port signature differs from BlockReader's.
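The two record-split modes in the plan above can be illustrated with a minimal, self-contained sketch. This is not the Malhar implementation (which operates on file blocks via a ReaderContext); it only shows the splitting semantics of FIXED_LENGTH versus SEPARATOR_BASED records on an in-memory byte buffer. The function name and signature are illustrative, not part of the library.

```python
def split_records(data: bytes, mode: str, record_length: int = 0,
                  separator: bytes = b"\n") -> list:
    """Split a byte buffer into records, mimicking the two proposed modes."""
    if mode == "FIXED_LENGTH":
        # Fixed-width records: every record_length bytes is one record.
        return [data[i:i + record_length]
                for i in range(0, len(data), record_length)]
    if mode == "SEPARATOR_BASED":
        # Delimited records: split on the separator (e.g. newline),
        # dropping empty trailing records.
        return [r for r in data.split(separator) if r]
    raise ValueError("unknown mode: " + mode)
```

In the actual operator this choice is made once in `setup()` by installing the matching ReaderContext, rather than branching per record.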



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355935#comment-15355935
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r69042882
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java 
---
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import javax.validation.constraints.Min;
+
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.fs.FileSplitterInput;
+
+/**
+ * This module is used for reading records/tuples from FileSystem. Records 
can
+ * be read in parallel using multiple partitions of record reader operator.
+ * (Ordering is not guaranteed when records are read in parallel)
+ *
+ * Input directory is scanned at specified interval to poll for new data.
+ * 
+ * The module reads data in parallel, following parameters can be 
configured
+ * 
+ * 1. files: list of file(s)/directories to read
+ * 2. filePatternRegularExp: Files names matching given regex will be 
read
+ * 3. scanIntervalMillis: interval between two scans to discover new files 
in
+ * input directory
+ * 4. recursive: if scan recursively input directories
+ * 5. blockSize: block size used to read input blocks of file
+ * 6. readersCount: count of readers to read input file
+ * 7. sequentialFileRead: If emit file blocks in sequence?
+ * 8. blocksThreshold: number of blocks emitted per window
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordReaderModule implements Module
+{
+  @NotNull
+  @Size(min = 1)
+  private String files;
+  private String filePatternRegularExp;
+  @Min(0)
+  private long scanIntervalMillis;
+  private boolean recursive = true;
+  private boolean sequentialFileRead = false;
+  private int readersCount;
+  @Min(1)
+  protected int blocksThreshold;
+
+  public final transient ProxyOutputPort<byte[]> records = new ProxyOutputPort<byte[]>();
+
+  /**
+   * Criteria for record split
+   */
+  private RECORD_READER_MODE mode;
+
+  /**
+   * Length for fixed width record
+   */
+  private int recordLength;
+
+  public FileSplitterInput createFileSplitter()
+  {
+return new FileSplitterInput();
+  }
+
+  public FSRecordReader createBlockReader()
+  {
+FSRecordReader recordReader = new FSRecordReader();
+recordReader.setMode(mode);
+recordReader.setRecordLength(recordLength);
+
+return recordReader;
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+FileSplitterInput fileSplitter = dag.addOperator("FileSplitter", 
createFileSplitter());
+FSRecordReader recordReader = dag.addOperator("BlockReader", 
createBlockReader());
+
+dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, 
recordReader.blocksMetadataInput);
+
+if (sequentialFileRead) {
+  dag.setInputPortAttribute(recordReader.blocksMetadataInput, 
Context.PortContext.STREAM_CODEC,
+  new SequentialFileBlockMetadataCodec());
+}
+
+
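The `populateDAG` excerpt above wires a FileSplitter to parallel record readers via block metadata. A minimal sketch of the splitter side of that contract, under the assumption that block metadata is just (offset, length) pairs covering the file (the function name is hypothetical, not a Malhar API):

```python
def split_into_blocks(file_size: int, block_size: int) -> list:
    """Emit (offset, length) block metadata for one file, as a
    FileSplitter-like stage might, so that parallel reader partitions
    can each claim blocks independently."""
    blocks = []
    offset = 0
    while offset < file_size:
        # The last block may be shorter than block_size.
        length = min(block_size, file_size - offset)
        blocks.append((offset, length))
        offset += length
    return blocks
```

Since readers consume blocks independently, record order across blocks is not preserved unless something like the `SequentialFileBlockMetadataCodec` above routes all blocks of a file to one partition.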

[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355930#comment-15355930
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r69042391
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java 
---
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import javax.validation.constraints.Min;
+
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.fs.FileSplitterInput;
+
+/**
+ * This module is used for reading records/tuples from FileSystem. Records 
can
+ * be read in parallel using multiple partitions of record reader operator.
+ * (Ordering is not guaranteed when records are read in parallel)
+ *
+ * Input directory is scanned at specified interval to poll for new data.
+ * 
+ * The module reads data in parallel, following parameters can be 
configured
+ * 
+ * 1. files: list of file(s)/directories to read
+ * 2. filePatternRegularExp: Files names matching given regex will be 
read
+ * 3. scanIntervalMillis: interval between two scans to discover new files 
in
+ * input directory
+ * 4. recursive: if scan recursively input directories
+ * 5. blockSize: block size used to read input blocks of file
+ * 6. readersCount: count of readers to read input file
+ * 7. sequentialFileRead: If emit file blocks in sequence?
--- End diff --

This description is not clear, please rewrite.





[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355929#comment-15355929
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r69042316
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java 
---
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import javax.validation.constraints.Min;
+
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.fs.FileSplitterInput;
+
+/**
+ * This module is used for reading records/tuples from FileSystem. Records 
can
+ * be read in parallel using multiple partitions of record reader operator.
+ * (Ordering is not guaranteed when records are read in parallel)
+ *
+ * Input directory is scanned at specified interval to poll for new data.
+ * 
+ * The module reads data in parallel, following parameters can be 
configured
+ * 
+ * 1. files: list of file(s)/directories to read
+ * 2. filePatternRegularExp: Files names matching given regex will be 
read
+ * 3. scanIntervalMillis: interval between two scans to discover new files 
in
+ * input directory
+ * 4. recursive: if scan recursively input directories
--- End diff --

if scan recursively input directories => if true, scan input directories 
recursively




[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355202#comment-15355202
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


GitHub user yogidevendra reopened a pull request:

https://github.com/apache/apex-malhar/pull/326

APEXMALHAR-2116 Added FS record reader operator, module, test



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yogidevendra/apex-malhar 
APEXMALHAR-2116-record-reader

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/326.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #326


commit e5dba39f1aa5390275faf5c5938d89dd3cd78598
Author: yogidevendra 
Date:   2016-06-20T05:47:08Z

Added FS record reader operator, module, test

2. incorporated review comments






[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355201#comment-15355201
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user yogidevendra closed the pull request at:

https://github.com/apache/apex-malhar/pull/326




[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355113#comment-15355113
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user yogidevendra closed the pull request at:

https://github.com/apache/apex-malhar/pull/326




[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355114#comment-15355114
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


GitHub user yogidevendra reopened a pull request:

https://github.com/apache/apex-malhar/pull/326

APEXMALHAR-2116 Added FS record reader operator, module, test



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yogidevendra/apex-malhar 
APEXMALHAR-2116-record-reader

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/326.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #326


commit c4a6d552c5b0c6b8ede672086f3548847311ab70
Author: yogidevendra 
Date:   2016-06-20T05:47:08Z

Added FS record reader operator, module, test

2. incorporated review comments






[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352095#comment-15352095
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r68678627
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java ---
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.IOException;
+
+import org.apache.commons.beanutils.ConvertUtils;
+import org.apache.commons.beanutils.converters.AbstractConverter;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * This operator can be used for reading records/tuples from Filesystem in
+ * parallel (without ordering guarantees between tuples). Records can be
+ * delimited (e.g. newline) or fixed width records. Output tuples are 
byte[].
+ * 
+ * Typically, this operator will be connected to output of 
FileSplitterInput to
+ * read records in parallel.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordReader extends FSSliceReader
+{
+  /**
+   * Record reader mode decides how to split the records.
+   */
+  public static enum RECORD_READER_MODE
+  {
+DELIMITED_RECORD, FIXED_WIDTH_RECORD;
+  }
+
+  /**
+   * Criteria for record split
+   */
+  private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;
+
+  /**
+   * Length for fixed width record
+   */
+  private int recordLength;
+
+  /**
+   * Port to emit individual records/tuples as byte[]
+   */
+  public final transient DefaultOutputPort<byte[]> records = new DefaultOutputPort<byte[]>();
+
+  /**
+   * Initialize appropriate reader context based on mode selection
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+super.setup(context);
+    if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD) {
+      ReaderContext.FixedBytesReaderContext<FSDataInputStream> fixedBytesReaderContext = new ReaderContext.FixedBytesReaderContext<FSDataInputStream>();
+      fixedBytesReaderContext.setLength(recordLength);
+      readerContext = fixedBytesReaderContext;
+    } else {
+      readerContext = new ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>();
+    }
+  }
+  }
+
+  /**
+   * Read the block data and emit records based on reader context
+   *
+   * @param blockMetadata
+   *  block
+   * @throws IOException
+   */
+  protected void readBlock(BlockMetadata blockMetadata) throws IOException
+  {
+readerContext.initialize(stream, blockMetadata, consecutiveBlock);
+ReaderContext.Entity entity;
+while ((entity = readerContext.next()) != null) {
+
+  
counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
+
+  byte[] record = entity.getRecord();
+
+  if (record != null) {
+counters.getCounter(ReaderCounterKeys.RECORDS).increment();
+records.emit(record);
+  }
+}
+  }
+
+  /**
+   * Criteria for record split
+   * 
+   * @param mode
+   *  Mode
+   */
+  public void setMode(RECORD_READER_MODE mode)
+  {
+this.mode = mode;
+  }
+
+  /**
+   * Criteria for record split
+   * 
+   * @return mode
+   */
+  public RECORD_READER_MODE getMode()
+  {
+return mode;
+  }
+
+  /**
+   * Length for fixed width record
+   * 
+   * @param 

[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346391#comment-15346391
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r68229454
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java 
---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import javax.validation.constraints.Min;
+
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.fs.FileSplitterInput;
+
+/**
+ * This module is used for reading records/tuples from FileSystem. Records 
can
+ * be read in parallel using multiple partitions of record reader operator.
+ * (Ordering is not guaranteed when records are read in parallel)
+ *
+ * Input directory is scanned at specified interval to poll for new data.
+ * 
+ * The module reads data in parallel, following parameters can be 
configured
+ * 
+ * 1. files: list of file(s)/directories to read
+ * 2. filePatternRegularExp: Files names matching given regex will be 
read
+ * 3. scanIntervalMillis: interval between two scans to discover new files 
in
+ * input directory
+ * 4. recursive: if scan recursively input directories
+ * 5. blockSize: block size used to read input blocks of file
+ * 6. readersCount: count of readers to read input file
+ * 7. sequencialFileRead: If emit file blocks in sequence?
+ * 8. blocksThreshold: number of blocks emitted per window
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordReaderModule implements Module
+{
+  @NotNull
+  @Size(min = 1)
+  private String files;
+  private String filePatternRegularExp;
+  @Min(0)
+  private long scanIntervalMillis;
+  private boolean recursive = true;
+  private boolean sequencialFileRead = false;
--- End diff --

sequencial => sequential




[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346390#comment-15346390
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r68229363
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java 
---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import javax.validation.constraints.Min;
+
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.fs.FileSplitterInput;
+
+/**
+ * This module is used for reading records/tuples from FileSystem. Records 
can
+ * be read in parallel using multiple partitions of record reader operator.
+ * (Ordering is not guaranteed when records are read in parallel)
+ *
+ * Input directory is scanned at specified interval to poll for new data.
+ * 
+ * The module reads data in parallel, following parameters can be configured
+ * 
+ * 1. files: list of file(s)/directories to read
+ * 2. filePatternRegularExp: Files names matching given regex will be read
+ * 3. scanIntervalMillis: interval between two scans to discover new files in
+ * input directory
+ * 4. recursive: if scan recursively input directories
+ * 5. blockSize: block size used to read input blocks of file
+ * 6. readersCount: count of readers to read input file
+ * 7. sequencialFileRead: If emit file blocks in sequence?
--- End diff --

sequencial => sequential
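As an aside for readers of this thread: parameters like the ones listed in the javadoc above are normally supplied through an Apex properties file. The snippet below is an illustrative sketch only; the module name "recordReader" and the exact property paths are assumptions, not taken from this PR.

```xml
<!-- Illustrative only: the module name "recordReader" and these property
     paths are assumptions, not taken from this PR. -->
<configuration>
  <property>
    <name>dt.operator.recordReader.prop.files</name>
    <value>/user/appuser/input</value>
  </property>
  <property>
    <name>dt.operator.recordReader.prop.mode</name>
    <value>FIXED_WIDTH_RECORD</value>
  </property>
  <property>
    <name>dt.operator.recordReader.prop.recordLength</name>
    <value>128</value>
  </property>
</configuration>
```

The mode values DELIMITED_RECORD and FIXED_WIDTH_RECORD come from the RECORD_READER_MODE enum shown later in this thread.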




[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15343027#comment-15343027
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r67971098
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java ---
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import javax.validation.constraints.Min;
+
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.fs.FileSplitterInput;
+
+/**
+ * This module is used for reading records/tuples from FileSystem.
+ * Records can be read in parallel using multiple partitions of record reader operator.
+ * (Ordering is not guaranteed when records are read in parallel)
+ *
+ * Input directory is scanned at specified interval to poll for new data.
+ * 
+ * The module reads data in parallel, following parameters can be configured
+ * 1. files: list of file(s)/directories to read
+ * 2. filePatternRegularExp: Files names matching given regex will be read
+ * 3. scanIntervalMillis: interval between two scans to discover new files in input directory
+ * 4. recursive: if scan recursively input directories
+ * 5. blockSize: block size used to read input blocks of file
+ * 6. readersCount: count of readers to read input file
+ * 7. sequencialFileRead: If emit file blocks in sequence?
+ * 8. blocksThreshold: number of blocks emitted per window
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordReaderModule implements Module
+{
+  @NotNull
+  @Size(min = 1)
+  private String files;
+  private String filePatternRegularExp;
+  @Min(0)
+  private long scanIntervalMillis;
+  private boolean recursive = true;
+  private boolean sequencialFileRead = false;
+  private int readersCount;
+  @Min(1)
+  protected int blocksThreshold;
+  
+  public final transient ProxyOutputPort<byte[]> records = new ProxyOutputPort<byte[]>();
+  
+  /**
+   * Criteria for record split
+   */
+  private RECORD_READER_MODE mode;
+  
+  /**
+   * Length for fixed width record
+   */
+  private int recordLength;
+  
+  public FileSplitterInput createFileSplitter()
+  {
+return new FileSplitterInput();
+  }
+  
+  public FSRecordReader createBlockReader()
+  {
+FSRecordReader recordReader = new FSRecordReader();
+recordReader.setMode(mode);
+recordReader.setRecordLength(recordLength);
+
+return recordReader;
+  }
+  
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    FileSplitterInput fileSplitter = dag.addOperator("FileSplitter", createFileSplitter());
+    FSRecordReader recordReader = dag.addOperator("BlockReader", createBlockReader());
+
+    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, recordReader.blocksMetadataInput);
+
+    if (sequencialFileRead) {
+      dag.setInputPortAttribute(recordReader.blocksMetadataInput, Context.PortContext.STREAM_CODEC,
+          new SequentialFileBlockMetadataCodec());
+    }
+
+
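The wiring above installs a SequentialFileBlockMetadataCodec on the block-metadata input when sequential reads are requested. As a rough, standalone sketch of the routing idea behind such a codec (my assumption: it keys partitioning on the block's file path so all blocks of one file go to one reader; the class below is hypothetical illustration, not Malhar code):

```java
public class FileBlockRouting {
  // Hypothetical sketch: route every block of a given file to the same
  // reader partition by hashing the file path. A real stream codec would
  // return a partition key; the modulo here stands in for that mapping.
  static int partitionFor(String filePath, int partitionCount) {
    return (filePath.hashCode() & 0x7fffffff) % partitionCount;
  }

  public static void main(String[] args) {
    String file = "/data/input/log1.txt";
    // Blocks 0..n of the same file always land on the same partition,
    // which is what preserves per-file record ordering.
    for (int block = 0; block < 3; block++) {
      System.out.println(file + " block " + block + " -> partition " + partitionFor(file, 4));
    }
  }
}
```

Because the key depends only on the file path, every block of a file maps to the same partition regardless of how many blocks the file has.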

[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15343016#comment-15343016
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r67970457
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java ---
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.IOException;
+
+import org.apache.commons.beanutils.ConvertUtils;
+import org.apache.commons.beanutils.converters.AbstractConverter;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * This operator can be used for reading records/tuples from Filesystem 
+ * in parallel (without ordering guarantees between tuples).
+ * Records can be delimited (e.g. newline) or fixed with records.
+ * Output tuples are byte[]. 
+ * 
+ * Typically, this operator will be connected to output of FileSplitterInput
+ * to read records in parallel.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordReader extends FSSliceReader
+{
+  /**
+   * Record reader mode decides how to split the records.
+   */
+  public static enum RECORD_READER_MODE
+  {
+DELIMITED_RECORD, FIXED_WIDTH_RECORD;
+  }
+
+  /**
+   * Criteria for record split
+   */
+  private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;
+  
+  /**
+   * Length for fixed width record
+   */
+  private int recordLength;
+  
+  /**
+   * Port to emit individual records/tuples as byte[]
+   */
+  public final transient DefaultOutputPort<byte[]> records = new DefaultOutputPort<byte[]>();
+
+  /**
+   * Initialize appropriate reader context based on mode selection
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+super.setup(context);
+if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD) {
+      ReaderContext.FixedBytesReaderContext<FSDataInputStream> fixedBytesReaderContext = new ReaderContext.FixedBytesReaderContext<FSDataInputStream>();
+  fixedBytesReaderContext.setLength(recordLength);
+  readerContext = fixedBytesReaderContext;
+} else {
+      readerContext = new ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>();
+}
+  }
+  
+  /**
+   * Read the block data and emit records based on reader context
+   *
+   * @param blockMetadata block
+   * @throws IOException
+   */
+  protected void readBlock(BlockMetadata blockMetadata) throws IOException
+  {
+readerContext.initialize(stream, blockMetadata, consecutiveBlock);
+ReaderContext.Entity entity;
+while ((entity = readerContext.next()) != null) {
+
+      counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
+
+  byte[] record = entity.getRecord();
+
+  //If the record is partial then ignore the record.
--- End diff --

why 'partial' rather than 'null'?
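For context on the fixed-width side, a "partial" record arises when a block's tail is shorter than recordLength. A minimal standalone sketch of that splitting rule (my own illustration under that assumption, not the Malhar ReaderContext implementation):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FixedWidthSplit {
  // Illustration only (not the Malhar ReaderContext code): cut the data
  // into recordLength-sized slices; a trailing slice shorter than
  // recordLength is a partial record and is dropped.
  static List<String> split(byte[] data, int recordLength) {
    List<String> records = new ArrayList<>();
    for (int off = 0; off + recordLength <= data.length; off += recordLength) {
      records.add(new String(Arrays.copyOfRange(data, off, off + recordLength)));
    }
    return records;
  }

  public static void main(String[] args) {
    // "gh" is shorter than 3 bytes, so it is ignored as a partial record.
    System.out.println(split("abcdefgh".getBytes(), 3));
  }
}
```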



[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342995#comment-15342995
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r67969125
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java ---
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.IOException;
+
+import org.apache.commons.beanutils.ConvertUtils;
+import org.apache.commons.beanutils.converters.AbstractConverter;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * This operator can be used for reading records/tuples from Filesystem 
+ * in parallel (without ordering guarantees between tuples).
+ * Records can be delimited (e.g. newline) or fixed with records.
--- End diff --

fixed with => fixed width
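For contrast with the fixed-width mode, the delimited mode splits records on a separator byte such as newline. A minimal standalone sketch (my own illustration, not the ReadAheadLineReaderContext implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class DelimitedSplit {
  // Illustration only: split on a one-byte delimiter such as '\n';
  // the delimiter itself is not part of the emitted record.
  static List<String> split(byte[] data, byte delimiter) {
    List<String> records = new ArrayList<>();
    int start = 0;
    for (int i = 0; i < data.length; i++) {
      if (data[i] == delimiter) {
        records.add(new String(data, start, i - start));
        start = i + 1;
      }
    }
    if (start < data.length) { // trailing record with no final delimiter
      records.add(new String(data, start, data.length - start));
    }
    return records;
  }

  public static void main(String[] args) {
    System.out.println(split("line1\nline2\nline3".getBytes(), (byte) '\n'));
  }
}
```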




[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342992#comment-15342992
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r67968963
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java ---
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.IOException;
+
+import org.apache.commons.beanutils.ConvertUtils;
+import org.apache.commons.beanutils.converters.AbstractConverter;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * This operator can be used for reading records/tuples from Filesystem 
--- End diff --

This line and several others have trailing whitespace; this is generally considered not a good thing -- see for instance:
http://programmers.stackexchange.com/questions/121555/why-is-trailing-whitespace-a-big-deal
Suggest removing all trailing white space.
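A quick way to act on this suggestion from a shell (GNU sed assumed; the file content below is a made-up demo, not from the PR):

```shell
# Demo of detecting and stripping trailing whitespace (GNU sed assumed).
tmp=$(mktemp)
printf 'int x = 1; \nint y = 2;\n' > "$tmp"   # first line ends with a space

grep -n '[[:blank:]]$' "$tmp"                  # list offending lines
sed -i 's/[[:blank:]]*$//' "$tmp"              # strip trailing blanks in place
grep -q '[[:blank:]]$' "$tmp" || echo "clean"  # nothing left to flag
rm -f "$tmp"
```

Running the same `grep -rn '[[:blank:]]$'` over a source tree finds every offending line before a commit.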




[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341888#comment-15341888
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


GitHub user yogidevendra opened a pull request:

https://github.com/apache/apex-malhar/pull/326

APEXMALHAR-2116 Added FS record reader operator, module, test



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yogidevendra/apex-malhar APEXMALHAR-2116-record-reader

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/326.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #326


commit aaeb7fe34dc69de71dae120ea01452149389854f
Author: yogidevendra 
Date:   2016-06-20T05:47:08Z

Added FS record reader operator, module, test



