[jira] [Commented] (BEAM-3858) Data from JdbcIO.read() cannot pass to next transform on ApexRunner

2018-03-28 Thread huangjianhuang (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418409#comment-16418409
 ] 

huangjianhuang commented on BEAM-3858:
--

[~jbonofre] Any suggestions?

> Data from JdbcIO.read() cannot pass to next transform on ApexRunner
> ---
>
> Key: BEAM-3858
> URL: https://issues.apache.org/jira/browse/BEAM-3858
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-jdbc, runner-apex
>Affects Versions: 2.3.0
> Environment: ubuntu16.04
>Reporter: huangjianhuang
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3858) Data from JdbcIO.read() cannot pass to next transform on ApexRunner

2018-03-14 Thread huangjianhuang (JIRA)
huangjianhuang created BEAM-3858:


 Summary: Data from JdbcIO.read() cannot pass to next transform on 
ApexRunner
 Key: BEAM-3858
 URL: https://issues.apache.org/jira/browse/BEAM-3858
 Project: Beam
  Issue Type: Bug
  Components: io-java-jdbc, runner-apex
Affects Versions: 2.3.0
 Environment: ubuntu16.04
Reporter: huangjianhuang
Assignee: Jean-Baptiste Onofré


{code:java}
public static void testJDBCRead(Pipeline pipeline) {
    System.out.println("in testJDBCRead()");
    pipeline.apply(JdbcIO.<String>read()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://localhost:3307/libra")
                .withUsername("root")
                .withPassword("123456"))
            .withQuery("SELECT * FROM o_flow_account_login limit 3")
            .withCoder(StringUtf8Coder.of())
            // Each row is mapped to a constant; the println shows that rows are read.
            .withRowMapper(new JdbcIO.RowMapper<String>() {
                @Override
                public String mapRow(ResultSet resultSet) throws Exception {
                    System.out.println("maprow");
                    return "tmp";
                }
            }))
        // Downstream transform; its println shows whether elements arrive here.
        .apply(ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void process(ProcessContext context) {
                System.out.println("??");
                context.output(" ");
            }
        }));
}
{code}
On DirectRunner or FlinkRunner, the screen shows:
{code:java}
maprow
maprow
maprow
??
??
??
{code}
However, on ApexRunner the screen shows only:
{code:java}
maprow
maprow
maprow
{code}
 





[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner

2018-02-04 Thread huangjianhuang (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352010#comment-16352010
 ] 

huangjianhuang commented on BEAM-3414:
--

[~kenn] Has the JStorm runner been released? The [official document|https://beam.apache.org/documentation/runners/jstorm/] says we can add 
it with:
{code:xml}
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-jstorm</artifactId>
  <version>2.2.0</version>
</dependency>
{code}
But the dependency does not seem to be available.

 

> AfterProcessingTime trigger issue with Flink Runner
> ---
>
> Key: BEAM-3414
> URL: https://issues.apache.org/jira/browse/BEAM-3414
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-flink
>Affects Versions: 2.2.0
> Environment: idea, ubuntu 16.04, FlinkRunner
>Reporter: huangjianhuang
>Assignee: Aljoscha Krettek
>Priority: Major
>
> In my demo, I read data from Kafka, count it globally, and finally output the 
> total count of received data, as follows:
> {code:java}
> FlinkPipelineOptions options =
>     PipelineOptionsFactory.fromArgs(args).withValidation()
>         .as(FlinkPipelineOptions.class);
> options.setStreaming(true);
> options.setRunner(FlinkRunner.class);
> Pipeline pipeline = Pipeline.create(options);
> pipeline
>     .apply("Read from kafka",
>         KafkaIO.<String, String>read()
>             //.withTimestampFn(kafkaData -> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
>             .withBootstrapServers("localhost:9092")
>             .withTopic("recharge")
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(StringDeserializer.class)
>             .withoutMetadata()
>     )
>     .apply(Values.<String>create())
>     .apply(Window.<String>into(new GlobalWindows())
>         // Fire 5 s (processing time) after the first element of each pane.
>         .triggering(Repeatedly.forever(
>             AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
>         .accumulatingFiredPanes()
>     )
>     .apply(Count.<String>globally())
>     .apply("output",
>         ParDo.of(new DoFn<Long, Void>() {
>             @ProcessElement
>             public void process(ProcessContext context) {
>                 System.out.println("---get at: " + Instant.now() + "--");
>                 System.out.println(context.element());
>             }
>         }));
> {code}
> The result should be displayed 5 s after I send the first data, but sometimes 
> nothing was displayed after I sent data. The output I got in one test is shown 
> below (I can't upload a picture, so it is described as text):
> {code:java}
> Send 681Msg at: 2018-01-05T06:34:31.436
>   ---get at: 2018-01-05T06:34:36.668Z--
>   681
> Send 681Msg at: 2018-01-05T06:34:47.166
>   ---get at: 2018-01-05T06:34:52.284Z--
>   1362
> Send 681Msg at: 2018-01-05T06:34:55.505
> Send 681Msg at: 2018-01-05T06:35:22.068
>   ---get at: 2018-01-05T06:35:22.112Z--
>   2044
> {code}
> By the way, the code works fine with the direct runner.





[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner

2018-02-04 Thread huangjianhuang (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352008#comment-16352008
 ] 

huangjianhuang commented on BEAM-3414:
--

[~aljoscha] Thanks. Can it be fixed this month? Our project has been delayed by 
this issue for a long time; if not, we will have to try other frameworks.



[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner

2018-01-28 Thread huangjianhuang (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342837#comment-16342837
 ] 

huangjianhuang commented on BEAM-3414:
--

[~kenn] [~aljoscha] Are you still working on this issue? I'm quite distressed 
by it.



[jira] [Issue Comment Deleted] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner

2018-01-17 Thread huangjianhuang (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangjianhuang updated BEAM-3414:
-
Comment: was deleted

(was: Thanks for the help. Does this bug appear in other runners? Or can you 
advise me which runner behaves most like the DirectRunner? My code works fine 
with DirectRunner but has many problems with FlinkRunner :( )

> AfterProcessingTime trigger issue with Flink Runner
> ---
>
> Key: BEAM-3414
> URL: https://issues.apache.org/jira/browse/BEAM-3414
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-flink
>Affects Versions: 2.2.0
> Environment: idea, ubuntu 16.04, FlinkRunner
>Reporter: huangjianhuang
>Assignee: Kenneth Knowles
>Priority: Major
>


[jira] [Commented] (BEAM-3423) Distinct.withRepresentativeValueFn throws CoderException "cannot encode null KV"

2018-01-09 Thread huangjianhuang (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319609#comment-16319609
 ] 

huangjianhuang commented on BEAM-3423:
--

[~kenn] Thanks. Yes, it does work fine without early firings. Maybe the null KVs 
appeared because the inputs were flushed by the early firings, so there were no 
inputs left when the event-time trigger fired?
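
The guess above can be pictured with a toy model. This is not Beam code: `PaneBuffer`, `fire`, and the "first buffered element wins" rule are assumptions made only to show how a pane that fires after its contents were already discarded could produce a null value.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only; PaneBuffer and its methods are hypothetical, not Beam APIs.
class PaneBuffer {
    private final List<String> elements = new ArrayList<>();

    void add(String element) {
        elements.add(element);
    }

    // Emits one representative of the buffered elements, then discards them
    // (mimicking discardingFiredPanes). An empty buffer yields null.
    String fire() {
        String representative = elements.isEmpty() ? null : elements.get(0);
        elements.clear();
        return representative;
    }

    public static void main(String[] args) {
        PaneBuffer window = new PaneBuffer();
        window.add("word1");
        System.out.println("early firing:   " + window.fire());
        System.out.println("on-time firing: " + window.fire());
    }
}
```

In this model, skipping empty panes or checking for null before emitting avoids the null output. Whether that matches the real cause is for the Beam developers to confirm.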

> Distinct.withRepresentativeValueFn throws CoderException "cannot encode null 
> KV" 
> -
>
> Key: BEAM-3423
> URL: https://issues.apache.org/jira/browse/BEAM-3423
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.2.0
> Environment: ubuntu16.04, idea, java8
>Reporter: huangjianhuang
>Assignee: Kenneth Knowles
>

[jira] [Comment Edited] (BEAM-3423) Distinct.withRepresentativeValueFn with EventTime Trigger throws Exceptions

2018-01-06 Thread huangjianhuang (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315057#comment-16315057
 ] 

huangjianhuang edited comment on BEAM-3423 at 1/7/18 4:05 AM:
--

[~kenn] Sorry, I don't understand what you mean by "relevant trigger". 
Did I make a mistake in my code?
To avoid this exception, I ended up using a MyKV class that I defined myself 
instead of KV, and it works well.
I have no idea where the null KV came from in my code.


{code:java}
public class MyKV implements Serializable
{
    private String key;
    private String val;

    //getter and setter
}
{code}
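
For illustration, a compilable version of such a holder class might look like the sketch below. The constructor, the accessor names, and the `roundTrip` helper are assumptions added here, not taken from the original code; the round trip only checks that the class survives plain Java serialization, including a null key.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for KV<String, String>; member names are illustrative.
class MyKV implements Serializable {
    private static final long serialVersionUID = 1L;

    private String key;
    private String val;

    MyKV(String key, String val) {
        this.key = key;
        this.val = val;
    }

    String getKey() { return key; }
    void setKey(String key) { this.key = key; }
    String getVal() { return val; }
    void setVal(String val) { this.val = val; }

    // Serializes and deserializes the object with standard Java serialization.
    static MyKV roundTrip(MyKV in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream oin =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (MyKV) oin.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        MyKV copy = roundTrip(new MyKV(null, "a"));
        System.out.println(copy.getKey() + " / " + copy.getVal()); // prints "null / a"
    }
}
```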



was (Author: huangjianhuang):
[~kenn] Sorry, I don't understand what you mean by "relevant trigger". 
Did I make a mistake in my code?
To avoid this exception, I ended up using a MyKV class that I defined myself 
instead of KV, and it works OK.
I have no idea where the null KV came from in my code.


{code:java}
public class MyKV implements Serializable
{
    private String key;
    private String val;

    //getter and setter
}
{code}


> Distinct.withRepresentativeValueFn with EventTime Trigger throws Exceptions 
> 
>
> Key: BEAM-3423
> URL: https://issues.apache.org/jira/browse/BEAM-3423
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.2.0
> Environment: ubuntu16.04, idea, java8
>Reporter: huangjianhuang
>Assignee: Kenneth Knowles
>

[jira] [Commented] (BEAM-3423) Distinct.withRepresentativeValueFn with EventTime Trigger throws Exceptions

2018-01-06 Thread huangjianhuang (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315057#comment-16315057
 ] 

huangjianhuang commented on BEAM-3423:
--

[~kenn] Sorry, I don't understand what you mean by "relevant trigger". 
Did I make a mistake in my code?
To avoid this exception, I ended up using a MyKV class that I defined myself 
instead of KV, and it works OK.
I have no idea where the null KV came from in my code.


{code:java}
public class MyKV implements Serializable
{
    private String key;
    private String val;

    //getter and setter
}
{code}



[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner

2018-01-06 Thread huangjianhuang (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314517#comment-16314517
 ] 

huangjianhuang commented on BEAM-3414:
--

Thanks for the help. Does this bug appear in other runners? Or can you advise me 
which runner behaves most like the DirectRunner? My code works fine with 
DirectRunner but has many problems with FlinkRunner :(

> AfterProcessingTime trigger issue with Flink Runner
> ---
>
> Key: BEAM-3414
> URL: https://issues.apache.org/jira/browse/BEAM-3414
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-flink
>Affects Versions: 2.2.0
> Environment: idea, ubuntu 16.04, FlinkRunner
>Reporter: huangjianhuang
>Assignee: Kenneth Knowles
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3423) Distinct.withRepresentativeValueFn with EventTime Trigger throws Exceptions

2018-01-06 Thread huangjianhuang (JIRA)
huangjianhuang created BEAM-3423:


 Summary: Distinct.withRepresentativeValueFn with EventTime Trigger 
throws Exceptions 
 Key: BEAM-3423
 URL: https://issues.apache.org/jira/browse/BEAM-3423
 Project: Beam
  Issue Type: Bug
  Components: runner-core, runner-direct
Affects Versions: 2.2.0
 Environment: ubuntu16.04, idea, java8
Reporter: huangjianhuang
Assignee: Kenneth Knowles


My code is as follows:


{code:java}
pipeline
    //Read data
    .apply("Read from kafka",
        KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")
            .withTopic(topic)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata()
    )
    .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
        .discardingFiredPanes().withAllowedLateness(Duration.ZERO))
    //works fine
    //.apply(Distinct.create())
    //oops! -> CoderException: cannot encode a null KV
    .apply(Distinct.withRepresentativeValueFn(new Val()).withRepresentativeType(TypeDescriptors.strings()))
    .apply(MapElements.into(TypeDescriptors.nulls())
        .via(input -> {
            System.out.println(Instant.now());
            System.out.println(input);
            return null;
        }));

// Uses the Kafka record value as the representative value for Distinct.
private static class Val implements SerializableFunction<KV<String, String>, String> {
    @Override
    public String apply(KV<String, String> input) {
        return input.getValue();
    }
}
{code}

Input words to Kafka:
word1
//after 10s
word2

Then I got the following exception:

{code:java}
begin
2018-01-06T11:18:52.971Z
KV{null, a}
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:344)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:314)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:208)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:289)
    at com.xiaomi.huyu.processor.dev.EntryPoint.main(EntryPoint.java:37)
Caused by: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:113)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
    at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2149)
Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:70)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:62)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
at 

[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner

2018-01-04 Thread huangjianhuang (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312593#comment-16312593
 ] 

huangjianhuang commented on BEAM-3414:
--

After 06:34:55.505 the trigger never fires unless I send new data. How can I 
fix this without sending new input?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner

2018-01-04 Thread huangjianhuang (JIRA)
huangjianhuang created BEAM-3414:


 Summary: AfterProcessingTime trigger issue with Flink Runner
 Key: BEAM-3414
 URL: https://issues.apache.org/jira/browse/BEAM-3414
 Project: Beam
  Issue Type: Bug
  Components: runner-core, runner-flink
Affects Versions: 2.2.0
 Environment: idea, ubuntu 16.04, FlinkRunner
Reporter: huangjianhuang
Assignee: Kenneth Knowles


In my demo, I read data from Kafka, count the elements globally, and output the 
running total of received records, as follows:

{code:java}
FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(FlinkPipelineOptions.class);

options.setStreaming(true);
options.setRunner(FlinkRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
    .apply("Read from kafka",
        KafkaIO.<String, String>read()
            //.withTimestampFn(kafkaData -> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
            .withBootstrapServers("localhost:9092")
            .withTopic("recharge")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata()
    )
    .apply(Values.create())
    .apply(Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(5))))
        .accumulatingFiredPanes()
    )
    .apply(Count.globally())
    .apply("output",
        ParDo.of(new DoFn<Long, Void>() {
            @ProcessElement
            public void process(ProcessContext context) {
                System.out.println("---get at: " + Instant.now() + "--");
                System.out.println(context.element());
            }
        }));
{code}

A result should be printed 5 s after I send the first record, but sometimes 
nothing is printed after I send data. Here is the output from one test run 
(I couldn't upload a screenshot, so it is pasted as text):

{code:java}
Send 681Msg at: 2018-01-05T06:34:31.436

---get at: 2018-01-05T06:34:36.668Z--
681

Send 681Msg at: 2018-01-05T06:34:47.166

---get at: 2018-01-05T06:34:52.284Z--
1362

Send 681Msg at: 2018-01-05T06:34:55.505

Send 681Msg at: 2018-01-05T06:35:22.068

---get at: 2018-01-05T06:35:22.112Z--
2044
{code}

By the way, the same code works fine with the DirectRunner.






[jira] [Closed] (BEAM-3390) unable to serialize org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn

2017-12-21 Thread huangjianhuang (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangjianhuang closed BEAM-3390.

   Resolution: Not A Problem
Fix Version/s: 2.1.0

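Sorry, this was my mistake, not a Beam issue. I had put the code into an 
instance method and called it from main() as new ClassName().func(), so the 
anonymous RowMapper held a reference to the enclosing class, which is not 
serializable.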
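
For anyone hitting the same NotSerializableException, here is a minimal plain-Java sketch of the effect, with no Beam involved. The names SerializableCaptureDemo, Mapper and canSerialize are made up for illustration; Mapper is only a stand-in for a serializable callback such as JdbcIO.RowMapper. The instance-method variant reads an instance field so that the reference to the enclosing object is guaranteed on any compiler.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Deliberately NOT Serializable, like the SqlDemo class named in the stack trace.
class SerializableCaptureDemo {

    // Hypothetical stand-in for a serializable callback such as JdbcIO.RowMapper.
    interface Mapper extends Serializable {
        String map(String row);
    }

    // Non-final instance field, so reading it requires the enclosing instance.
    private String prefix = "row:";

    // Anonymous class created in an instance method: it keeps a hidden
    // reference to the enclosing SerializableCaptureDemo object.
    Mapper mapperFromInstanceMethod() {
        return new Mapper() {
            @Override
            public String map(String row) {
                return prefix + row;
            }
        };
    }

    // Anonymous class created in a static method: no enclosing instance exists.
    static Mapper mapperFromStaticMethod() {
        return new Mapper() {
            @Override
            public String map(String row) {
                return "row:" + row;
            }
        };
    }

    // Returns true if Java serialization accepts the object graph rooted at value.
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SerializableCaptureDemo outer = new SerializableCaptureDemo();
        System.out.println(canSerialize(outer.mapperFromInstanceMethod())); // false
        System.out.println(canSerialize(mapperFromStaticMethod()));         // true
    }
}
```

Building the pipeline in a static method, using a static nested class for the mapper, or making the enclosing class Serializable are the usual ways around this.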


[jira] [Created] (BEAM-3390) unable to serialize org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn

2017-12-21 Thread huangjianhuang (JIRA)
huangjianhuang created BEAM-3390:


 Summary: unable to serialize 
org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn
 Key: BEAM-3390
 URL: https://issues.apache.org/jira/browse/BEAM-3390
 Project: Beam
  Issue Type: Bug
  Components: dsl-sql
Affects Versions: 2.1.0
 Environment: ubuntu 16.04, idea, direct runner
Reporter: huangjianhuang
Assignee: Xu Mingmin


Here is my demo code:

{code:java}
PipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(PipelineOptions.class);

Pipeline pipeline = Pipeline.create(options);
pipeline.apply(JdbcIO.<KV<Integer, String>>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver",
            "jdbc:mysql://localhost:3307/libra_stat")
        .withUsername("root")
        .withPassword("123456"))
    .withQuery("select id, game_id from test_tb")
    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))
    .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
        public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
            System.out.println(resultSet.getInt(1));
            System.out.println(resultSet.getString(2));
            return KV.of(resultSet.getInt(1), resultSet.getString(2));
        }
    })
);

{code}

I ran this demo with the DirectRunner and got the following NotSerializableException:

{code:java}
java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn@68f4865
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
    at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
    at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.expand(JdbcIO.java:325)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.expand(JdbcIO.java:272)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:165)
    at com.xiaomi.huyu.processor.demo.SqlDemo.run(SqlDemo.java:30)
    at com.xiaomi.huyu.processor.demo.SqlDemo.main(SqlDemo.java:21)
Caused by: java.io.NotSerializableException: com.xiaomi.huyu.processor.demo.SqlDemo
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
    ... 11 more
{code}

Any suggestions and comments are welcome, thanks a lot!





[jira] [Created] (BEAM-3002) Unable to provide a Coder for org.apache.hadoop.hbase.client.Mutation

2017-09-29 Thread huangjianhuang (JIRA)
huangjianhuang created BEAM-3002:


 Summary: Unable to provide a Coder for 
org.apache.hadoop.hbase.client.Mutation
 Key: BEAM-3002
 URL: https://issues.apache.org/jira/browse/BEAM-3002
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-core
Affects Versions: 2.1.0
 Environment: hadoop2.8.0, hbase1.2.6
Reporter: huangjianhuang
Assignee: Kenneth Knowles


I wrote a demo with HBaseIO that formats data into Mutation objects and writes 
them to HBase. The demo works fine in IDEA or with the mvn exec:java command, 
but fails once it is packaged as a shaded jar (run with java -cp). 
The error message is:

{code:java}
Using the default output Coder from the producing PTransform failed: Unable to provide a Coder for org.apache.hadoop.hbase.client.Mutation.
  Building a Coder using a registered CoderProvider failed.
  See suppressed exceptions for detailed failures.
    at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
    at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:257)
    at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:106)
    at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifying(TransformHierarchy.java:222)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:208)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440)
    at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:552)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:296)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
    at com.joe.FlinkDemoFinal.main(FlinkDemoFinal.java:113)
{code}

I also tried printing the default coder for Mutation. In IDEA this works and 
prints "HBaseMutationCoder", but nothing is printed when running from the jar.

I then tried to register "HBaseMutationCoder" manually, but HBaseMutationCoder 
is a private class, so I don't know how to register a coder for Mutation.

Part of my code:
{code:java}
.apply("Hbase data format",
    ParDo.of(new DoFn() {
        @ProcessElement
        public void processElement(ProcessContext context) {
            System.out.println(context.element());

            byte[] qual = Bytes.toBytes("count");
            byte[] cf = Bytes.toBytes("cf");
            byte[] row = Bytes.toBytes("kafka");
            byte[] val = Bytes.toBytes(context.element().toString());
            Mutation mutation = new Put(row).addColumn(cf, qual, val);
            context.output(mutation);
        }
    }));
{code}
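
One possible explanation, which nothing in this report confirms, is that the shaded jar lost some META-INF/services entries when several dependencies shipped a file with the same name. Assuming Beam finds its coder providers through java.util.ServiceLoader registrations of org.apache.beam.sdk.coders.CoderProviderRegistrar, the small diagnostic below lists what is actually registered on the classpath it runs with. The class name ServiceFileCheck and the method listProviders are made up for this sketch, and it uses only the JDK.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ServiceFileCheck {

    // Returns every provider class name listed in any
    // META-INF/services/<serviceName> file visible on the classpath.
    static List<String> listProviders(String serviceName) {
        List<String> providers = new ArrayList<>();
        ClassLoader loader = ServiceFileCheck.class.getClassLoader();
        try {
            for (URL url : Collections.list(loader.getResources("META-INF/services/" + serviceName))) {
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        int hash = line.indexOf('#');      // strip trailing comments
                        if (hash >= 0) {
                            line = line.substring(0, hash);
                        }
                        line = line.trim();
                        if (!line.isEmpty()) {
                            providers.add(line);
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return providers;
    }

    public static void main(String[] args) {
        // Assumed service interface name; pass a different one as the first argument.
        String service = args.length > 0
                ? args[0]
                : "org.apache.beam.sdk.coders.CoderProviderRegistrar";
        for (String provider : listProviders(service)) {
            System.out.println(provider);
        }
    }
}
```

Running it once from the IDE classpath and once with java -cp against the shaded jar would show whether an HBase-related registrar appears in the first list but not the second. If so, the shade configuration's handling of service files is the place to look.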






[jira] [Commented] (BEAM-2995) can't read/write hdfs in Flink CLUSTER(Standalone)

2017-09-28 Thread huangjianhuang (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16184216#comment-16184216
 ] 

huangjianhuang commented on BEAM-2995:
--

Yes, I had read BEAM-2457 before and tried what you suggested (setting 
HADOOP_CONF_DIR), but it made no difference.
I started my cluster with only one host (localhost), using the shell command: 
FLINK_DIR/bin/start-cluster.sh

BTW, I now access HDFS through HBaseIO, and that works fine on the Flink cluster ;)


[jira] [Commented] (BEAM-2995) can't read/write hdfs in Flink CLUSTER(Standalone)

2017-09-27 Thread huangjianhuang (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183553#comment-16183553
 ] 

huangjianhuang commented on BEAM-2995:
--

By the way, my pom.xml is:

{code:java}
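<groupId>com.joe</groupId>
<artifactId>flinkBeam</artifactId>
<version>2.2.0-SNAPSHOT</version>

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-flink_2.10</artifactId>
    <version>${project.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${project.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>${project.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
    <version>${project.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${project.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
    <version>${project.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.8.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.8.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.8.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-extensions-protobuf</artifactId>
    <version>${project.version}</version>
  </dependency>
  <dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.2.0</version>
  </dependency>
  <dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java-util</artifactId>
    <version>3.2.0</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>exec-maven-plugin</artifactId>
      <version>1.4.0</version>
      <configuration>
        <cleanupDaemonThreads>false</cleanupDaemonThreads>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <configuration>
        <createDependencyReducedPom>false</createDependencyReducedPom>
        <filters>
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
      </configuration>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <shadedArtifactAttached>true</shadedArtifactAttached>
            <shadedClassifierName>shaded</shadedClassifierName>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>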
{code}

I run it with Flink 1.3.2 and Hadoop 2.8.1.


[jira] [Created] (BEAM-2995) can't read/write hdfs in Flink CLUSTER(Standalone)

2017-09-27 Thread huangjianhuang (JIRA)
huangjianhuang created BEAM-2995:


 Summary: can't read/write hdfs in Flink CLUSTER(Standalone)
 Key: BEAM-2995
 URL: https://issues.apache.org/jira/browse/BEAM-2995
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Affects Versions: 2.2.0
Reporter: huangjianhuang
Assignee: Aljoscha Krettek


I wrote a simple demo like this:

{code:java}
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:9000");
//other codes
p.apply("ReadLines", 
TextIO.read().from("hdfs://localhost:9000/tmp/words"))
.apply(TextIO.write().to("hdfs://localhost:9000/tmp/hdfsout"));
{code}

It works in Flink local mode with this command:

{code:java}
mvn exec:java -Dexec.mainClass=com.joe.FlinkWithHDFS -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner --filesToStage=target/flinkBeam-2.2.0-SNAPSHOT-shaded.jar"
{code}

But it does not work in cluster mode:

{code:java}
mvn exec:java -Dexec.mainClass=com.joe.FlinkWithHDFS -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner --filesToStage=target/flinkBeam-2.2.0-SNAPSHOT-shaded.jar --flinkMaster=localhost:6123"
{code}

It seems the Flink cluster treats the HDFS path as a local file system path. 
The log for the read side, from flink-jobmanager.log, is:

{code:java}
2017-09-27 20:17:37,962 INFO  org.apache.flink.runtime.jobmanager.JobManager - Successfully ran initialization on master in 136 ms.
2017-09-27 20:17:37,968 INFO  org.apache.beam.sdk.io.FileBasedSource - {color:red}Filepattern hdfs://localhost:9000/tmp/words2 matched 0 files with total size 0{color}
2017-09-27 20:17:37,968 INFO  org.apache.beam.sdk.io.FileBasedSource - Splitting filepattern hdfs://localhost:9000/tmp/words2 into bundles of size 0 took 0 ms and produced 0 files and 0 bundles
{code}

The error message on the write side is:

{code:java}
Caused by: java.lang.ClassCastException: {color:red}org.apache.beam.sdk.io.hdfs.HadoopResourceId cannot be cast to org.apache.beam.sdk.io.LocalResourceId{color}
    at org.apache.beam.sdk.io.LocalFileSystem.create(LocalFileSystem.java:77)
    at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:256)
    at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:243)
    at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:922)
    at org.apache.beam.sdk.io.FileBasedSink$Writer.openUnwindowed(FileBasedSink.java:884)
    at org.apache.beam.sdk.io.WriteFiles.finalizeForDestinationFillEmptyShards(WriteFiles.java:909)
    at org.apache.beam.sdk.io.WriteFiles.access$900(WriteFiles.java:110)
    at org.apache.beam.sdk.io.WriteFiles$2.processElement(WriteFiles.java:858)
{code}

Can somebody help me? I've tried everything I can think of and can't get it to 
work. [cry]
https://issues.apache.org/jira/browse/BEAM-2457





