Hi, If it's all right with you, I wanna try it.
Thanks Sincerely Rick -----Original Message----- From: Jean-Baptiste Onofré [mailto:[email protected]] Sent: Friday, October 20, 2017 1:32 PM To: [email protected] Subject: Re: How to use ConsoleIO SDK Hi, I started to work on the ConsoleIO (and SocketIO too), but it's not yet merged. I can provide a SNAPSHOT to you if you wanna try. Regards JB On 10/20/2017 04:14 AM, [email protected] wrote: > Dear sir, > > I have the question how to use the beam java sdk: ConsoleIO. > > My objective colored in background yellow is to write the PCollection > ”data” on Console, and then use it(type: RDD ??) as another variable to do > other works. > > If any further information is needed, I am glad to be informed and > will provide to you as soon as possible. > > I am looking forward to hearing from you. > > My java code is as: > > “ > > *import *java.io.IOException; > > *import*org.apache.beam.sdk.Pipeline; > > *import*org.apache.beam.sdk.options.PipelineOptionsFactory; > > *import*org.apache.beam.runners.spark.SparkRunner; > > *import*org.apache.beam.runners.spark.io.ConsoleIO; > > *import*org.apache.beam.runners.spark.SparkPipelineOptions; > > ** > > *import *org.apache.beam.sdk.transforms.Create; > > *import *org.apache.beam.sdk.values.KV; > > *import *org.apache.beam.sdk.values.PCollection; > > *import *org.apache.beam.sdk.values.TimestampedValue; > > ** > > *import *javafx.util.Pair;** > > ** > > *import*org.joda.time.Duration; > > *import*org.joda.time.Instant; > > *import*org.joda.time.MutableDateTime; > > *public**static**void*main(String[] args) *throws*IOException { > > MutableDateTime mutableNow= > Instant./now/().toMutableDateTime(); > > mutableNow.setDateTime(2017, 7, 12, 14, 0, 0, 0); > > Instant starttime= mutableNow.toInstant().plus(8*60*60*1000); > > *int*min; > > *int*sec; > > *int*millsec; > > min=2; > > sec=min*60; > > millsec=sec*1000; > > *double*[] value=*new**double*[] {1.0,2.0,3.0,4.0,5.0}; > > List<TimestampedValue<KV<String,Pair<Integer, Double>>>> > dataList= *new*ArrayList<>(); > > *int*n=value.length; > > *int*count=0; > > *for*(*int*i=0; i<n; i++) > > { > > count=count+1; > > *if*(i<=3) > > { > > Instant M1_time=starttime.plus(millsec*count); > > dataList.add(TimestampedValue./of/(KV./of/("M1", *new*Pair<Integer, > Double> (i,value[i])), M1_time)); > > } > > *else**if*(4<=i&& i<5) > > { > > Instant M2_time=starttime.plus(millsec*count); > > dataList.add(TimestampedValue./of/(KV./of/("M1", *new*Pair<Integer, > Double> (i,value[i])), M2_time)); > > } > > *else* > > { > > Instant M3_time=starttime.plus(millsec*count); > > dataList.add(TimestampedValue./of/(KV./of/("M1", *new*Pair<Integer, > Double> (i,value[i])), M3_time)); > > } > > System.*/out/*.println("raw_data="+dataList.get(i)); > > } > > SparkPipelineOptions options= > PipelineOptionsFactory./as/(SparkPipelineOptions.*class*); > > options.setRunner(SparkRunner.*class*); > > options.setSparkMaster("local[4]"); > > Pipeline p= Pipeline./create/(options); > > PCollection<KV<String,Pair<Integer, Double>>> data=p.apply("create > data with time",Create./timestamped/(dataList)); > > data.apply("spark_write_on_console",ConsoleIO.Write._out_); > > p.run().waitUntilFinish(); > > ” > > Thanks very much > > Sincerely yours, > > Liang-Sian Lin, Dr. > > Oct 20 2017 > > > > -- > 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀 > 此信件。 This email may contain confidential information. Please do not > use or disclose it in any way and delete it if you are not the intended > recipient. -- Jean-Baptiste Onofré [email protected] http://blog.nanthrax.net Talend - http://www.talend.com -- 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain confidential information. Please do not use or disclose it in any way and delete it if you are not the intended recipient.
