[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module

2017-04-19 Thread tianyou (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975999#comment-15975999
 ] 

tianyou commented on BEAM-1789:
---

I changed _Window.Bound<KV<String, String>> fixWindow = 
Window.<KV<String, String>>into(
FixedWindows.of(size)
);_ to _Window.<KV<String, String>>into(FixedWindows.of(size))
  .triggering(
  AfterWatermark.pastEndOfWindow()
  .withLateFirings(AfterProcessingTime
  .pastFirstElementInPane().plusDelayOf(Duration.standardHours(1))))
  .withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes()_, but I 
get the same result.
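For readability, the same trigger configuration laid out as a sketch (a reconstruction only: the element type is assumed to be KV<String, String>, matching the pipeline quoted below, since the generic parameters were lost in the formatting):

// Reconstructed sketch, assuming KV<String, String> elements as in the quoted pipeline.
// Fires at the watermark, with late firings one hour of processing time after the
// first late element, allows data up to one day late, and discards fired panes.
Window.Bound<KV<String, String>> fixWindow =
    Window.<KV<String, String>>into(FixedWindows.of(size))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withLateFirings(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardHours(1))))
        .withAllowedLateness(Duration.standardDays(1))
        .discardingFiredPanes();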

> window can't not use in spark cluster module
> 
>
> Key: BEAM-1789
> URL: https://issues.apache.org/jira/browse/BEAM-1789
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: tianyou
>Assignee: Amit Sela
>
>  I use Beam on a Spark cluster. The application is below.
> SparkPipelineOptions options = 
> PipelineOptionsFactory.as(SparkPipelineOptions.class);
> options.setRunner(SparkRunner.class);
> options.setEnableSparkMetricSinks(false);
> options.setStreaming(true);
> options.setSparkMaster("spark://10.100.124.205:6066");
> options.setAppName("Beam App Spark" + new Random().nextFloat());
> options.setJobName("Beam Job Spark" + new Random().nextFloat());
> System.out.println("App Name:" + options.getAppName());
> System.out.println("Job Name:" + options.getJobName());
> options.setMaxRecordsPerBatch(10L);
> 
> //  PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
> 
> //  Duration size = Duration.standardMinutes(4);
> long duration = 60;
> if (args != null && args.length == 1) {
> duration = Integer.valueOf(args[0]);
> }
> Duration size = Duration.standardSeconds(duration);
> System.out.println("Time window: [" + duration + "] seconds");
> Window.Bound<KV<String, String>> fixWindow = 
> Window.<KV<String, String>>into(
> FixedWindows.of(size)
> );
> 
> String kafkaAddress = "10.100.124.208:9093";
> //  String kafkaAddress = "192.168.100.212:9092";
> 
> Map<String, Object> kfConsunmerConf = new HashMap<String, Object>();
> kfConsunmerConf.put("auto.offset.reset", "latest");
> PCollection<String> kafkaJsonPc = p.apply(KafkaIO.<String, String>read()
> .withBootstrapServers(kafkaAddress)
> .withTopics(ImmutableList.of("wypxx1"))
> .withKeyCoder(StringUtf8Coder.of())
> .withValueCoder(StringUtf8Coder.of())
> .updateConsumerProperties(kfConsunmerConf)
> .withoutMetadata()
> ).apply(Values.<String>create());
> 
> PCollection<KV<String, String>> totalPc = kafkaJsonPc.apply(
> "count line",
> ParDo.of(new DoFn<String, KV<String, String>>() {
> @ProcessElement
>   public void processElement(ProcessContext c) {
> String line = c.element();
> Instant is = c.timestamp();
> if (line.length() > 2)
>   line = line.substring(0, 2);
> System.out.println(line + " " + is.toString());
> c.output(KV.of(line, line));
>   }
>  })
> );
> 
> PCollection<KV<String, Iterable<String>>> itPc = 
> totalPc.apply(fixWindow).apply(
> "group by appKey",
> GroupByKey.create()
> );
>   itPc.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
> @ProcessElement
> public void processElement(ProcessContext c) {
> KV<String, Iterable<String>> keyIt = c.element();
> String key = keyIt.getKey();
> Iterable<String> itb = keyIt.getValue();
> Iterator<String> it = itb.iterator();
> StringBuilder sb = new StringBuilder();
> sb.append(key).append(":[");
> while (it.hasNext()) {
> sb.append(it.next()).append(",");
> }
> String str = sb.toString();
> str = str.substring(0, str.length() - 1) + "]";
> System.out.println(str);
> String filePath = "/data/wyp/sparktest.txt";
> String line = "word-->[" + key + "]total count=" + str + "--->time+" + c.timestamp().toString();
> System.out.println("writefile->" + line);
> FileUtil.write(filePath, line, true, true);
> }
>  }));
> 
> p.run().waitUntilFinish();
> When I submit the application to the Spark cluster, in the Spark UI I can see 
> the log output of the totalPc PCollection after about one minute, but I cannot 
> see any log output for the itPc PCollection.
> When I run Spark in local mode, it works well.
> Please help me resolve this problem. Thanks!

[jira] [Issue Comment Deleted] (BEAM-1789) window can't not use in spark cluster module

2017-04-19 Thread tianyou (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tianyou updated BEAM-1789:
--
Comment: was deleted

(was: I changed *Window.Bound<KV<String, String>> fixWindow = 
Window.<KV<String, String>>into(
FixedWindows.of(size)
);* to *Window.<KV<String, String>>into(FixedWindows.of(size))
.triggering(
AfterWatermark.pastEndOfWindow()
.withLateFirings(AfterProcessingTime
.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1))))
.withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes()*, but I 
get the same result.)


[jira] [Created] (BEAM-2021) Fix Java's Coder class hierarchy

2017-04-19 Thread Kenneth Knowles (JIRA)
Kenneth Knowles created BEAM-2021:
-

 Summary: Fix Java's Coder class hierarchy
 Key: BEAM-2021
 URL: https://issues.apache.org/jira/browse/BEAM-2021
 Project: Beam
  Issue Type: Improvement
  Components: beam-model-runner-api, sdk-java-core
Affects Versions: First stable release
Reporter: Kenneth Knowles
Assignee: Thomas Groh


This is thoroughly out of hand. In the runner API world, there are two paths:

1. A URN plus component coders plus a custom payload (in the form of component 
coders alongside an SdkFunctionSpec).
2. A custom coder (a single URN) whose payload is serialized Java; I think this 
never has component coders.

The other base classes have now been shown to be extraneous: they favor saving 
~3 lines of boilerplate for rarely written code at the cost of readability. 
Instead they should just be dropped.

The custom payload is an Any proto in the runner API. But tying the Coder 
interface to proto would be unfortunate from a design perspective and cannot be 
done anyhow due to dependency hell.
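As an illustration of the two paths above (a sketch only; the URN strings and the MyEvent class are placeholders, not the actual Runner API constants):

// Path 1: a structured coder, naturally represented as a URN plus component coders.
Coder<KV<String, Long>> kv = KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of());
// -> e.g. urn ~ "beam:coder:kv:v1", components = [StringUtf8Coder, VarLongCoder],
//    no custom payload

// Path 2: a custom coder, a single URN whose payload is serialized Java.
Coder<MyEvent> custom = SerializableCoder.of(MyEvent.class);  // MyEvent is hypothetical
// -> e.g. urn ~ "beam:coder:custom_java:v1", payload = Java-serialized coder bytes,
//    no component coders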



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-972) Add basic level of unit testing to gearpump runner

2017-04-19 Thread Manu Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manu Zhang resolved BEAM-972.
-
   Resolution: Fixed
Fix Version/s: Not applicable

> Add basic level of unit testing to gearpump runner
> --
>
> Key: BEAM-972
> URL: https://issues.apache.org/jira/browse/BEAM-972
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-gearpump, testing
>Reporter: Manu Zhang
>Assignee: Huafeng Wang
> Fix For: Not applicable
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-973) Add end user and developer documentation to gearpump-runner

2017-04-19 Thread Manu Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manu Zhang resolved BEAM-973.
-
   Resolution: Fixed
Fix Version/s: Not applicable

> Add end user and developer documentation to gearpump-runner
> ---
>
> Key: BEAM-973
> URL: https://issues.apache.org/jira/browse/BEAM-973
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-gearpump
>Reporter: Manu Zhang
>Assignee: Manu Zhang
> Fix For: Not applicable
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module

2017-04-19 Thread tianyou (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975992#comment-15975992
 ] 

tianyou commented on BEAM-1789:
---

*bold*




--
This message was sent by Atlassian JIRA

[jira] [Created] (BEAM-2020) Move CloudObject to Dataflow runner

2017-04-19 Thread Kenneth Knowles (JIRA)
Kenneth Knowles created BEAM-2020:
-

 Summary: Move CloudObject to Dataflow runner
 Key: BEAM-2020
 URL: https://issues.apache.org/jira/browse/BEAM-2020
 Project: Beam
  Issue Type: Improvement
  Components: beam-model-runner-api, sdk-java-core
Reporter: Kenneth Knowles
Assignee: Thomas Groh
 Fix For: First stable release


This entails primarily eliminating Coder.asCloudObject() by adding the needed 
accessors, and possibly a serialization registrar discipline, for coders in the 
Runner API proto.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Issue Comment Deleted] (BEAM-1789) window can't not use in spark cluster module

2017-04-19 Thread tianyou (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tianyou updated BEAM-1789:
--
Comment: was deleted

(was: *bold*)




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Issue Comment Deleted] (BEAM-1789) window can't not use in spark cluster module

2017-04-19 Thread tianyou (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tianyou updated BEAM-1789:
--
Comment: was deleted

(was: ??ffdsafds??)




--
This message was sent by Atlassian JIRA

[jira] [Assigned] (BEAM-2020) Move CloudObject to Dataflow runner

2017-04-19 Thread Kenneth Knowles (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth Knowles reassigned BEAM-2020:
-

Assignee: Luke Cwik  (was: Thomas Groh)

> Move CloudObject to Dataflow runner
> ---
>
> Key: BEAM-2020
> URL: https://issues.apache.org/jira/browse/BEAM-2020
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model-runner-api, sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Luke Cwik
> Fix For: First stable release
>
>
> This entails primarily eliminating Coder.asCloudObject() by adding the needed 
> accessors, and possibly a serialization registrar discipline, for coders in 
> the Runner API proto.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module

2017-04-19 Thread tianyou (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975996#comment-15975996
 ] 

tianyou commented on BEAM-1789:
---

??ffdsafds??




--
This message was sent by Atlassian JIRA

[jira] [Issue Comment Deleted] (BEAM-1789) window can't not use in spark cluster module

2017-04-19 Thread tianyou (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tianyou updated BEAM-1789:
--
Comment: was deleted

(was: I changed *Window.Bound<KV<String, String>> fixWindow = 
Window.<KV<String, String>>into(FixedWindows.of(size));* to 
*Window.<KV<String, String>>into(FixedWindows.of(size)).triggering(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes()*
, but I get the same result.)


[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module

2017-04-19 Thread tianyou (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976002#comment-15976002
 ] 

tianyou commented on BEAM-1789:
---

I changed *Window.Bound<KV<String, String>> fixWindow = 
Window.<KV<String, String>>into(FixedWindows.of(size));* to 
*Window.<KV<String, String>>into(FixedWindows.of(size)).triggering(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes()*
, but I get the same result.


[jira] [Comment Edited] (BEAM-1789) window can't not use in spark cluster module

2017-04-19 Thread tianyou (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976003#comment-15976003
 ] 

tianyou edited comment on BEAM-1789 at 4/20/17 3:31 AM:


I changed _*Window.Bound<KV<String, String>> fixWindow = 
Window.<KV<String, String>>into(FixedWindows.of(size));*_ to 
_*Window.<KV<String, String>>into(FixedWindows.of(size)).triggering(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes()*_
, but I get the same result.


was (Author: tianyou):
I changed _Window.Bound<KV<String, String>> fixWindow = 
Window.<KV<String, String>>into(FixedWindows.of(size));_ to 
_Window.<KV<String, String>>into(FixedWindows.of(size)).triggering(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes()_
, but I get the same result.


[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module

2017-04-19 Thread tianyou (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976003#comment-15976003
 ] 

tianyou commented on BEAM-1789:
---

I changed _Window.Bound<KV<String, String>> fixWindow = 
Window.<KV<String, String>>into(FixedWindows.of(size));_ to 
_Window.<KV<String, String>>into(FixedWindows.of(size)).triggering(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes()_
, but I get the same result.


[jira] [Commented] (BEAM-1272) Align the naming of "generateInitialSplits" and "splitIntoBundles" to better reflect their intention

2017-04-19 Thread Etienne Chauchot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974232#comment-15974232
 ] 

Etienne Chauchot commented on BEAM-1272:


[~dhalp...@google.com], I opened 
https://issues.apache.org/jira/browse/BEAM-2012 as a reminder for 
cleaning up/rebuilding the Dataflow worker, as you suggested in Slack.

> Align the naming of "generateInitialSplits" and "splitIntoBundles" to better 
> reflect their intention
> 
>
> Key: BEAM-1272
> URL: https://issues.apache.org/jira/browse/BEAM-1272
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Stas Levin
>Assignee: Etienne Chauchot
>Priority: Minor
> Fix For: First stable release
>
>
> See [dev list 
> thread|https://lists.apache.org/thread.html/ac5717566707153e85da880cc75c8d047e1c6606861777670bb9107c@%3Cdev.beam.apache.org%3E].



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-2012) Rebuild the Dataflow worker and remove the BoundedSource.splitIntoBundles deprecated method

2017-04-19 Thread Etienne Chauchot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Etienne Chauchot updated BEAM-2012:
---
Summary: Rebuild the Dataflow worker and remove the 
BoundedSource.splitIntoBundles deprecated method  (was: Rebuild the Dataflow 
worker and remove the {{BoundedSource.splitIntoBundles}} deprecated method)

> Rebuild the Dataflow worker and remove the BoundedSource.splitIntoBundles 
> deprecated method
> ---
>
> Key: BEAM-2012
> URL: https://issues.apache.org/jira/browse/BEAM-2012
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow
>Reporter: Etienne Chauchot
>Assignee: Eugene Kirpichov
>
> Linked to https://issues.apache.org/jira/browse/BEAM-1272
> Just a reminder for building/cleaning as suggested by [~dhalp...@google.com]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Dataflow #2877

2017-04-19 Thread Apache Jenkins Server
See 




[jira] [Created] (BEAM-2012) Rebuild the Dataflow worker and remove the {{BoundedSource.splitIntoBundles}} deprecated method

2017-04-19 Thread Etienne Chauchot (JIRA)
Etienne Chauchot created BEAM-2012:
--

 Summary: Rebuild the Dataflow worker and remove the 
{{BoundedSource.splitIntoBundles}} deprecated method
 Key: BEAM-2012
 URL: https://issues.apache.org/jira/browse/BEAM-2012
 Project: Beam
  Issue Type: Task
  Components: runner-dataflow
Reporter: Etienne Chauchot
Assignee: Eugene Kirpichov


Just a reminder for building/cleaning as suggested by [~dhalp...@google.com]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Jenkins build is still unstable: beam_PostCommit_Java_ValidatesRunner_Dataflow #2876

2017-04-19 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-2012) Rebuild the Dataflow worker and remove the {{BoundedSource.splitIntoBundles}} deprecated method

2017-04-19 Thread Etienne Chauchot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974230#comment-15974230
 ] 

Etienne Chauchot commented on BEAM-2012:


[~jkff] I assigned this ticket to you because, in the PR, you offered to help 
remove the deprecated {{splitIntoBundles}} from the Dataflow worker.

> Rebuild the Dataflow worker and remove the {{BoundedSource.splitIntoBundles}} 
> deprecated method
> ---
>
> Key: BEAM-2012
> URL: https://issues.apache.org/jira/browse/BEAM-2012
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow
>Reporter: Etienne Chauchot
>Assignee: Eugene Kirpichov
>
> Linked to https://issues.apache.org/jira/browse/BEAM-1272
> Just a reminder for building/cleaning as suggested by [~dhalp...@google.com]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-2012) Rebuild the Dataflow worker and remove the {{BoundedSource.splitIntoBundles}} deprecated method

2017-04-19 Thread Etienne Chauchot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Etienne Chauchot updated BEAM-2012:
---
Description: 
Linked to https://issues.apache.org/jira/browse/BEAM-1272
Just a reminder for building/cleaning as suggested by [~dhalp...@google.com]

  was:Just a reminder for building/cleaning as suggested by 
[~dhalp...@google.com]


> Rebuild the Dataflow worker and remove the {{BoundedSource.splitIntoBundles}} 
> deprecated method
> ---
>
> Key: BEAM-2012
> URL: https://issues.apache.org/jira/browse/BEAM-2012
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow
>Reporter: Etienne Chauchot
>Assignee: Eugene Kirpichov
>
> Linked to https://issues.apache.org/jira/browse/BEAM-1272
> Just a reminder for building/cleaning as suggested by [~dhalp...@google.com]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Jenkins build is back to normal : beam_PostCommit_Java_MavenInstall #3366

2017-04-19 Thread Apache Jenkins Server
See 




Build failed in Jenkins: beam_PerformanceTests_Dataflow #326

2017-04-19 Thread Apache Jenkins Server
See 


Changes:

[dhalperi] Delete deprecated AttemptAndTimeBoundedExponentialBackoff

[dhalperi] Remove deprecated method in IOChannelUtils

[dhalperi] Remove deprecated/unused code from Pipeline

[dhalperi] Delete AttemptBoundedExponentialBackoff

[dhalperi] Delete IntervalBoundedExponentialBackoff

[dhalperi] Delete AppEngineEnvironment

[dhalperi] Upgrade worker to not depend on deprecated now deleted code

[dhalperi] Changed snappy version to 1.1.4-M3

[kirpichov] Separates side input test and side output test

[kirpichov] ProcessFn remembers more info about its application context

[kirpichov] Minor cleanups in ParDoEvaluator

[kirpichov] Extracts interface from PushbackSideInputDoFnRunner

[kirpichov] Creates ProcessFnRunner and wires it through ParDoEvaluator

[kirpichov] Explodes windows before GBKIKWI

[kirpichov] Use tableRefFunction throughout BigQueryIO. Constant table writes 
use

[kirpichov] Add PrepareWrite transform.

[kirpichov] Refactor streaming write branch into separate reusable components.

[kirpichov] Refactor batch load job path, and add support for data-dependent 
tables.

[kirpichov] Refactor batch loads, and add support for windowed writes.

[kirpichov] Fix tests to properly fake out BigQueryService, and add tests for

[kirpichov] Separate streaming writes into two pluggable components - 
CreateTables,

--
[...truncated 273.30 KB...]
 * [new ref] refs/pull/2540/head -> origin/pr/2540/head
 * [new ref] refs/pull/2540/merge -> origin/pr/2540/merge
 * [new ref] refs/pull/2541/head -> origin/pr/2541/head
 * [new ref] refs/pull/2541/merge -> origin/pr/2541/merge
 * [new ref] refs/pull/2542/head -> origin/pr/2542/head
 * [new ref] refs/pull/2542/merge -> origin/pr/2542/merge
 * [new ref] refs/pull/2543/head -> origin/pr/2543/head
 * [new ref] refs/pull/2543/merge -> origin/pr/2543/merge
 * [new ref] refs/pull/2544/head -> origin/pr/2544/head
 * [new ref] refs/pull/2544/merge -> origin/pr/2544/merge
 * [new ref] refs/pull/2545/head -> origin/pr/2545/head
 * [new ref] refs/pull/2545/merge -> origin/pr/2545/merge
 * [new ref] refs/pull/2546/head -> origin/pr/2546/head
 * [new ref] refs/pull/2546/merge -> origin/pr/2546/merge
 * [new ref] refs/pull/2547/head -> origin/pr/2547/head
 * [new ref] refs/pull/2547/merge -> origin/pr/2547/merge
 * [new ref] refs/pull/2548/head -> origin/pr/2548/head
 * [new ref] refs/pull/2548/merge -> origin/pr/2548/merge
error: unable to resolve reference refs/remotes/origin/pr/2549/merge: No such 
file or directory
 ! 3490028...04ca87a refs/pull/2549/merge -> origin/pr/2549/merge  (unable to 
update local ref)
 * [new ref] refs/pull/2550/head -> origin/pr/2550/head
 * [new ref] refs/pull/2550/merge -> origin/pr/2550/merge
 * [new ref] refs/pull/2551/head -> origin/pr/2551/head
 * [new ref] refs/pull/2551/merge -> origin/pr/2551/merge
 * [new ref] refs/pull/2552/head -> origin/pr/2552/head
 * [new ref] refs/pull/2552/merge -> origin/pr/2552/merge
 * [new ref] refs/pull/2553/head -> origin/pr/2553/head
 * [new ref] refs/pull/2553/merge -> origin/pr/2553/merge
 * [new ref] refs/pull/2554/head -> origin/pr/2554/head
 * [new ref] refs/pull/2554/merge -> origin/pr/2554/merge
 * [new ref] refs/pull/2555/head -> origin/pr/2555/head
 * [new ref] refs/pull/2555/merge -> origin/pr/2555/merge
 * [new ref] refs/pull/2556/head -> origin/pr/2556/head
 * [new ref] refs/pull/2556/merge -> origin/pr/2556/merge
 * [new ref] refs/pull/2557/head -> origin/pr/2557/head
 * [new ref] refs/pull/2557/merge -> origin/pr/2557/merge
 * [new ref] refs/pull/2558/head -> origin/pr/2558/head
 * [new ref] refs/pull/2558/merge -> origin/pr/2558/merge
 * [new ref] refs/pull/2559/head -> origin/pr/2559/head
 * [new ref] refs/pull/2559/merge -> origin/pr/2559/merge
 * [new ref] refs/pull/2560/head -> origin/pr/2560/head
 * [new ref] refs/pull/2560/merge -> origin/pr/2560/merge
 * [new ref] refs/pull/2561/head -> origin/pr/2561/head
 * [new ref] refs/pull/2561/merge -> origin/pr/2561/merge
 * [new ref] refs/pull/2562/head -> origin/pr/2562/head
 * [new ref] refs/pull/2562/merge -> origin/pr/2562/merge
 * [new ref] refs/pull/2563/head -> origin/pr/2563/head
 * [new ref] refs/pull/2563/merge -> origin/pr/2563/merge
 * [new ref] refs/pull/2564/head -> origin/pr/2564/head
 * [new ref] refs/pull/2564/merge -> origin/pr/2564/merge
 * [new ref] refs/pull/2565/head -> origin/pr/2565/head
 * [new ref] refs/pull/2565/merge -> origin/pr/2565/merge
 * [new ref] 

Build failed in Jenkins: beam_PerformanceTests_JDBC #132

2017-04-19 Thread Apache Jenkins Server
See 


Changes:

[dhalperi] Delete deprecated AttemptAndTimeBoundedExponentialBackoff

[dhalperi] Remove deprecated method in IOChannelUtils

[dhalperi] Remove deprecated/unused code from Pipeline

[dhalperi] Delete AttemptBoundedExponentialBackoff

[dhalperi] Delete IntervalBoundedExponentialBackoff

[dhalperi] Delete AppEngineEnvironment

[dhalperi] Upgrade worker to not depend on deprecated now deleted code

[dhalperi] Changed snappy version to 1.1.4-M3

[kirpichov] Separates side input test and side output test

[kirpichov] ProcessFn remembers more info about its application context

[kirpichov] Minor cleanups in ParDoEvaluator

[kirpichov] Extracts interface from PushbackSideInputDoFnRunner

[kirpichov] Creates ProcessFnRunner and wires it through ParDoEvaluator

[kirpichov] Explodes windows before GBKIKWI

[kirpichov] Use tableRefFunction throughout BigQueryIO. Constant table writes 
use

[kirpichov] Add PrepareWrite transform.

[kirpichov] Refactor streaming write branch into separate reusable components.

[kirpichov] Refactor batch load job path, and add support for data-dependent 
tables.

[kirpichov] Refactor batch loads, and add support for windowed writes.

[kirpichov] Fix tests to properly fake out BigQueryService, and add tests for

[kirpichov] Separate streaming writes into two pluggable components - 
CreateTables,

--
[...truncated 837.89 KB...]
at 
com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:261)
at 
com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:55)
at 
com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:43)
at 
com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78)
at 
com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:152)
at 
com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.doWork(DataflowWorker.java:271)
at 
com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:243)
at 
com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:127)
at 
com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
at 
com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:94)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
(871d2fc53b05fa5f): java.lang.RuntimeException: 
org.apache.beam.sdk.util.UserCodeException: org.postgresql.util.PSQLException: 
The connection attempt failed.
at 
com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:289)
at 
com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:261)
at 
com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:55)
at 
com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:43)
at 
com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78)
at 
com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:152)
at 
com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.doWork(DataflowWorker.java:271)
at 
com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:243)
at 
com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:127)
at 
com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
at 
com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:94)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: 
org.postgresql.util.PSQLException: The connection attempt failed.
at 

[jira] [Resolved] (BEAM-1914) XML IO should comply with PTransform style guide

2017-04-19 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-1914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jean-Baptiste Onofré resolved BEAM-1914.

   Resolution: Fixed
Fix Version/s: First stable release

> XML IO should comply with PTransform style guide
> 
>
> Key: BEAM-1914
> URL: https://issues.apache.org/jira/browse/BEAM-1914
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
>  Labels: backward-incompatible, starter
> Fix For: First stable release
>
>
> Currently we have XmlSource and XmlSink in the Java SDK. They violate the 
> PTransform style guide in several respects:
> - They should be grouped into an XmlIO class with read() and write() verbs, 
> like all the other similar connectors
> - The source/sink classes should be made private or package-local
> - Should get rid of XmlSink.Bound - XmlSink itself should inherit from 
> FileBasedSink
> - Could optionally benefit from AutoValue
> See e.g. the PR with BigQuery fixes https://github.com/apache/beam/pull/2149
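A hypothetical sketch of the style-guide-compliant shape (the method names and the Record class are illustrative, following the pattern of connectors such as TextIO and BigQueryIO, not a committed API):

// Hypothetical target shape for the proposed XmlIO; names are illustrative only.
PCollection<Record> records =
    p.apply(XmlIO.<Record>read()
        .from("/path/to/input.xml")
        .withRootElement("records")
        .withRecordElement("record")
        .withRecordClass(Record.class));

records.apply(XmlIO.<Record>write()
    .to("/path/to/output")
    .withRootElement("records")
    .withRecordClass(Record.class));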



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Jenkins build became unstable: beam_PostCommit_Java_ValidatesRunner_Spark #1699

2017-04-19 Thread Apache Jenkins Server
See 




[GitHub] beam-site pull request #216: Update flink runner page to use language-py ins...

2017-04-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam-site/pull/216


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/3] beam-site git commit: Update flink page ref from language-python to language-py and add toggle

2017-04-19 Thread aljoscha
Repository: beam-site
Updated Branches:
  refs/heads/asf-site 97641db9f -> 3ecf363c2


Update flink page ref from language-python to language-py and add toggle


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/f936a3dc
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/f936a3dc
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/f936a3dc

Branch: refs/heads/asf-site
Commit: f936a3dc1ac825d20b9db3a0bfbddd2b32b3da71
Parents: 97641db
Author: melissa 
Authored: Tue Apr 18 16:22:19 2017 -0700
Committer: Aljoscha Krettek 
Committed: Wed Apr 19 11:29:24 2017 +0200

--
 src/documentation/runners/flink.md | 15 ---
 1 file changed, 12 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam-site/blob/f936a3dc/src/documentation/runners/flink.md
--
diff --git a/src/documentation/runners/flink.md 
b/src/documentation/runners/flink.md
index ed52689..f844c6b 100644
--- a/src/documentation/runners/flink.md
+++ b/src/documentation/runners/flink.md
@@ -6,6 +6,14 @@ redirect_from: /learn/runners/flink/
 ---
 # Using the Apache Flink Runner
 
+
+  Adapt for:
+  
+Java SDK
+Python SDK
+  
+
+
 The Apache Flink Runner can be used to execute Beam pipelines using [Apache 
Flink](https://flink.apache.org). When using the Flink Runner you will create a 
jar file containing your job that can be executed on a regular Flink cluster. 
It's also possible to execute a Beam pipeline using Flink's local execution 
mode without setting up a cluster. This is helpful for development and 
debugging of your pipeline.
 
 The Flink Runner and Flink are suitable for large scale, continuous jobs, and 
provide:
@@ -38,8 +46,7 @@ For more information, the [Flink 
Documentation](https://ci.apache.org/projects/f
 
 ### Specify your dependency
 
-You must specify your dependency on the Flink Runner.
-
+When using Java, you must specify your dependency 
on the Flink Runner in your `pom.xml`.
 ```java
 
   org.apache.beam
@@ -49,6 +56,8 @@ You must specify your dependency on the Flink Runner.
 
 ```
 
+This section is not applicable to the Beam SDK for 
Python.
+
 ## Executing a pipeline on a Flink cluster
 
 For executing a pipeline on a Flink cluster you need to package your program 
along with all dependencies in a so-called fat jar. How you do this depends on 
your build system but if you follow along the [Beam Quickstart]({{ site.baseurl 
}}/get-started/quickstart/) this is the command that you have to run:
@@ -129,7 +138,7 @@ When executing your pipeline with the Flink Runner, you can 
set these pipeline o
 
 
 
-See the reference documentation for the  [FlinkPipelineOptions]({{ site.baseurl 
}}/documentation/sdks/javadoc/{{ site.release_latest 
}}/index.html?org/apache/beam/runners/flink/FlinkPipelineOptions.html)[PipelineOptions](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/pipeline_options.py)
 interface (and its subinterfaces) for the complete list of pipeline 
configuration options.
+See the reference documentation for the  [FlinkPipelineOptions]({{ site.baseurl 
}}/documentation/sdks/javadoc/{{ site.release_latest 
}}/index.html?org/apache/beam/runners/flink/FlinkPipelineOptions.html)[PipelineOptions](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/pipeline_options.py)
 interface (and its subinterfaces) for the complete list of pipeline 
configuration options.
 
 ## Additional information and caveats
 


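For readers following the pipeline-options hunk above, here is a minimal Java sketch of how FlinkPipelineOptions is typically configured and handed to a pipeline. The flinkMaster value, the setFlinkMaster call, and the empty pipeline body are placeholder assumptions, not part of the documented change.

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FlinkOptionsSketch {
  public static void main(String[] args) {
    // FlinkPipelineOptions is a PipelineOptions subinterface; parse it from --flags.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // "[local]" uses Flink's local execution mode; a cluster address would submit
    // the job to a running Flink cluster instead (placeholder assumption).
    options.setFlinkMaster("[local]");

    Pipeline pipeline = Pipeline.create(options);
    // ... apply transforms here ...
    pipeline.run().waitUntilFinish();
  }
}
```

Because these are regular PipelineOptions, the same settings can also be supplied on the command line (for example --runner=FlinkRunner) and picked up by fromArgs.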

[3/3] beam-site git commit: This closes #216

2017-04-19 Thread aljoscha
This closes #216


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/3ecf363c
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/3ecf363c
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/3ecf363c

Branch: refs/heads/asf-site
Commit: 3ecf363c2229dbbe918e00ad9acce34c45ee10e8
Parents: 97641db ab9c857
Author: Aljoscha Krettek 
Authored: Wed Apr 19 11:31:25 2017 +0200
Committer: Aljoscha Krettek 
Committed: Wed Apr 19 11:31:25 2017 +0200

--
 content/documentation/runners/flink/index.html | 15 ---
 src/documentation/runners/flink.md | 15 ---
 2 files changed, 24 insertions(+), 6 deletions(-)
--




[2/3] beam-site git commit: Regenerate website

2017-04-19 Thread aljoscha
Regenerate website


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/ab9c8578
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/ab9c8578
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/ab9c8578

Branch: refs/heads/asf-site
Commit: ab9c8578359f4c9fadc11fae251298e5b57645af
Parents: f936a3d
Author: Aljoscha Krettek 
Authored: Wed Apr 19 11:31:14 2017 +0200
Committer: Aljoscha Krettek 
Committed: Wed Apr 19 11:31:14 2017 +0200

--
 content/documentation/runners/flink/index.html | 15 ---
 1 file changed, 12 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam-site/blob/ab9c8578/content/documentation/runners/flink/index.html
--
diff --git a/content/documentation/runners/flink/index.html 
b/content/documentation/runners/flink/index.html
index b0b8fdc..64ae583 100644
--- a/content/documentation/runners/flink/index.html
+++ b/content/documentation/runners/flink/index.html
@@ -153,6 +153,14 @@
   
 Using the Apache Flink 
Runner
 
+
+  Adapt for:
+  
+Java SDK
+Python SDK
+  
+
+
 The Apache Flink Runner can be used to execute Beam pipelines using https://flink.apache.org;>Apache Flink. When using the Flink Runner 
you will create a jar file containing your job that can be executed on a 
regular Flink cluster. It’s also possible to execute a Beam pipeline using 
Flink’s local execution mode without setting up a cluster. This is helpful 
for development and debugging of your pipeline.
 
 The Flink Runner and Flink are suitable for large scale, continuous jobs, 
and provide:
@@ -187,8 +195,7 @@
 
 Specify your dependency
 
-You must specify your dependency on the Flink Runner.
-
+When using Java, you must specify your 
dependency on the Flink Runner in your pom.xml.
 <dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-flink_2.10</artifactId>
@@ -198,6 +205,8 @@
 
 
 
+This section is not applicable to the Beam SDK 
for Python.
+
 Executing a pipeline on a 
Flink cluster
 
 For executing a pipeline on a Flink cluster you need to package your 
program along with all dependencies in a so-called fat jar. How you do this 
depends on your build system but if you follow along the Beam Quickstart this is the command that 
you have to run:
@@ -278,7 +287,7 @@
 
 
 
-See the reference documentation for the  FlinkPipelineOptionshttps://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/pipeline_options.py;>PipelineOptions
 interface (and its subinterfaces) for the complete list of pipeline 
configuration options.
+See the reference documentation for the  FlinkPipelineOptionshttps://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/pipeline_options.py;>PipelineOptions
 interface (and its subinterfaces) for the complete list of pipeline 
configuration options.
 
 Additional information and 
caveats
 



[jira] [Commented] (BEAM-1914) XML IO should comply with PTransform style guide

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974343#comment-15974343
 ] 

ASF GitHub Bot commented on BEAM-1914:
--

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/2558


> XML IO should comply with PTransform style guide
> 
>
> Key: BEAM-1914
> URL: https://issues.apache.org/jira/browse/BEAM-1914
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
>  Labels: backward-incompatible, starter
>
> Currently we have XmlSource and XmlSink in the Java SDK. They violate the 
> PTransform style guide in several respects:
> - They should be grouped into an XmlIO class with read() and write() verbs, 
> like all the other similar connectors
> - The source/sink classes should be made private or package-local
> - Should get rid of XmlSink.Bound - XmlSink itself should inherit from 
> FileBasedSink
> - Could optionally benefit from AutoValue
> See e.g. the PR with BigQuery fixes https://github.com/apache/beam/pull/2149





[1/2] beam git commit: [BEAM-1914] XmlIO now complies with PTransform style guide

2017-04-19 Thread jbonofre
Repository: beam
Updated Branches:
  refs/heads/master 57929fb80 -> 470808c06


[BEAM-1914] XmlIO now complies with PTransform style guide


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0c0a60c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0c0a60c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0c0a60c

Branch: refs/heads/master
Commit: d0c0a60c83a9d2a6caa29f91f89d8c0ee3b0eb93
Parents: 57929fb
Author: Eugene Kirpichov 
Authored: Mon Apr 17 16:25:42 2017 -0700
Committer: Jean-Baptiste Onofré 
Committed: Wed Apr 19 10:34:46 2017 +0200

--
 .../apache/beam/sdk/io/CompressedSource.java|   4 +-
 .../main/java/org/apache/beam/sdk/io/XmlIO.java | 477 +++
 .../java/org/apache/beam/sdk/io/XmlSink.java| 226 ++---
 .../java/org/apache/beam/sdk/io/XmlSource.java  | 191 +---
 .../sdk/transforms/display/DisplayData.java |   6 +
 .../org/apache/beam/sdk/io/XmlSinkTest.java |  89 ++--
 .../org/apache/beam/sdk/io/XmlSourceTest.java   | 248 ++
 .../sdk/transforms/display/DisplayDataTest.java |  17 +
 8 files changed, 740 insertions(+), 518 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index ecd0fd9..1d940cb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -46,10 +46,10 @@ import 
org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
  * A Source that reads from compressed files. A {@code CompressedSources} 
wraps a delegate
  * {@link FileBasedSource} that is able to read the decompressed file format.
  *
- * For example, use the following to read from a gzip-compressed XML file:
+ * For example, use the following to read from a gzip-compressed file-based 
source:
  *
  *  {@code
- * XmlSource mySource = XmlSource.from(...);
+ * FileBasedSource mySource = ...;
  * PCollection collection = p.apply(Read.from(CompressedSource
  * .from(mySource)
  * .withDecompression(CompressedSource.CompressionMode.GZIP)));

http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java
new file mode 100644
index 000..a53fb86
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import javax.annotation.Nullable;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/** Transforms for reading and writing XML files using JAXB mappers. */
+public class XmlIO {
+  // CHECKSTYLE.OFF: JavadocStyle
+  /**
+   * Reads XML files. This source reads one or more XML files and
+   * creates a {@link PCollection} of a given type. Please note the example 
given below.
+   *
+   * The XML file must be of the following form, where {@code root} and 
{@code record} are XML
+   * element names that are defined by the user:
+   *
+   * 

[2/2] beam git commit: [BEAM-1914] This closes #2558

2017-04-19 Thread jbonofre
[BEAM-1914] This closes #2558


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/470808c0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/470808c0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/470808c0

Branch: refs/heads/master
Commit: 470808c06fc10ad545712d6b1831530e3d5313ad
Parents: 57929fb d0c0a60
Author: Jean-Baptiste Onofré 
Authored: Wed Apr 19 10:58:42 2017 +0200
Committer: Jean-Baptiste Onofré 
Committed: Wed Apr 19 10:58:42 2017 +0200

--
 .../apache/beam/sdk/io/CompressedSource.java|   4 +-
 .../main/java/org/apache/beam/sdk/io/XmlIO.java | 477 +++
 .../java/org/apache/beam/sdk/io/XmlSink.java| 226 ++---
 .../java/org/apache/beam/sdk/io/XmlSource.java  | 191 +---
 .../sdk/transforms/display/DisplayData.java |   6 +
 .../org/apache/beam/sdk/io/XmlSinkTest.java |  89 ++--
 .../org/apache/beam/sdk/io/XmlSourceTest.java   | 248 ++
 .../sdk/transforms/display/DisplayDataTest.java |  17 +
 8 files changed, 740 insertions(+), 518 deletions(-)
--




[GitHub] beam pull request #2558: [BEAM-1914] XmlIO should comply with PTransform sty...

2017-04-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/2558



