Hi Team, I am using Apache Flink with Java for the problem statement below:
1.where i will read a csv file with field delimeter character ; 2.transform the fields 3.write back the data back to csv my doubts are as below 1. if i need to read the csv file of size above 50 gb what would be the approach 2 if i use Parallelism i am not able to split the data and collect it since its a csv file and while writing a back to csv its creating a multiple files to write the data using the default Parallelism how can achieve the same sample input is 000008000077;151139924603;3526358005322;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940 and sample output is 000008000077sfhsdfbs;151139924603;XXXXXXXXX;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940 below is the code which i am currently running on local environment ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ package com.ericsson.voucher; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple8; import org.apache.flink.util.Collector; public class Classification { private static final String OUTPUT_PATH = "C:\\Projects\\DM\\Pentaho\\OutPut\\output.csv"; public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment .getExecutionEnvironment(); env.setParallelism(20); long subViewStartTime = System.currentTimeMillis(); DataSet<Tuple1<String>> rawdata = (DataSet<Tuple1<String>>) env .readCsvFile("C:\\Projects\\DM\\Pentaho\\CRV_EXPORT.csv") .lineDelimiter("\n").types(String.class); DataSet<Tuple8<String,String, String, String, String, String, String, String>> mails = rawdata .flatMap(new DataExtractor()).rebalance(); mails.writeAsCsv(OUTPUT_PATH, "\n", ";").setParallelism(1); mails.print(); long subViewEndTime = System.currentTimeMillis(); long subViewDifference = 
subViewEndTime - subViewStartTime; System.out.println("The Difference Time is"+ subViewDifference/1000 +"seconds"); } public static class DataExtractor extends RichFlatMapFunction<Tuple1<String>, Tuple8<String, String, String, String, String, String, String, String>> { /** * */ private static final long serialVersionUID = 1L; public void flatMap( Tuple1<String> paramIN, org.apache.flink.util.Collector<Tuple8<String, String, String, String, String, String, String, String>> out) throws Exception { String[] lines = paramIN.f0.split(";"); if (lines != null && lines.length > 0) { String vocuherCode =lines[0]; vocuherCode=vocuherCode+"TEST1"; String VoucherId = lines[1]; String voucherNumber = lines[2]; String status = lines[3]+"TWTSTST"; String startDate = lines[4] + ""; String endDate = lines[5] + ""; String endStatus = lines[6]; String endVoucherNumber = lines[7]; out.collect(new Tuple8<String, String, String, String, String, String, String, String>( vocuherCode, VoucherId, voucherNumber, status, startDate, endDate, endStatus, endVoucherNumber)); } } } public static class RecordReducer implements GroupReduceFunction<Tuple8<String, String, String, String, String, String, String, String>, Tuple8<String, String, String, String, String, String, String, String>> { /** * */ private static final long serialVersionUID = -6045821605365596025L; @Override public void reduce( Iterable<Tuple8<String, String, String, String, String, String, String, String>> paramIterable, Collector<Tuple8<String, String, String, String, String, String, String, String>> paramCollector) throws Exception { // TODO Auto-generated method stub String vocuherCode = null; String VoucherId = null; String voucherNumber = null; String status = null; String startDate = null; String endDate = null; String endStatus = null; String endVoucherNumber = null; for (Tuple8<String, String, String, String, String, String, String, String> m : paramIterable) { vocuherCode = m.f0; VoucherId = m.f1; voucherNumber = m.f2; 
status = m.f3; startDate = m.f4; endDate = m.f5; endStatus = m.f6; endVoucherNumber = m.f7; paramCollector .collect(new Tuple8<String, String, String, String, String, String, String, String>( vocuherCode, VoucherId, voucherNumber, status, startDate, endDate, endStatus, endVoucherNumber)); } } } } ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ please help me on the same how can I achieve the portioning of fields on the above data and achieve the parallism to increase the throughput of my application