Hi, what data are you using?
The exception says "NullFieldException: Field 1 is null, but expected to hold a value.". Maybe the data is not in the right format? On Mon, Apr 27, 2015 at 2:32 PM, hagersaleh <loveallah1...@yahoo.com> wrote: > I want to implement a left outer join of two datasets; I use the Tuple data type > > > package org.apache.flink.examples.java.relational; > > > import org.apache.flink.api.common.functions.CoGroupFunction; > import org.apache.flink.api.common.functions.MapFunction; > import org.apache.flink.api.java.DataSet; > import org.apache.flink.api.java.ExecutionEnvironment; > import org.apache.flink.api.java.operators.DataSource; > import org.apache.flink.api.java.tuple.Tuple1; > import org.apache.flink.api.java.tuple.Tuple4; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.util.Collector; > import java.io.File; > > @SuppressWarnings("serial") > public class TPCHQuery3 { > > //field name in customer table > > public static class LeftOuterJoin implements > CoGroupFunction<Tuple2<Tuple1<Integer>, String>, > Tuple2<Tuple1<Integer>, String>, > Tuple2<Tuple1<Integer>,Tuple1<Integer>>> { > > @Override > public void coGroup(Iterable<Tuple2<Tuple1<Integer>, String>> > leftElements, > Iterable<Tuple2<Tuple1<Integer>, String>> > rightElements, > > Collector<Tuple2<Tuple1<Integer>,Tuple1<Integer>>> out) throws > Exception { > > > > for (Tuple2<Tuple1<Integer>, String> leftElem : leftElements) > { > boolean hadElements = false; > for (Tuple2<Tuple1<Integer>, String> rightElem : > rightElements) { > out.collect(new > Tuple2<Tuple1<Integer>,Tuple1<Integer>>(leftElem.f0, rightElem.f0)); > hadElements = true; > } > if (!hadElements) { > out.collect(new Tuple2<Tuple1<Integer>, > Tuple1<Integer>>(leftElem.f0, null)); > } > } > > } > } > > public static void main(String[] args) throws Exception { > > > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > DataSet<Tuple1<Integer>> > 
leftSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/customer.csv") > .fieldDelimiter('|') > > .includeFields("10000000").ignoreFirstLine() > .types(Integer.class); > > // DataSource<Integer> leftSide = env.fromElements(1, 2, 3, 4, 5); > DataSet<Tuple2<Tuple1<Integer>, String>> leftSide2 = leftSide.map( > new MapFunction<Tuple1<Integer>, Tuple2<Tuple1<Integer>, > String>>() { > @Override > public Tuple2<Tuple1<Integer>, String> > map(Tuple1<Integer> x) throws Exception { > return new Tuple2<Tuple1<Integer>, String>(x, > "some data"); > } > }); > DataSet<Tuple1<Integer>> > rightSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/orders.csv") > .fieldDelimiter('|') > > .includeFields("010000000").ignoreFirstLine() > .types(Integer.class); > // DataSource<Integer> rightSide = env.fromElements(1,2,4, 5, 6, 7, 8, > 9, > 10); > DataSet<Tuple2<Tuple1<Integer>, String>> rightSide2 = > rightSide.map( > new MapFunction<Tuple1<Integer>, Tuple2<Tuple1<Integer>, > String>>() { > @Override > public Tuple2<Tuple1<Integer>, String> > map(Tuple1<Integer> x) throws Exception { > return new Tuple2<Tuple1<Integer>, String>(x, > "some other data"); > } > }); > DataSet<Tuple2<Tuple1<Integer>, Tuple1<Integer>>> leftOuterJoin = > leftSide2.coGroup(rightSide2) > .where(0) > .equalTo(0) > .with(new LeftOuterJoin()); > > leftOuterJoin.writeAsCsv("/home/hadoop/Desktop/Dataset/output1.csv", > "\n", "|");; > env.execute(); > > } > > > Error output after running the program: > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: > org.apache.flink.types.NullFieldException: Field 1 is null, but expected to > hold a value. 
> at > > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:97) > at > > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30) > at > > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51) > at > > org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76) > at > > org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82) > at > > org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88) > at > > org.apache.flink.examples.java.relational.TPCHQuery3$LeftOuterJoin.coGroup(TPCHQuery3.java:38) > at > > org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:130) > at > > org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484) > at > > org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359) > at > > org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235) > at java.lang.Thread.run(Thread.java:724) > > at > > org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349) > at > org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239) > at > > org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51) > at > > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:540) > at > > org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:80) > > > > -- > View this message in context: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/error-when-eun-program-left-outer-join-tp1141.html > Sent from the Apache Flink User Mailing List archive. mailing list archive > at Nabble.com. >