Did you use StreamExecutionEnvironment.createFileInput()? What did the modification times of the 2 files look like (were they the newest)?
Cheers

On Mon, Jul 31, 2017 at 12:42 PM, Mohit Anchlia <mohitanch...@gmail.com> wrote:

> Thanks! When I give the path to a directory, Flink reads only 2 files. It
> seems to pick these 2 files at random.
>
> On Mon, Jul 31, 2017 at 12:05 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Mohit,
>>
>> as Ted said, there are plenty of InputFormats which are based on
>> FileInputFormat.
>> FileInputFormat also supports reading all files in a directory. Simply
>> specify the path of the directory.
>>
>> Check StreamExecutionEnvironment.createFileInput(), which takes several
>> parameters such as a FileInputFormat and a time interval at which the
>> directory is periodically checked.
>>
>> Best, Fabian
>>
>> 2017-07-30 21:31 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:
>>
>>> For #1, you can find quite a few classes which extend FileInputFormat,
>>> e.g.:
>>>
>>> flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java:
>>>   public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQuer
>>> flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java:
>>>   public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
>>> flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java:
>>>   public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> implements Checkpoi
>>> flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java:
>>>   extends FileInputFormat<String>
>>>
>>> FYI
>>>
>>> On Sun, Jul 30, 2017 at 12:26 PM, Mohit Anchlia <mohitanch...@gmail.com>
>>> wrote:
>>>
>>>> Thanks. A few more questions:
>>>>
>>>> - Is there an example for FileInputFormat?
>>>> - How do I make it read all the files in a directory?
>>>> - How do I make an InputFormat a streaming input instead of batch? E.g.,
>>>>   read new files as they arrive in a directory.
>>>>
>>>> Thanks again.
>>>>
>>>> On Sun, Jul 30, 2017 at 12:53 AM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Flink calls the reachedEnd() method before it calls nextRecord() and
>>>>> closes the IF when reachedEnd() returns true.
>>>>> So, it should not return true until nextRecord() has been called and
>>>>> the first (and, here, last) record has been emitted.
>>>>>
>>>>> You might also want to build your PDFFileInputFormat on
>>>>> FileInputFormat and set unsplittable to true.
>>>>> FileInputFormat comes with lots of built-in functionality such as
>>>>> InputSplit generation.
>>>>>
>>>>> Cheers, Fabian
>>>>>
>>>>> 2017-07-30 3:41 GMT+02:00 Mohit Anchlia <mohitanch...@gmail.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I created a custom input format. The idea behind it is to read all
>>>>>> binary files from a directory and use each file as its own split. Each
>>>>>> split is read as one whole record. When I run it in Flink I don't get
>>>>>> any error, but I am not seeing any output from .print. Am I missing
>>>>>> something?
>>>>>>
>>>>>> ----
>>>>>>
>>>>>> public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {
>>>>>>
>>>>>>     private static final Logger logger =
>>>>>>             LoggerFactory.getLogger(PDFFileInputFormat.class.getName());
>>>>>>
>>>>>>     PDFFileInputSplit current = null;
>>>>>>
>>>>>>     public static void main(String... args) throws Exception {
>>>>>>         PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");
>>>>>>         InputSplit[] splits = pdfReader.createInputSplits(1);
>>>>>>         pdfReader.open(splits[0]);
>>>>>>         pdfReader.nextRecord(null);
>>>>>>
>>>>>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>         env.fromElements(1, 2, 3)
>>>>>>                 // returns the squared i
>>>>>>                 .print();
>>>>>>
>>>>>>         PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");
>>>>>>         InputFormatSourceFunction<StringValue> reader =
>>>>>>                 new InputFormatSourceFunction<>(format, TypeInformation.of(StringValue.class));
>>>>>>         env.createInput(format, TypeInformation.of(StringValue.class)).print();
>>>>>>     }
>>>>>>
>>>>>>     String path = null;
>>>>>>
>>>>>>     public PDFFileInputFormat(String path) {
>>>>>>         this.path = path;
>>>>>>     }
>>>>>>
>>>>>>     public void configure(Configuration parameters) {
>>>>>>         // TODO Auto-generated method stub
>>>>>>     }
>>>>>>
>>>>>>     public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
>>>>>>         // TODO Auto-generated method stub
>>>>>>         return cachedStatistics;
>>>>>>     }
>>>>>>
>>>>>>     public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
>>>>>>         final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();
>>>>>>         Files.list(Paths.get(path)).forEach(f -> {
>>>>>>             PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);
>>>>>>             splits.add(split);
>>>>>>         });
>>>>>>         PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];
>>>>>>         return splits.toArray(inputSplitArray);
>>>>>>     }
>>>>>>
>>>>>>     public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
>>>>>>         logger.info("Assigner");
>>>>>>         // TODO Auto-generated method stub
>>>>>>         return new DefaultInputSplitAssigner(inputSplits);
>>>>>>     }
>>>>>>
>>>>>>     public void open(InputSplit split) throws IOException {
>>>>>>         this.current = (PDFFileInputSplit) split;
>>>>>>     }
>>>>>>
>>>>>>     public boolean reachedEnd() throws IOException {
>>>>>>         // TODO Auto-generated method stub
>>>>>>         return true;
>>>>>>     }
>>>>>>
>>>>>>     public StringValue nextRecord(StringValue reuse) throws IOException {
>>>>>>         String content = new String(Files.readAllBytes(this.current.getFile()));
>>>>>>         logger.info("Content " + content);
>>>>>>         return new StringValue(content);
>>>>>>     }
>>>>>>
>>>>>>     public void close() throws IOException {
>>>>>>         // TODO Auto-generated method stub
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> ---
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Mohit
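
For anyone reading this thread later: the reachedEnd()/nextRecord() ordering Fabian describes can be tried out without Flink on the classpath. What follows is only a rough sketch under stated assumptions, not Flink code. WholeFileFormatSketch, WholeFileReader, drain() and runDemo() are hypothetical names made up for this illustration, and drain() merely imitates the call order described in the thread (reachedEnd() is asked first, nextRecord() is called only when it returned false). The one idea it shows is a "consumed" flag that is reset in open() and set in nextRecord(), so each file yields exactly one record.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Flink-free model of the call order described in the thread.
// All names here are hypothetical and only serve the illustration.
class WholeFileFormatSketch {

    /** Reads one file as a single record; mirrors open/reachedEnd/nextRecord. */
    static final class WholeFileReader {
        private Path current;
        private boolean consumed = true; // nothing to read until open() is called

        void open(Path file) {
            this.current = file;
            this.consumed = false; // a freshly opened split has one pending record
        }

        boolean reachedEnd() {
            // Stays false until the single record has been handed out.
            return consumed;
        }

        String nextRecord() {
            try {
                String content =
                        new String(Files.readAllBytes(current), StandardCharsets.UTF_8);
                consumed = true; // after this call the split is exhausted
                return content;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    /** Imitates the caller: ask reachedEnd() first, then fetch the record. */
    static List<String> drain(List<Path> files) {
        WholeFileReader reader = new WholeFileReader();
        List<String> records = new ArrayList<>();
        for (Path file : files) {
            reader.open(file);
            while (!reader.reachedEnd()) {
                records.add(reader.nextRecord());
            }
        }
        return records;
    }

    /** Writes two temporary files and drains them, one record per file. */
    static List<String> runDemo() {
        try {
            Path dir = Files.createTempDirectory("whole-file-sketch");
            List<Path> files = new ArrayList<>();
            files.add(Files.write(dir.resolve("a.txt"),
                    "first".getBytes(StandardCharsets.UTF_8)));
            files.add(Files.write(dir.resolve("b.txt"),
                    "second".getBytes(StandardCharsets.UTF_8)));
            return drain(files);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

If reachedEnd() is hard-coded to return true, as in the original post, the while loop in drain() never executes and no record is produced, which is consistent with the empty .print output reported there.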