Hi,

I created a custom input format. The idea behind it is to read all binary
files from a directory and use each file as its own split. Each split is
read as one whole record. When I run it in Flink I don't get any errors, but
I am not seeing any output from .print(). Am I missing something?

----

*public* *class* *PDFFileInputFormat* *extends*
RichInputFormat<StringValue, InputSplit> {

*private* *static* *final* Logger *logger* = LoggerFactory.*getLogger*
(PDFFileInputFormat.*class*.getName());

PDFFileInputSplit current = *null*;

*public* *static* *void* main(String... args) *throws* Exception {

PDFFileInputFormat pdfReader = *new* PDFFileInputFormat("c:\\proj\\test");

InputSplit[] splits = pdfReader.createInputSplits(1);

pdfReader.open(splits[0]);

pdfReader.nextRecord(*null*);

*final* ExecutionEnvironment env = ExecutionEnvironment.
*getExecutionEnvironment*();

env.fromElements(1, 2, 3)

// returns the squared i

.print();

PDFFileInputFormat format = *new* PDFFileInputFormat("c:\\proj\\test");

InputFormatSourceFunction<StringValue> *reader* = *new*
InputFormatSourceFunction<>(format,

TypeInformation.*of*(StringValue.*class*));

env.createInput(format,TypeInformation.*of*(StringValue.*class*)).print();

}

String path = *null*;

*public* PDFFileInputFormat(String path) {

*this*.path = path;

}

*public* *void* configure(Configuration parameters) {

// *TODO* Auto-generated method stub

}

*public* BaseStatistics getStatistics(BaseStatistics cachedStatistics)
*throws* IOException {

// *TODO* Auto-generated method stub

*return* cachedStatistics;

}

*public* InputSplit[] createInputSplits(*int* minNumSplits) *throws*
IOException {

*final* List<PDFFileInputSplit> splits = *new*
ArrayList<PDFFileInputSplit>();

Files.*list*(Paths.*get*(path)).forEach(f -> {

PDFFileInputSplit split = *new* PDFFileInputSplit(splits.size(), f);

splits.add(split);

});

PDFFileInputSplit[] inputSplitArray = *new* PDFFileInputSplit[splits
.size()];

*return* splits.toArray(inputSplitArray);

}

*public* InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits)
{

*logger*.info("Assigner");

// *TODO* Auto-generated method stub

*return* *new* DefaultInputSplitAssigner(inputSplits);

}

*public* *void* open(InputSplit split) *throws* IOException {

*this*.current = (PDFFileInputSplit) split;

}

*public* *boolean* reachedEnd() *throws* IOException {

// *TODO* Auto-generated method stub

*return* *true*;

}

*public* StringValue nextRecord(StringValue reuse) *throws* IOException {

String content = *new* String(Files.*readAllBytes*(*this*.current
.getFile()));

*logger*.info("Content " + content);

*return* *new* StringValue(content);

}

*public* *void* close() *throws* IOException {

// *TODO* Auto-generated method stub

}

}

---


Thanks,

Mohit

Reply via email to