[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner

2018-03-20 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406119#comment-16406119
 ] 

Dawid Wysakowicz commented on BEAM-3414:


I think this might be the same problem as the one reported in BEAM-3863.

> AfterProcessingTime trigger issue with Flink Runner
> ---
>
> Key: BEAM-3414
> URL: https://issues.apache.org/jira/browse/BEAM-3414
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-flink
>Affects Versions: 2.2.0
> Environment: idea, ubuntu 16.04, FlinkRunner
>Reporter: huangjianhuang
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> In my demo, I read data from Kafka, count it globally, and finally output the 
> total count of received data, as follows:
> {code:java}
> FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
>     .as(FlinkPipelineOptions.class);
> options.setStreaming(true);
> options.setRunner(FlinkRunner.class);
> Pipeline pipeline = Pipeline.create(options);
> pipeline
>     .apply("Read from kafka",
>         KafkaIO.<String, String>read()
>             //.withTimestampFn(kafkaData -> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
>             .withBootstrapServers("localhost:9092")
>             .withTopic("recharge")
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(StringDeserializer.class)
>             .withoutMetadata())
>     .apply(Values.<String>create())
>     .apply(Window.<String>into(new GlobalWindows())
>         .triggering(Repeatedly.forever(
>             AfterProcessingTime.pastFirstElementInPane()
>                 .plusDelayOf(Duration.standardSeconds(5))))
>         .accumulatingFiredPanes())
>     .apply(Count.globally())
>     .apply("output",
>         ParDo.of(new DoFn<Long, Void>() {
>             @ProcessElement
>             public void process(ProcessContext context) {
>                 System.out.println("---get at: " + Instant.now() + "--");
>                 System.out.println(context.element());
>             }
>         }));
> {code}
> The result should be displayed 5 s after I send the first data, but sometimes 
> nothing is displayed after I send data. Below are the outputs I got in a test 
> (I can't upload a picture, so they are described as text):
> {code:java}
> Send 681Msg at: 2018-01-05T06:34:31.436
>   ---get at: 2018-01-05T06:34:36.668Z--
>   681
> Send 681Msg at: 2018-01-05T06:34:47.166
>   ---get at: 2018-01-05T06:34:52.284Z--
>   1362
> Send 681Msg at: 2018-01-05T06:34:55.505
> Send 681Msg at: 2018-01-05T06:35:22.068
>   ---get at: 2018-01-05T06:35:22.112Z--
>   2044
> {code}
> By the way, the code works fine with the direct runner.
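To make the anomaly in the log concrete, here is a small stdlib-only Java model of `Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(...))` in a single global window. It is a sketch, not runner code: the class and method names are hypothetical, it assumes every processing-time timer fires exactly on time and that the first element after a firing opens a new pane, and it assumes the send times and the `Z`-suffixed output times in the log are in the same time zone.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a repeated "delay after first element in pane" trigger in one
// global window. Assumptions: timers fire exactly on time, and the first element that
// arrives after a firing opens a new pane.
class ExpectedFirings {

    static final Duration DELAY = Duration.ofSeconds(5);

    // Send times copied from the report's log (assumed to share the outputs' time zone).
    static List<LocalDateTime> reportedSends() {
        return Arrays.asList(
                LocalDateTime.parse("2018-01-05T06:34:31.436"),
                LocalDateTime.parse("2018-01-05T06:34:47.166"),
                LocalDateTime.parse("2018-01-05T06:34:55.505"),
                LocalDateTime.parse("2018-01-05T06:35:22.068"));
    }

    // Returns the times at which panes should fire, for arrivals given in ascending order.
    static List<LocalDateTime> expectedFirings(List<LocalDateTime> arrivals, Duration delay) {
        List<LocalDateTime> firings = new ArrayList<>();
        LocalDateTime pendingFire = null; // fire time of the currently open pane, if any
        for (LocalDateTime arrival : arrivals) {
            if (pendingFire != null && !arrival.isBefore(pendingFire)) {
                firings.add(pendingFire); // the open pane fired before this element arrived
                pendingFire = null;
            }
            if (pendingFire == null) {
                pendingFire = arrival.plus(delay); // first element of a new pane starts the delay
            }
        }
        if (pendingFire != null) {
            firings.add(pendingFire); // the last open pane also fires after its delay
        }
        return firings;
    }

    public static void main(String[] args) {
        for (LocalDateTime t : expectedFirings(reportedSends(), DELAY)) {
            System.out.println("expected firing at " + t);
        }
    }
}
```

Under these assumptions the model predicts firings at 06:34:36.436, 06:34:52.166, 06:35:00.505 and 06:35:27.068. The first two line up with the logged outputs (06:34:36.668 and 06:34:52.284); nothing is logged around 06:35:00.5, and the next output only appears at 06:35:22.112, right after the fourth send.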



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner

2018-03-19 Thread Dawid Wysakowicz (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz reassigned BEAM-3414:
--

Assignee: Dawid Wysakowicz  (was: Aljoscha Krettek)

> AfterProcessingTime trigger issue with Flink Runner
> ---
>
> Key: BEAM-3414
> URL: https://issues.apache.org/jira/browse/BEAM-3414
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-flink
>Affects Versions: 2.2.0
> Environment: idea, ubuntu 16.04, FlinkRunner
>Reporter: huangjianhuang
>Assignee: Dawid Wysakowicz
>Priority: Major
>





[jira] [Commented] (BEAM-3494) Snapshot state of aggregated data of apache beam project is not maintained in flink's checkpointing

2018-03-16 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16401919#comment-16401919
 ] 

Dawid Wysakowicz commented on BEAM-3494:


[~suganyap] Could you elaborate a bit more on how you enable checkpointing? Do I 
understand correctly that you pass {{checkpointingInterval}} as a CLI parameter 
and that {{state.backend}} is set in flink-conf.yaml? 

Also, could you explain what it means that the aggregated data is not 
restored, ideally with a simple example?

> Snapshot state of aggregated data of apache beam project is not maintained in 
> flink's checkpointing 
> 
>
> Key: BEAM-3494
> URL: https://issues.apache.org/jira/browse/BEAM-3494
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: suganya
>Priority: Major
>
> We have a Beam project that consumes events from Kafka and does a groupBy in a 
> 5-minute time window; after the window elapses, it pushes the events downstream 
> for merging. The project is deployed on Flink, and we have enabled 
> checkpointing to recover from failures.
> (window size: 5 mins, checkpointingInterval: 5 mins, state.backend: filesystem)
> Kafka offsets get checkpointed every 5 minutes (checkpointingInterval). The 
> offsets are checkpointed before the entire DAG (groupBy and merge) has 
> finished. So when a task manager restarts, the new task starts from the last 
> successful checkpoint, but we cannot get the aggregated snapshot data (the data 
> from the groupBy task) from the persisted checkpoint.
> We are able to retrieve the last successfully checkpointed Kafka offset, but not 
> the aggregated data up to that checkpoint.





[jira] [Commented] (BEAM-3225) Non deterministic behaviour of AfterProcessingTime trigger with multiple group by transformations

2018-03-16 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16401800#comment-16401800
 ] 

Dawid Wysakowicz commented on BEAM-3225:


Hi, I've tried to reproduce the _"allowed lateness configuration dictates that 
only non empty panes should be trigger!!!"_ behaviour but could not reproduce it 
at all. I also could not find any bug in the Flink runner that could cause such 
behaviour. [~pawelbartoszek], can you provide a reliable way to reproduce the 
problem?

> Non deterministic behaviour of AfterProcessingTime trigger with multiple 
> group by transformations
> -
>
> Key: BEAM-3225
> URL: https://issues.apache.org/jira/browse/BEAM-3225
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-flink
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Pawel Bartoszek
>Assignee: Aljoscha Krettek
>Priority: Critical
>
> *Context*
> I ran my [test 
> code|https://gist.github.com/pbartoszek/fe6b1d7501333a2a4b385a1424f6bd80] 
> against different triggers and runners. My original problem was that, when 
> writing to a file sink, files weren't always produced deterministically; 
> please refer to [BEAM-3151|https://issues.apache.org/jira/browse/BEAM-3151]. 
> When I started looking at the WriteFiles class, I noticed that the file sink 
> implementation includes multiple GroupByKey transformations. Knowing that, I 
> wrote test code that uses multiple GroupByKey transformations and concluded 
> that GroupByKey's support for After(Synchronised)ProcessingTime triggers is 
> somewhat buggy(?), which also affects the file sink. When I ran my job using 
> the Dataflow runner, I got the expected output.
> *About test code*
> The job counts how many A and B elements it received within 30-second 
> windows using Count.perElement. Then it uses GroupByKey to fire every time a 
> count has increased.
> Below I outlined the expected standard output: 
> {code:java}
> Let's assume all events are received in the same 30 seconds window.
> A -> After count KV{A, 1} -> Final group by KV{A, [1]}
> A -> After count KV{A, 2} -> Final group by KV{A, [1,2]}
> A -> After count KV{A, 3} -> Final group by KV{A, [1,2,3]}
> B -> After count KV{B, 1} -> Final group by KV{B, [1]}
> With my trigger configuration, I would expect that for every new element 
> 'After count' is printed with the new value, followed by 'Final group by' with 
> the new counter. 'Final group by' then represents the history of counters.{code}
>  
> *Problem*
> The 'Final group by' trigger doesn't always fire, although the trigger setup 
> suggests it should. This behaviour differs across runners and Beam 
> versions. 
>  
> *My observations when using Pubsub*
> Trigger configuration
> {code:java}
> Window.into(FixedWindows.of(standardSeconds(30)))
>     .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>     .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>     .accumulatingFiredPanes())
> {code}
>  
>  Beam 2.0 Flink Runner
> {code:java}
> 2017-11-16T14:51:44.294Z After count KV{A, 1} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.036Z Received Element A [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.143Z After count KV{A, 2} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.246Z Final group by KV{A, [1, 2]} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:52:03.522Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:03.629Z After count KV{A, 1} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:03.732Z Final group by KV{A, [1]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.270Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.372Z After count KV{A, 2} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.476Z Final group by KV{A, [1, 2]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.394Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.501Z After count KV{A, 3} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.602Z Final group by KV{A, [1, 2, 3]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.296Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.402Z After count KV{A, 4} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 

[jira] [Updated] (BEAM-3703) java.io.IOException: KafkaWriter : failed to send 1 records (since last report)

2018-03-15 Thread Dawid Wysakowicz (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated BEAM-3703:
---
Description: 
I am trying to read from a file and write to Kafka running on Google Cloud, and 
I am getting the following error:

 
{code}
org.apache.beam.sdk.util.UserCodeException: java.io.IOException: KafkaWriter : 
failed to send 1 records (since last report)
   at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
   at 
org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter$DoFnInvoker.invokeProcessElement(Unknown
 Source)
   at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
   at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
   at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
   at 
org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:120)
   at 
org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
   at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: KafkaWriter : failed to send 1 records (since 
last report)
   at 
org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.checkForFailures(KafkaIO.java:1639)
   at 
org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.processElement(KafkaIO.java:1581)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update 
metadata after 6 ms.
{code}

{code}
.apply(KafkaIO.<String, String>write()
    .withBootstrapServers("ip1:9092,ip2:9092")
    .withTopic("feed")
    .withValueSerializer(StringSerializer.class)
    .withKeySerializer(StringSerializer.class)
    //.updateProducerProperties(ImmutableMap.of("security.protocol", "PLAINTEXT"))
    //.updateProducerProperties(ImmutableMap.of("sasl.mechanism", "PLAIN"))
    .values()) // writes values to Kafka with default key
{code}

 
Kafka is running on Google Cloud (Bitnami), and I am using the Flink runner.

How do I pass security information to KafkaIO?

  was:
I am trying to read from file and write to Kafka in google cloud kafka and 
getting following error:

 

org.apache.beam.sdk.util.UserCodeException: java.io.IOException: KafkaWriter : 
failed to send 1 records (since last report)

at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)

at 
org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter$DoFnInvoker.invokeProcessElement(Unknown
 Source)

at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)

at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)

at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)

at 
org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:120)

at 
org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)

at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)

at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.io.IOException: KafkaWriter : failed to send 1 records (since 
last report)

at 
org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.checkForFailures(KafkaIO.java:1639)

at 
org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.processElement(KafkaIO.java:1581)

Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update 
metadata after 6 ms.

 

 

 

.apply(KafkaIO._write_()

.withBootstrapServers("ip1:9092,ip2:9092")

.withTopic("feed")

.withValueSerializer(StringSerializer.class)

.withKeySerializer(StringSerializer.class)

        
//.updateProducerProperties(ImmutableMap.of("security.protocol","PLAINTEXT"))

        //.updateProducerProperties(ImmutableMap.of("sasl.mechanism","PLAIN"))

 

.values() // writes values to Kafka with default key

 

Kafka is running on google cloud bitnami and I am using Flink runner

How do I pass security information to Kafka IO?


> java.io.IOException: KafkaWriter : failed to send 1 records (since last 
> report)
> ---
>
> Key: BEAM-3703
> URL: https://issues.apache.org/jira/browse/BEAM-3703
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-flink
>Affects Versions: 2.2.0
>Reporter: jagdeep sihota
>Assignee: Raghu Angadi
>
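On the question of passing security settings: the reporter's commented-out lines already use `updateProducerProperties`, which takes a `Map<String, Object>` of Kafka producer configs. The sketch below only builds such a map for a SASL/PLAIN listener. The protocol, mechanism and credentials are assumptions (the report does not say how the Bitnami broker is secured), the class and method names are hypothetical, and `sasl.jaas.config` requires a kafka-clients version that supports it (0.10.2 or later).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds Kafka producer properties for a broker listener assumed
// to use SASL_PLAINTEXT with the PLAIN mechanism.
class KafkaSecurityProps {

    static Map<String, Object> saslPlainProducerProps(String username, String password) {
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol", "SASL_PLAINTEXT"); // use "SASL_SSL" if the listener uses TLS
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required"
                        + " username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder credentials; print only the keys so the password is not logged.
        Map<String, Object> props = saslPlainProducerProps("user", "secret");
        props.keySet().forEach(System.out::println);
    }
}
```

In the pipeline, the resulting map would be handed to `.updateProducerProperties(...)` on the `KafkaIO.<String, String>write()` chain shown in the description.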

[jira] [Commented] (BEAM-3281) PTransform name not being propagated to the Flink Web UI

2018-03-15 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400476#comment-16400476
 ] 

Dawid Wysakowicz commented on BEAM-3281:


Fixed in [BEAM-3043]

> PTransform name not being propagated to the Flink Web UI
> 
>
> Key: BEAM-3281
> URL: https://issues.apache.org/jira/browse/BEAM-3281
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.1.0
>Reporter: Thalita Vergilio
>Assignee: Aljoscha Krettek
>Priority: Minor
>  Labels: flink
> Fix For: 2.5.0
>
> Attachments: flink-dashboard.PNG
>
>
> This could be related to BEAM-1107, which was logged for Flink Batch 
> processing.
> I am experiencing a similar issue for stream processing. I would have 
> expected the name passed to 
> {code:java}
> pipeline.apply(String name, PTransform root)
> {code}
>  to be propagated to the Flink Web UI. 
> The documentation seems to suggest that this was the intended functionality: 
> https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/Pipeline.html#apply-java.lang.String-org.apache.beam.sdk.transforms.PTransform-
> Here is some sample code setting the name: 
> {code:java}
> p.apply("Apply Windowing Function",
>         Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>     .apply("Transform the Pipeline to Key by Window",
>         ParDo.of(
>             // InputT: placeholder, the element type was lost when this report was archived
>             new DoFn<InputT, KV<IntervalWindow, InputT>>() {
>                 @ProcessElement
>                 public void processElement(ProcessContext context, IntervalWindow window) {
>                     context.output(KV.of(window, context.element()));
>                 }
>             }))
>     .apply("Group by Key (window)", GroupByKey.create())
>     .apply("Calculate PUE", ParDo.of(new PueCalculatorFn()))
>     .apply("Write output to Kafka",
>         KafkaIO.write()
>             .withBootstrapServers(KAFKA_IP + ":" + KAFKA_PORT)
>             .withTopic("results")
>             .withKeySerializer(IntervalWindowResultSerialiser.class)
>             .withValueSerializer(PueResultSerialiser.class)
>     );
> {code}
> I will upload a screenshot of the results.





[jira] [Resolved] (BEAM-3281) PTransform name not being propagated to the Flink Web UI

2018-03-15 Thread Dawid Wysakowicz (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz resolved BEAM-3281.

   Resolution: Duplicate
Fix Version/s: 2.5.0

> PTransform name not being propagated to the Flink Web UI
> 
>
> Key: BEAM-3281
> URL: https://issues.apache.org/jira/browse/BEAM-3281
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.1.0
>Reporter: Thalita Vergilio
>Assignee: Aljoscha Krettek
>Priority: Minor
>  Labels: flink
> Fix For: 2.5.0
>
> Attachments: flink-dashboard.PNG
>
>





[jira] [Updated] (BEAM-1107) Display user names for steps in the Flink Web UI

2018-03-15 Thread Dawid Wysakowicz (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated BEAM-1107:
---
Fix Version/s: (was: Not applicable)
   2.5.0

> Display user names for steps in the Flink Web UI
> 
>
> Key: BEAM-1107
> URL: https://issues.apache.org/jira/browse/BEAM-1107
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Daniel Halperin
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 2.5.0
>
> Attachments: screenshot-1.png
>
>
> [copying in-person / email discussion at Strata Singapore to JIRA]
> The FlinkBatchTransformTranslators use transform.getName() [1] -- this is the 
> "SDK name" for the transform.
> The "user name" for the transform is not available here, it is in fact on the 
> TransformHierarchy.Node as node.getFullName() [2].
> getFullName() is used in a few places in Flink, but not when setting step names.
> I drafted a quick commit that sort of propagates the user names to the web UI 
> (but only for DataSource, and still too verbose: 
> https://github.com/dhalperi/incubator-beam/commit/a2f1fb06b22a85ec738e4f2a604c9a129891916c)
> Before this change, the "ReadLines" step showed up as: "DataSource (at 
> Read(CompressedSource) 
> (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))"
> With this change, it shows up as "DataSource (at ReadLines/Read 
> (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))"
> which I think is closer. [I'd still like it to JUST be "ReadLines/Read" e.g.].
> Thoughts?
> [1] 
> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java#L129
> [2] 
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java#L252
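The difference between the SDK name and the user name can be illustrated with a toy model. This Java sketch is not Beam's `TransformHierarchy`; the `Node` class, its fields and the `TextIO.Read` SDK name for the composite are hypothetical, and it assumes, consistent with the `ReadLines/Read` example above, that a full name is the user-supplied names of the enclosing composites joined with `/`.

```java
// Hypothetical model (not Beam code): each applied transform records the user-supplied
// name it was applied with, and its full name is the path of those names from the root.
class TransformNames {

    static final class Node {
        final Node parent;      // enclosing composite; null for the pipeline root
        final String userName;  // name passed to apply(name, transform); "" for the root
        final String sdkName;   // what transform.getName() would report

        Node(Node parent, String userName, String sdkName) {
            this.parent = parent;
            this.userName = userName;
            this.sdkName = sdkName;
        }

        // Joins user names from the root down with "/", e.g. "ReadLines/Read".
        String fullName() {
            if (parent == null) {
                return userName;
            }
            String prefix = parent.fullName();
            return prefix.isEmpty() ? userName : prefix + "/" + userName;
        }
    }

    // Builds the root -> "ReadLines" -> "Read" chain from the report's example.
    static Node exampleRead() {
        Node root = new Node(null, "", "");
        Node readLines = new Node(root, "ReadLines", "TextIO.Read");
        return new Node(readLines, "Read", "Read(CompressedSource)");
    }

    public static void main(String[] args) {
        Node read = exampleRead();
        System.out.println("SDK name:  " + read.sdkName);
        System.out.println("full name: " + read.fullName());
    }
}
```

A UI label built from `fullName()` would read `ReadLines/Read`, which is the form the description asks for, while the SDK name alone loses the user's `ReadLines` label.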





[jira] [Resolved] (BEAM-1107) Display user names for steps in the Flink Web UI

2018-03-15 Thread Dawid Wysakowicz (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz resolved BEAM-1107.

   Resolution: Duplicate
Fix Version/s: Not applicable

> Display user names for steps in the Flink Web UI
> 
>
> Key: BEAM-1107
> URL: https://issues.apache.org/jira/browse/BEAM-1107
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Daniel Halperin
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: Not applicable
>
> Attachments: screenshot-1.png
>
>





[jira] [Commented] (BEAM-1107) Display user names for steps in the Flink Web UI

2018-03-15 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400471#comment-16400471
 ] 

Dawid Wysakowicz commented on BEAM-1107:


Fixed in [BEAM-3043]

> Display user names for steps in the Flink Web UI
> 
>
> Key: BEAM-1107
> URL: https://issues.apache.org/jira/browse/BEAM-1107
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Daniel Halperin
>Assignee: Aljoscha Krettek
>Priority: Major
> Attachments: screenshot-1.png
>
>





[jira] [Assigned] (BEAM-3359) Unable to change "flinkMaster" from "[auto]" in TestFlinkRunner

2018-02-28 Thread Dawid Wysakowicz (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz reassigned BEAM-3359:
--

Assignee: Dawid Wysakowicz

> Unable to change "flinkMaster" from "[auto]" in TestFlinkRunner
> ---
>
> Key: BEAM-3359
> URL: https://issues.apache.org/jira/browse/BEAM-3359
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Łukasz Gajowy
>Assignee: Dawid Wysakowicz
>Priority: Minor
>
> In TestFlinkRunner's constructor there is a line like this:
> {{options.setFlinkMaster("\[auto\]");}}
> which overrides any "flinkMaster" provided earlier (e.g. on the command 
> line), leading to errors that are hard to track down (for example: "I 
> provided a valid URL in the pipeline options... why is it not connecting to my 
> cluster?"). 
> Setting {{@Default.String("\[auto\]")}} in FlinkPipelineOptions could be 
> one solution. 
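The difference between the current behaviour and the suggested one can be sketched in plain Java. The class and methods below are hypothetical stand-ins for the `flinkMaster` option, not Beam code; in `FlinkPipelineOptions` itself the suggested behaviour would be expressed declaratively with the `@Default.String` annotation on the getter, as the report proposes.

```java
// Hypothetical stand-in for how the flinkMaster value ends up being chosen.
class FlinkMasterDefault {

    static final String AUTO = "[auto]";

    // Behaviour described in the report: the test runner overwrites any provided value.
    static String overwrite(String userValue) {
        return AUTO;
    }

    // Suggested behaviour: "[auto]" only applies when nothing was provided.
    static String defaultIfUnset(String userValue) {
        return (userValue == null || userValue.isEmpty()) ? AUTO : userValue;
    }

    public static void main(String[] args) {
        String fromCli = "localhost:6123"; // e.g. passed as --flinkMaster=localhost:6123
        System.out.println("overwrite:      " + overwrite(fromCli));
        System.out.println("defaultIfUnset: " + defaultIfUnset(fromCli));
    }
}
```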





[jira] [Assigned] (BEAM-2873) Detect number of shards for file sink in Flink Streaming Runner

2018-02-27 Thread Dawid Wysakowicz (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz reassigned BEAM-2873:
--

Assignee: Dawid Wysakowicz

> Detect number of shards for file sink in Flink Streaming Runner
> ---
>
> Key: BEAM-2873
> URL: https://issues.apache.org/jira/browse/BEAM-2873
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> [~reuvenlax] mentioned that this is done for the Dataflow Runner and the 
> default behaviour on Flink can be somewhat surprising for users.
> ML entry: https://www.mail-archive.com/dev@beam.apache.org/msg02665.html:
> This is how the file sink has always worked in Beam. If no sharding is 
> specified, then this means runner-determined sharding, and by default that is 
> one file per bundle. If Flink has small bundles, then I suggest using the 
> withNumShards method to explicitly pick the number of output shards.
> The Flink runner can detect that runner-determined sharding has been chosen, 
> and override it with a specific number of shards. For example, the Dataflow 
> streaming runner (which as you mentioned also has small bundles) detects this 
> case and sets the number of output file shards based on the number of workers 
> in the worker pool. 
> [Here|https://github.com/apache/beam/blob/9e6530adb00669b7cf0f01cb8b128be0a21fd721/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L354]
>  is the code that does this; it should be quite simple to do something 
> similar for Flink, and then there will be no need for users to explicitly 
> call withNumShards themselves.
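A minimal sketch of the decision the Flink runner would have to make, assuming runner-determined sharding is represented by a requested shard count of 0 and that the job's parallelism plays the role of the Dataflow worker count. Both assumptions, the class name, and the factor of 2 are illustrative only; they are not taken from the Dataflow code linked above.

```java
// Hypothetical sketch: replace runner-determined sharding ("one file per bundle")
// with a fixed shard count derived from the job parallelism.
class ShardOverride {

    // Shards per parallel slot; an illustrative assumption, not a Beam constant.
    static final int SHARDS_PER_SLOT = 2;

    static int effectiveNumShards(int requestedShards, int parallelism) {
        if (requestedShards > 0) {
            return requestedShards; // the user called withNumShards(n): respect it
        }
        return Math.max(1, parallelism) * SHARDS_PER_SLOT;
    }

    public static void main(String[] args) {
        System.out.println(effectiveNumShards(0, 4)); // runner-determined -> 8
        System.out.println(effectiveNumShards(3, 4)); // explicit -> 3
    }
}
```

Keeping an explicit `withNumShards` value untouched preserves the user-side workaround the mailing-list message recommends.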





[jira] [Commented] (BEAM-2995) can't read/write hdfs in Flink CLUSTER(Standalone)

2018-02-04 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352114#comment-16352114
 ] 

Dawid Wysakowicz commented on BEAM-2995:


[~huangjianhuang] I could not reproduce your exact problem, but in my case it 
does not connect to HDFS unless I add the proper transformer to the shade plugin 
configuration, as explained in BEAM-2457.
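{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <createDependencyReducedPom>false</createDependencyReducedPom>
    <filters>
      <filter>
        <artifact>*:*</artifact>
        <excludes>
          <exclude>META-INF/*.SF</exclude>
          <exclude>META-INF/*.DSA</exclude>
          <exclude>META-INF/*.RSA</exclude>
        </excludes>
      </filter>
    </filters>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <shadedArtifactAttached>true</shadedArtifactAttached>
        <shadedClassifierName>shaded</shadedClassifierName>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
{code}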
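The transformer matters because Beam looks up file-system implementations at runtime through `java.util.ServiceLoader`, which reads the `META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar` resource. Beam core and the HDFS module are expected to ship a file with that name each, and a fat jar can contain only one. The stdlib-only sketch below contrasts keeping a single copy (assumed to be what happens without a transformer; which copy survives depends on jar order) with concatenating them, which is roughly what the shade plugin's `ServicesResourceTransformer` does. The class is hypothetical, and the registrar class names are assumed to be Beam's `LocalFileSystemRegistrar` and `HadoopFileSystemRegistrar`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: each inner list is one jar's copy of the same services file.
class ServiceFileMerge {

    static List<List<String>> exampleCopies() {
        return Arrays.asList(
                Arrays.asList("org.apache.beam.sdk.io.LocalFileSystemRegistrar"),
                Arrays.asList("org.apache.beam.sdk.io.hdfs.HadoopFileSystemRegistrar"));
    }

    // Without a transformer: only one copy ends up in the fat jar (here, the first).
    static List<String> singleCopy(List<List<String>> copies) {
        return copies.isEmpty() ? new ArrayList<>() : new ArrayList<>(copies.get(0));
    }

    // With a services transformer: entries from all copies are combined (duplicates dropped).
    static List<String> merged(List<List<String>> copies) {
        Set<String> lines = new LinkedHashSet<>();
        for (List<String> copy : copies) {
            lines.addAll(copy);
        }
        return new ArrayList<>(lines);
    }

    public static void main(String[] args) {
        System.out.println("single copy: " + singleCopy(exampleCopies()));
        System.out.println("merged:      " + merged(exampleCopies()));
    }
}
```

If the HDFS registrar line is the one that gets dropped, the cluster-side `ServiceLoader` never sees the `hdfs` scheme, which fits the "does not connect to HDFS" symptom described above.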
 

 

> can't read/write hdfs in Flink CLUSTER(Standalone)
> --
>
> Key: BEAM-2995
> URL: https://issues.apache.org/jira/browse/BEAM-2995
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.2.0
>Reporter: huangjianhuang
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> I wrote a simple demo like this:
> {code:java}
> Configuration conf = new Configuration();
> conf.set("fs.default.name", "hdfs://localhost:9000");
> //other codes
> p.apply("ReadLines", TextIO.read().from("hdfs://localhost:9000/tmp/words"))
>     .apply(TextIO.write().to("hdfs://localhost:9000/tmp/hdfsout"));
> {code}
> It works in Flink local mode with this command:
> {code:java}
> mvn exec:java -Dexec.mainClass=com.joe.FlinkWithHDFS -Pflink-runner 
> -Dexec.args="--runner=FlinkRunner 
> --filesToStage=target/flinkBeam-2.2.0-SNAPSHOT-shaded.jar"
> {code}
> but it does not work in CLUSTER mode:
> {code:bash}
> mvn exec:java -Dexec.mainClass=com.joe.FlinkWithHDFS -Pflink-runner \
>   -Dexec.args="--runner=FlinkRunner --filesToStage=target/flinkBeam-2.2.0-SNAPSHOT-shaded.jar --flinkMaster=localhost:6123"
> {code}
> It seems the Flink cluster treats HDFS as the local file system.
> The log for reading the input, from flink-jobmanager.log, is:
> {code:java}
> 2017-09-27 20:17:37,962 INFO  org.apache.flink.runtime.jobmanager.JobManager - Successfully ran initialization on master in 136 ms.
> 2017-09-27 20:17:37,968 INFO  org.apache.beam.sdk.io.FileBasedSource - {color:red}Filepattern hdfs://localhost:9000/tmp/words2 matched 0 files with total size 0{color}
> 2017-09-27 20:17:37,968 INFO  org.apache.beam.sdk.io.FileBasedSource - Splitting filepattern hdfs://localhost:9000/tmp/words2 into bundles of size 0 took 0 ms and produced 0 files and 0 bundles
> {code}
> The output error message is:
> {code:java}
> Caused by: java.lang.ClassCastException: {color:red}org.apache.beam.sdk.io.hdfs.HadoopResourceId cannot be cast to org.apache.beam.sdk.io.LocalResourceId{color}
>     at org.apache.beam.sdk.io.LocalFileSystem.create(LocalFileSystem.java:77)
>     at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:256)
>     at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:243)
>     at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:922)
>     at org.apache.beam.sdk.io.FileBasedSink$Writer.openUnwindowed(FileBasedSink.java:884)
>     at org.apache.beam.sdk.io.WriteFiles.finalizeForDestinationFillEmptyShards(WriteFiles.java:909)
>     at org.apache.beam.sdk.io.WriteFiles.access$900(WriteFiles.java:110)
>     at org.apache.beam.sdk.io.WriteFiles$2.processElement(WriteFiles.java:858)
> {code}
> Can somebody help me? I've tried everything but just can't work it out [cry]
> https://issues.apache.org/jira/browse/BEAM-2457
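The `ClassCastException` suggests that an `hdfs://` resource id reached `LocalFileSystem`. One way this can happen is shown in the toy model below. It assumes that the file-system lookup falls back to the local file system when no registrar for `hdfs` is registered. That is consistent with the trace but has not been verified against the 2.2.0 source. Every class in the model is a hypothetical stand-in, not Beam's `FileSystems` API.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (hypothetical names, not Beam's classes): file systems are looked up
 * by URI scheme, and an unregistered scheme falls back to the local file system.
 */
public final class SchemeDispatchDemo {
    interface ResourceId { String scheme(); }

    static final class LocalId implements ResourceId {
        final String path;
        LocalId(String path) { this.path = path; }
        public String scheme() { return "file"; }
    }

    static final class HdfsId implements ResourceId {
        final String uri;
        HdfsId(String uri) { this.uri = uri; }
        public String scheme() { return "hdfs"; }
    }

    interface FileSystem { String create(ResourceId id); }

    /** Assumes it only ever receives local ids, like a local file system would. */
    static final class LocalFs implements FileSystem {
        public String create(ResourceId id) {
            LocalId local = (LocalId) id; // ClassCastException if handed an HdfsId
            return "local:" + local.path;
        }
    }

    static final class HdfsFs implements FileSystem {
        public String create(ResourceId id) { return "hdfs:" + ((HdfsId) id).uri; }
    }

    /** Assumed lenient lookup: unregistered schemes fall back to "file". */
    static FileSystem lookup(Map<String, FileSystem> registry, String scheme) {
        FileSystem fs = registry.get(scheme);
        return fs != null ? fs : registry.get("file");
    }

    /** Builds a registry; pass false to mimic a jar whose hdfs registrar was lost. */
    static Map<String, FileSystem> registry(boolean withHdfs) {
        Map<String, FileSystem> r = new HashMap<>();
        r.put("file", new LocalFs());
        if (withHdfs) {
            r.put("hdfs", new HdfsFs());
        }
        return r;
    }

    static String create(Map<String, FileSystem> registry, ResourceId id) {
        return lookup(registry, id.scheme()).create(id);
    }

    public static void main(String[] args) {
        ResourceId target = new HdfsId("hdfs://localhost:9000/tmp/hdfsout");
        System.out.println(create(registry(true), target));
        try {
            create(registry(false), target);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException when the hdfs registrar is missing");
        }
    }
}
```

A strict lookup that throws for an unknown scheme would surface the missing registrar directly instead of as a cast failure deep in the write path.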





[jira] [Assigned] (BEAM-2995) can't read/write hdfs in Flink CLUSTER(Standalone)

2018-02-02 Thread Dawid Wysakowicz (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz reassigned BEAM-2995:
--

Assignee: Dawid Wysakowicz  (was: Aljoscha Krettek)



