Hi,
I started to work on the ConsoleIO (and SocketIO too), but it's not yet merged.
I can provide a SNAPSHOT build if you want to try it.
Regards
JB
On 10/20/2017 04:14 AM, [email protected] wrote:
Dear Sir,
I have a question about how to use ConsoleIO with the Beam Java SDK.
My objective, highlighted in yellow in the code below, is to write the
PCollection "data" to the console and then use it (as an RDD?) in another
variable for further processing.
If you need any further information, please let me know and I will provide it
as soon as possible.
I look forward to hearing from you.
My Java code is as follows:
“
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.spark.SparkPipelineOptions;

import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;

import javafx.util.Pair;

import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.MutableDateTime;

public static void main(String[] args) throws IOException {
    // Start time: 2017-07-12 14:00:00.000 shifted forward by 8 hours.
    MutableDateTime mutableNow = Instant.now().toMutableDateTime();
    mutableNow.setDateTime(2017, 7, 12, 14, 0, 0, 0);
    Instant starttime = mutableNow.toInstant().plus(8 * 60 * 60 * 1000);

    // Spacing between consecutive elements: 2 minutes, in milliseconds.
    int min = 2;
    int sec = min * 60;
    int millsec = sec * 1000;

    double[] value = new double[] {1.0, 2.0, 3.0, 4.0, 5.0};
    List<TimestampedValue<KV<String, Pair<Integer, Double>>>> dataList = new ArrayList<>();
    int n = value.length;
    int count = 0;
    for (int i = 0; i < n; i++) {
        count = count + 1;
        if (i <= 3) {
            Instant M1_time = starttime.plus(millsec * count);
            dataList.add(TimestampedValue.of(
                KV.of("M1", new Pair<Integer, Double>(i, value[i])), M1_time));
        } else if (4 <= i && i < 5) {
            Instant M2_time = starttime.plus(millsec * count);
            dataList.add(TimestampedValue.of(
                KV.of("M1", new Pair<Integer, Double>(i, value[i])), M2_time));
        } else {
            Instant M3_time = starttime.plus(millsec * count);
            dataList.add(TimestampedValue.of(
                KV.of("M1", new Pair<Integer, Double>(i, value[i])), M3_time));
        }
        System.out.println("raw_data=" + dataList.get(i));
    }

    // Run on a local Spark master with 4 worker threads.
    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    options.setSparkMaster("local[4]");
    Pipeline p = Pipeline.create(options);

    PCollection<KV<String, Pair<Integer, Double>>> data =
        p.apply("create data with time", Create.timestamped(dataList));
    // Write the PCollection to the console.
    data.apply("spark_write_on_console", ConsoleIO.Write.out());
    p.run().waitUntilFinish();
}
”
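The timestamps assigned in the loop above can be checked in isolation. The following is only a minimal sketch: it uses java.time instead of Joda-Time so that it needs no dependencies, it fixes the base time to UTC as an assumption, and the class and method names (TimestampSketch, elementTimestamps) are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class TimestampSketch {
    // Hypothetical helper: returns start + step * (i + 1) for i in [0, n),
    // mirroring starttime.plus(millsec * count) in the loop above.
    static List<Instant> elementTimestamps(Instant start, Duration step, int n) {
        List<Instant> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(start.plus(step.multipliedBy(i + 1)));
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumption: the base time 2017-07-12 14:00 is taken as UTC here.
        Instant start = Instant.parse("2017-07-12T14:00:00Z").plus(Duration.ofHours(8));
        // Five elements spaced 2 minutes apart, as in the original data.
        for (Instant t : elementTimestamps(start, Duration.ofMinutes(2), 5)) {
            System.out.println(t);
        }
    }
}
```

Under these assumptions the first element is stamped 2 minutes after the shifted start time and the fifth element 10 minutes after it.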
Thanks very much
Sincerely yours,
Liang-Sian Lin, Dr.
Oct 20 2017
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com