[jira] [Commented] (BEAM-3858) Data from JdbcIO.read() cannot pass to next transform on ApexRunner
    [ https://issues.apache.org/jira/browse/BEAM-3858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418409#comment-16418409 ]

huangjianhuang commented on BEAM-3858:
--------------------------------------

[~jbonofre] Any suggestions?

> Data from JdbcIO.read() cannot pass to next transform on ApexRunner
> -------------------------------------------------------------------
>
>                 Key: BEAM-3858
>                 URL: https://issues.apache.org/jira/browse/BEAM-3858
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-jdbc, runner-apex
>    Affects Versions: 2.3.0
>         Environment: ubuntu16.04
>            Reporter: huangjianhuang
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (BEAM-3858) Data from JdbcIO.read() cannot pass to next transform on ApexRunner
huangjianhuang created BEAM-3858:
------------------------------------

             Summary: Data from JdbcIO.read() cannot pass to next transform on ApexRunner
                 Key: BEAM-3858
                 URL: https://issues.apache.org/jira/browse/BEAM-3858
             Project: Beam
          Issue Type: Bug
          Components: io-java-jdbc, runner-apex
    Affects Versions: 2.3.0
         Environment: ubuntu16.04
            Reporter: huangjianhuang
            Assignee: Jean-Baptiste Onofré


{code:java}
public static void testJDBCRead(Pipeline pipeline) {
    System.out.println("in testJDBCRead()");
    pipeline.apply(JdbcIO.<String>read()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://localhost:3307/libra")
                    .withUsername("root")
                    .withPassword("123456"))
            .withQuery("SELECT * FROM o_flow_account_login limit 3")
            .withCoder(StringUtf8Coder.of())
            // Every row is mapped to the constant "tmp"; the print shows that rows are read.
            .withRowMapper(new JdbcIO.RowMapper<String>() {
                public String mapRow(ResultSet resultSet) throws Exception {
                    System.out.println("maprow");
                    return "tmp";
                }
            })
    )
    // This downstream step should print "??" once per row read above.
    .apply(ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void process(ProcessContext context) {
            System.out.println("??");
            context.output(" ");
        }
    }));
}
{code}
On DirectRunner or FlinkRunner, the screen shows:
{code:java}
maprow
maprow
maprow
??
??
??
{code}
However, on ApexRunner the screen only shows:
{code:java}
maprow
maprow
maprow
{code}
[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
    [ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352010#comment-16352010 ]

huangjianhuang commented on BEAM-3414:
--------------------------------------

[~kenn] Has the jstorm-runner been released? The [official documentation|https://beam.apache.org/documentation/runners/jstorm/] says we can add it with:
{code:java}
// code placeholder
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-jstorm</artifactId>
    <version>2.2.0</version>
</dependency>
{code}
But the dependency does not seem to be available.

> AfterProcessingTime trigger issue with Flink Runner
> ---------------------------------------------------
>
>                 Key: BEAM-3414
>                 URL: https://issues.apache.org/jira/browse/BEAM-3414
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-flink
>    Affects Versions: 2.2.0
>         Environment: idea, ubuntu 16.04, FlinkRunner
>            Reporter: huangjianhuang
>            Assignee: Aljoscha Krettek
>            Priority: Major
>
> In my demo, I read data from Kafka, count globally, and finally output the
> total count of received data, as follows:
> {code:java}
> FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
>         .as(FlinkPipelineOptions.class);
> options.setStreaming(true);
> options.setRunner(FlinkRunner.class);
> Pipeline pipeline = Pipeline.create(options);
> pipeline
>         .apply("Read from kafka",
>                 KafkaIO.<String, String>read()
>                         //.withTimestampFn(kafkaData -> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
>                         .withBootstrapServers("localhost:9092")
>                         .withTopic("recharge")
>                         .withKeyDeserializer(StringDeserializer.class)
>                         .withValueDeserializer(StringDeserializer.class)
>                         .withoutMetadata()
>         )
>         .apply(Values.<String>create())
>         // Global window; fire 5 s (processing time) after the first element of each pane.
>         .apply(Window.<String>into(new GlobalWindows())
>                 .triggering(Repeatedly.forever(
>                         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
>                 .accumulatingFiredPanes()
>         )
>         .apply(Count.<String>globally())
>         .apply("output",
>                 ParDo.of(new DoFn<Long, Void>() {
>                     @ProcessElement
>                     public void process(ProcessContext context) {
>                         System.out.println("---get at: " + Instant.now() + "--");
>                         System.out.println(context.element());
>                     }
>                 }));
> {code}
> The result should be displayed 5 s after I send the first data, but sometimes
> nothing is displayed after I send data. The outputs I got in one test are
> shown below (I can't upload a picture, so they are given as text):
> {code:java}
> Send 681Msg at: 2018-01-05T06:34:31.436
> ---get at: 2018-01-05T06:34:36.668Z--
> 681
> Send 681Msg at: 2018-01-05T06:34:47.166
> ---get at: 2018-01-05T06:34:52.284Z--
> 1362
> Send 681Msg at: 2018-01-05T06:34:55.505
> Send 681Msg at: 2018-01-05T06:35:22.068
> ---get at: 2018-01-05T06:35:22.112Z--
> 2044
> {code}
> By the way, the code works fine with the DirectRunner.
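The expectation stated in the report (one firing 5 s after the first element of each pane) can be checked against the send times in the log with a small plain-Java model. This is only a sketch under simplifying assumptions: it does not use Beam, the class and method names (`ProcessingTimeTriggerSketch`, `expectedFirings`) are made up for illustration, and each "Send" batch is treated as arriving at a single instant.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java model (no Beam dependency) of when a repeated
 * "first element in pane + delay" processing-time trigger is expected to fire.
 * Hypothetical helper for illustration only, not Beam's implementation.
 */
final class ProcessingTimeTriggerSketch {

    /** Returns the expected firing times for the given arrival times (must be sorted). */
    static List<LocalDateTime> expectedFirings(List<LocalDateTime> arrivals, Duration delay) {
        List<LocalDateTime> firings = new ArrayList<>();
        LocalDateTime pending = null; // firing time of the currently open pane, if any
        for (LocalDateTime arrival : arrivals) {
            if (pending != null && !arrival.isBefore(pending)) {
                // The open pane's timer expired before this arrival, so it has fired.
                firings.add(pending);
                pending = null;
            }
            if (pending == null) {
                // This arrival is the first element of a new pane; start its timer.
                pending = arrival.plus(delay);
            }
        }
        if (pending != null) {
            // The last pane is expected to fire even if no further data arrives.
            firings.add(pending);
        }
        return firings;
    }

    public static void main(String[] args) {
        // Send times copied from the log in the report.
        List<LocalDateTime> sends = Arrays.asList(
                LocalDateTime.parse("2018-01-05T06:34:31.436"),
                LocalDateTime.parse("2018-01-05T06:34:47.166"),
                LocalDateTime.parse("2018-01-05T06:34:55.505"),
                LocalDateTime.parse("2018-01-05T06:35:22.068"));
        for (LocalDateTime firing : expectedFirings(sends, Duration.ofSeconds(5))) {
            System.out.println("expected firing at: " + firing);
        }
    }
}
```

Under this model the third send (06:34:55.505) should produce a firing at about 06:35:00.505 without any further input. The log instead shows no output until the fourth send arrives at 06:35:22.068, which is the behavior the report describes.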
[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
    [ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352008#comment-16352008 ]

huangjianhuang commented on BEAM-3414:
--------------------------------------

[~aljoscha] Thanks. Can it be fixed this month? Our project has been delayed by this issue for a long time; if it cannot, we will have to try other frameworks.

> AfterProcessingTime trigger issue with Flink Runner
> ---------------------------------------------------
>
>                 Key: BEAM-3414
>                 URL: https://issues.apache.org/jira/browse/BEAM-3414
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-flink
>    Affects Versions: 2.2.0
>         Environment: idea, ubuntu 16.04, FlinkRunner
>            Reporter: huangjianhuang
>            Assignee: Aljoscha Krettek
>            Priority: Major
[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
    [ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342837#comment-16342837 ]

huangjianhuang commented on BEAM-3414:
--------------------------------------

[~kenn] [~aljoscha] Are you still working on a fix for this issue? I'm quite distressed by it.

> AfterProcessingTime trigger issue with Flink Runner
> ---------------------------------------------------
>
>                 Key: BEAM-3414
>                 URL: https://issues.apache.org/jira/browse/BEAM-3414
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-flink
>    Affects Versions: 2.2.0
>         Environment: idea, ubuntu 16.04, FlinkRunner
>            Reporter: huangjianhuang
>            Assignee: Aljoscha Krettek
>            Priority: Major
[jira] [Issue Comment Deleted] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
    [ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

huangjianhuang updated BEAM-3414:
---------------------------------
    Comment: was deleted

(was: Thanks for the help. Does this bug appear in other runners? Or can you advise which runner behaves most like the DirectRunner? My code works fine with the DirectRunner but runs into many problems with the FlinkRunner :()

> AfterProcessingTime trigger issue with Flink Runner
> ---------------------------------------------------
>
>                 Key: BEAM-3414
>                 URL: https://issues.apache.org/jira/browse/BEAM-3414
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-flink
>    Affects Versions: 2.2.0
>         Environment: idea, ubuntu 16.04, FlinkRunner
>            Reporter: huangjianhuang
>            Assignee: Kenneth Knowles
>            Priority: Major
[jira] [Commented] (BEAM-3423) Distinct.withRepresentativeValueFn throws CoderException "cannot encode null KV"
    [ https://issues.apache.org/jira/browse/BEAM-3423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319609#comment-16319609 ]

huangjianhuang commented on BEAM-3423:
--------------------------------------

[~kenn] Thanks. Yes, it does work fine without early firings. Maybe the null KVs appeared because the inputs were flushed by the early firings and there were no inputs left when the event-time trigger fired?

> Distinct.withRepresentativeValueFn throws CoderException "cannot encode null KV"
> ---------------------------------------------------------------------------------
>
>                 Key: BEAM-3423
>                 URL: https://issues.apache.org/jira/browse/BEAM-3423
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.2.0
>         Environment: ubuntu16.04, idea, java8
>            Reporter: huangjianhuang
>            Assignee: Kenneth Knowles
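The hypothesis in the comment above (an early firing discards the buffered input, so the end-of-window firing sees an empty pane) can be illustrated with a toy model in plain Java. This is a sketch of the commenter's guess only, not Beam's actual code path: `DiscardingPaneSketch`, `anyElement`, `encode`, and `firePanes` are hypothetical names, and the assumption that an empty pane still produces an output is part of the hypothesis being modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of two firings over one window buffer; hypothetical, not Beam code. */
final class DiscardingPaneSketch {

    /** Stand-in for a combiner that keeps one representative; yields null for an empty pane. */
    static String anyElement(List<String> pane) {
        return pane.isEmpty() ? null : pane.get(0);
    }

    /** Stand-in for a coder that rejects null values. */
    static String encode(String value) {
        if (value == null) {
            throw new IllegalArgumentException("cannot encode a null value");
        }
        return value;
    }

    /**
     * Fires one early pane and then the end-of-window pane over the same buffer,
     * with no new input in between. With discarding, the buffer is cleared after
     * the early firing.
     */
    static List<String> firePanes(List<String> input, boolean discarding) {
        List<String> buffer = new ArrayList<>(input);
        List<String> outputs = new ArrayList<>();
        outputs.add(anyElement(buffer)); // early firing
        if (discarding) {
            buffer.clear();
        }
        outputs.add(anyElement(buffer)); // end-of-window firing
        return outputs;
    }

    public static void main(String[] args) {
        System.out.println(firePanes(Arrays.asList("word1"), false)); // accumulating panes
        List<String> discarded = firePanes(Arrays.asList("word1"), true);
        System.out.println(discarded); // second output is null in this model
        try {
            encode(discarded.get(1));
        } catch (IllegalArgumentException e) {
            System.out.println("encoding failed: " + e.getMessage());
        }
    }
}
```

In this model only the discarding case hands a null to the encoder, which matches the observation that the pipeline works without early firings. Whether Beam's Distinct transform behaves this way internally is not confirmed by the thread.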
[jira] [Comment Edited] (BEAM-3423) Distinct.withRepresentativeValueFn with EventTime Trigger throws Exceptions
    [ https://issues.apache.org/jira/browse/BEAM-3423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315057#comment-16315057 ]

huangjianhuang edited comment on BEAM-3423 at 1/7/18 4:05 AM:
--------------------------------------------------------------

[~kenn] Sorry, I don't understand what the "relevant trigger" you mentioned means. Did I make a mistake in my code?
To avoid this exception, I ended up using a MyKV class that I defined myself instead of KV, and it works well. I have no idea where the null KV came from in my code.
{code:java}
import java.io.Serializable;

public class MyKV implements Serializable {
    private String key;
    private String val;
    //getter and setter
}
{code}

> Distinct.withRepresentativeValueFn with EventTime Trigger throws Exceptions
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-3423
>                 URL: https://issues.apache.org/jira/browse/BEAM-3423
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.2.0
>         Environment: ubuntu16.04, idea, java8
>            Reporter: huangjianhuang
>            Assignee: Kenneth Knowles
[jira] [Commented] (BEAM-3423) Distinct.withRepresentativeValueFn with EventTime Trigger throws Exceptions
    [ https://issues.apache.org/jira/browse/BEAM-3423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315057#comment-16315057 ]

huangjianhuang commented on BEAM-3423:
--------------------------------------

[~kenn] Sorry, I don't understand what the "relevant trigger" you mentioned means. Did I make a mistake in my code?
To avoid this exception, I ended up using a MyKV class that I defined myself instead of KV, and it works OK. I have no idea where the null KV came from in my code.
{code:java}
import java.io.Serializable;

public class MyKV implements Serializable {
    private String key;
    private String val;
    //getter and setter
}
{code}

> Distinct.withRepresentativeValueFn with EventTime Trigger throws Exceptions
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-3423
>                 URL: https://issues.apache.org/jira/browse/BEAM-3423
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.2.0
>         Environment: ubuntu16.04, idea, java8
>            Reporter: huangjianhuang
>            Assignee: Kenneth Knowles
[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
    [ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314517#comment-16314517 ]

huangjianhuang commented on BEAM-3414:
--------------------------------------

Thanks for the help. Does this bug appear in other runners? Or can you advise which runner behaves most like the DirectRunner? My code works fine with the DirectRunner but runs into many problems with the FlinkRunner :(

> AfterProcessingTime trigger issue with Flink Runner
> ---------------------------------------------------
>
>                 Key: BEAM-3414
>                 URL: https://issues.apache.org/jira/browse/BEAM-3414
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-flink
>    Affects Versions: 2.2.0
>         Environment: idea, ubuntu 16.04, FlinkRunner
>            Reporter: huangjianhuang
>            Assignee: Kenneth Knowles
[jira] [Created] (BEAM-3423) Distinct.withRepresentativeValueFn with EventTime Trigger throws Exceptions
huangjianhuang created BEAM-3423:
------------------------------------

             Summary: Distinct.withRepresentativeValueFn with EventTime Trigger throws Exceptions
                 Key: BEAM-3423
                 URL: https://issues.apache.org/jira/browse/BEAM-3423
             Project: Beam
          Issue Type: Bug
          Components: runner-core, runner-direct
    Affects Versions: 2.2.0
         Environment: ubuntu16.04, idea, java8
            Reporter: huangjianhuang
            Assignee: Kenneth Knowles


My code is as follows:
{code:java}
pipeline
        //Read data
        .apply("Read from kafka",
                KafkaIO.<String, String>read()
                        .withBootstrapServers("localhost:9092")
                        .withTopic(topic)
                        .withKeyDeserializer(StringDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class)
                        .withoutMetadata()
        )
        // 10 s fixed windows with early firings 5 s after the first element; fired panes are discarded.
        .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
                .discardingFiredPanes().withAllowedLateness(Duration.ZERO))
        //works fine
        //.apply(Distinct.create())
        //oops! -> CoderException: cannot encode a null KV
        .apply(Distinct.withRepresentativeValueFn(new Val()).withRepresentativeType(TypeDescriptors.strings()))
        .apply(MapElements.into(TypeDescriptors.nulls())
                .via(input -> {
                    System.out.println(Instant.now());
                    System.out.println(input);
                    return null;
                }));

// Uses the value of each KV as the representative for de-duplication.
private static class Val implements SerializableFunction<KV<String, String>, String> {
    @Override
    public String apply(KV<String, String> input) {
        return input.getValue();
    }
}
{code}
Input words to Kafka:
word1
//after 10s
word2

Then I got the following exception:
{code:java}
begin
2018-01-06T11:18:52.971Z
KV{null, a}
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:344)
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:314)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:208)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:289)
	at com.xiaomi.huyu.processor.dev.EntryPoint.main(EntryPoint.java:37)
Caused by: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
	at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:113)
	at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
	at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2149)
Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
	at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:70)
	at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
	at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
	at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
	at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:62)
	at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106)
	at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
	at
[jira] [Commented] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312593#comment-16312593 ]

huangjianhuang commented on BEAM-3414:

The trigger never fires after 06:34:55.505 unless I send new data. How can I fix this without new input?

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
huangjianhuang created BEAM-3414:

Summary: AfterProcessingTime trigger issue with Flink Runner
Key: BEAM-3414
URL: https://issues.apache.org/jira/browse/BEAM-3414
Project: Beam
Issue Type: Bug
Components: runner-core, runner-flink
Affects Versions: 2.2.0
Environment: idea, ubuntu 16.04, FlinkRunner
Reporter: huangjianhuang
Assignee: Kenneth Knowles

In my demo, I read data from Kafka, count the elements globally, and output the total count of received data, as follows:
{code:java}
FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(FlinkPipelineOptions.class);
options.setStreaming(true);
options.setRunner(FlinkRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
    .apply("Read from kafka",
        KafkaIO.<String, String>read()
            //.withTimestampFn(kafkaData -> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
            .withBootstrapServers("localhost:9092")
            .withTopic("recharge")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata()
    )
    .apply(Values.create())
    .apply(Window.into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
        .accumulatingFiredPanes()
    )
    .apply(Count.globally())
    .apply("output",
        ParDo.of(new DoFn() {
            @ProcessElement
            public void process(ProcessContext context) {
                System.out.println("---get at: " + Instant.now() + "--");
                System.out.println(context.element());
            }
        }));
{code}
The result should be displayed 5 seconds after I send the first data, but sometimes nothing is displayed after I send data. The output I got in one test is shown below (I can't upload a picture, so it is given as text):
{code:java}
Send 681Msg at: 2018-01-05T06:34:31.436
---get at: 2018-01-05T06:34:36.668Z--
681
Send 681Msg at: 2018-01-05T06:34:47.166
---get at: 2018-01-05T06:34:52.284Z--
1362
Send 681Msg at: 2018-01-05T06:34:55.505
Send 681Msg at: 2018-01-05T06:35:22.068
---get at: 2018-01-05T06:35:22.112Z--
2044
{code}
By the way, the code works fine with the direct runner.
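As a rough check of the log in BEAM-3414, each output time can be compared with the time at which the trigger would be expected to fire (send time plus the configured 5-second delay). The sketch below is plain Java with no Beam dependency. The class and method names are hypothetical, and it assumes the "Send" and "get at" timestamps are in the same time zone, so the trailing Z is dropped.

```java
import java.time.Duration;
import java.time.LocalDateTime;

class TriggerDelayCheck {
    // Delay configured in the pipeline via plusDelayOf(Duration.standardSeconds(5)).
    static final Duration TRIGGER_DELAY = Duration.ofSeconds(5);

    /** Milliseconds between the expected firing time (send + 5 s) and the observed output time. */
    static long lagMillis(String sentAt, String outputAt) {
        LocalDateTime expected = LocalDateTime.parse(sentAt).plus(TRIGGER_DELAY);
        LocalDateTime observed = LocalDateTime.parse(outputAt);
        return Duration.between(expected, observed).toMillis();
    }

    public static void main(String[] args) {
        // Timestamps copied from the log in the report.
        System.out.println(lagMillis("2018-01-05T06:34:31.436", "2018-01-05T06:34:36.668")); // 232
        System.out.println(lagMillis("2018-01-05T06:34:47.166", "2018-01-05T06:34:52.284")); // 118
        // Third send: no output until the fourth message arrives at 06:35:22.068.
        System.out.println(lagMillis("2018-01-05T06:34:55.505", "2018-01-05T06:35:22.112")); // 21607
    }
}
```

Under these assumptions, the first two panes fire within a few hundred milliseconds of the expected time, while the pane for the 06:34:55.505 send appears about 21.6 seconds late, immediately after the next message is sent.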
[jira] [Closed] (BEAM-3390) unable to serialize org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn
[ https://issues.apache.org/jira/browse/BEAM-3390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

huangjianhuang closed BEAM-3390.

Resolution: Not A Problem
Fix Version/s: 2.1.0

Sorry, this was my mistake, not an issue. I had put the code into an instance method and called it from main() with: new ClassName().func();
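The resolution of BEAM-3390 is consistent with how Java serializes inner classes: an anonymous class created inside an instance method can hold a reference to the enclosing object, so serializing it also tries to serialize that object. The sketch below reproduces the effect with the JDK only. The names CaptureDemo, Mapper and StaticMapper are hypothetical; Mapper merely stands in for a serializable callback such as JdbcIO.RowMapper. Here the anonymous mapper reads an instance field so that the capture is explicit; the reporter's mapper did not, but the stack trace in the report shows SqlDemo was still pulled into serialization.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Deliberately NOT Serializable, like SqlDemo in the reported stack trace.
class CaptureDemo {
    private final String prefix = "row-";

    // Stand-in for a serializable callback interface (assumption for illustration).
    interface Mapper extends Serializable {
        String map(String in);
    }

    // Anonymous class created in an instance method: it references CaptureDemo.this.
    Mapper anonymousMapper() {
        return new Mapper() {
            @Override
            public String map(String in) {
                return prefix + in; // reads CaptureDemo.this.prefix
            }
        };
    }

    // Static nested class: no enclosing instance, so nothing extra is serialized.
    static class StaticMapper implements Mapper {
        @Override
        public String map(String in) {
            return "row-" + in;
        }
    }

    // Returns true if Java serialization of the object succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new CaptureDemo().anonymousMapper())); // false
        System.out.println(isSerializable(new StaticMapper()));                  // true
    }
}
```

Building the pipeline in a static method, or moving the mapper into a static nested class, avoids the hidden reference; making the enclosing class Serializable would also work but serializes more than is needed.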
[jira] [Created] (BEAM-3390) unable to serialize org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn
huangjianhuang created BEAM-3390:

Summary: unable to serialize org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn
Key: BEAM-3390
URL: https://issues.apache.org/jira/browse/BEAM-3390
Project: Beam
Issue Type: Bug
Components: dsl-sql
Affects Versions: 2.1.0
Environment: ubuntu 16.04, idea, direct runner
Reporter: huangjianhuang
Assignee: Xu Mingmin

Here is my demo code:
{code:java}
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(JdbcIO.<KV<Integer, String>>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver",
            "jdbc:mysql://localhost:3307/libra_stat")
        .withUsername("root")
        .withPassword("123456"))
    .withQuery("select id, game_id from test_tb")
    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))
    .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
        public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
            System.out.println(resultSet.getInt(1));
            System.out.println(resultSet.getString(2));
            return KV.of(resultSet.getInt(1), resultSet.getString(2));
        }
    })
);
{code}
I ran this demo with the direct runner and got a NotSerializableException, as follows:
{code:java}
java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn@68f4865
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
    at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
    at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.expand(JdbcIO.java:325)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.expand(JdbcIO.java:272)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:165)
    at com.xiaomi.huyu.processor.demo.SqlDemo.run(SqlDemo.java:30)
    at com.xiaomi.huyu.processor.demo.SqlDemo.main(SqlDemo.java:21)
Caused by: java.io.NotSerializableException: com.xiaomi.huyu.processor.demo.SqlDemo
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
    ... 11 more
{code}
Any suggestions and comments are welcome, thanks a lot!
[jira] [Created] (BEAM-3002) Unable to provide a Coder for org.apache.hadoop.hbase.client.Mutation
huangjianhuang created BEAM-3002:

Summary: Unable to provide a Coder for org.apache.hadoop.hbase.client.Mutation
Key: BEAM-3002
URL: https://issues.apache.org/jira/browse/BEAM-3002
Project: Beam
Issue Type: Bug
Components: sdk-java-core
Affects Versions: 2.1.0
Environment: hadoop2.8.0, hbase1.2.6
Reporter: huangjianhuang
Assignee: Kenneth Knowles

I wrote a demo with HBaseIO that formats data into Mutation objects and writes them to HBase. The demo works fine in IDEA or with the mvn exec:java command, but it fails after being packaged as a shaded jar (run with java -cp). The error message is:
{code:java}
Using the default output Coder from the producing PTransform failed: Unable to provide a Coder for org.apache.hadoop.hbase.client.Mutation.
Building a Coder using a registered CoderProvider failed.
See suppressed exceptions for detailed failures.
    at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
    at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:257)
    at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:106)
    at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifying(TransformHierarchy.java:222)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:208)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440)
    at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:552)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:296)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
    at com.joe.FlinkDemoFinal.main(FlinkDemoFinal.java:113)
{code}
I also tried to print the default coder of Mutation: in IDEA it works and prints "HBaseMutationCoder", but nothing is printed when running from the jar. I then tried to register "HBaseMutationCoder" manually, but HBaseMutationCoder is a private class, so I don't know how to register a coder for Mutation.

Part of my code:
{code:java}
.apply("Hbase data format", ParDo.of(new DoFn() {
    @ProcessElement
    public void processElement(ProcessContext context) {
        System.out.println(context.element());
        byte[] qual = Bytes.toBytes("count");
        byte[] cf = Bytes.toBytes("cf");
        byte[] row = Bytes.toBytes("kafka");
        byte[] val = Bytes.toBytes(context.element().toString());
        Mutation mutation = new Put(row).addColumn(cf, qual, val);
        context.output(mutation);
    }
}));
{code}
[jira] [Commented] (BEAM-2995) can't read/write hdfs in Flink CLUSTER(Standalone)
[ https://issues.apache.org/jira/browse/BEAM-2995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16184216#comment-16184216 ]

huangjianhuang commented on BEAM-2995:

Yes, I had read BEAM-2457 before and tried what you suggested (with HADOOP_CONF_DIR), but it made no difference. I started my cluster with only one host (localhost), using the shell command:

FLINK_DIR/bin/start-cluster.sh

BTW, I now access HDFS through HBaseIO, and it works fine on the Flink cluster ;)
[jira] [Commented] (BEAM-2995) can't read/write hdfs in Flink CLUSTER(Standalone)
[ https://issues.apache.org/jira/browse/BEAM-2995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183553#comment-16183553 ]

huangjianhuang commented on BEAM-2995:

By the way, my pom.xml is:
{code:java}
com.joe flinkBeam 2.2.0-SNAPSHOT

org.apache.beam beam-runners-flink_2.10 ${project.version}
org.apache.beam beam-sdks-java-core ${project.version}
org.apache.beam beam-sdks-java-io-kafka ${project.version}
org.apache.beam beam-sdks-java-io-hadoop-file-system ${project.version}
org.apache.beam beam-sdks-java-io-google-cloud-platform ${project.version}
org.apache.beam beam-sdks-java-extensions-google-cloud-platform-core ${project.version}
org.apache.hadoop hadoop-common 2.8.1
org.apache.hadoop hadoop-hdfs 2.8.1
org.apache.hadoop hadoop-client 2.8.1
org.apache.beam beam-sdks-java-extensions-protobuf ${project.version}
com.google.protobuf protobuf-java 3.2.0
com.google.protobuf protobuf-java-util 3.2.0

org.codehaus.mojo exec-maven-plugin 1.4.0 false
org.apache.maven.plugins maven-shade-plugin false *:* META-INF/*.SF META-INF/*.DSA META-INF/*.RSA package shade true shaded
{code}
I run it with Flink 1.3.2 and Hadoop 2.8.1.
[jira] [Created] (BEAM-2995) can't read/write hdfs in Flink CLUSTER(Standalone)
huangjianhuang created BEAM-2995:

Summary: can't read/write hdfs in Flink CLUSTER(Standalone)
Key: BEAM-2995
URL: https://issues.apache.org/jira/browse/BEAM-2995
Project: Beam
Issue Type: Bug
Components: runner-flink
Affects Versions: 2.2.0
Reporter: huangjianhuang
Assignee: Aljoscha Krettek

I wrote a simple demo like this:
{code:java}
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:9000");
// other code
p.apply("ReadLines", TextIO.read().from("hdfs://localhost:9000/tmp/words"))
    .apply(TextIO.write().to("hdfs://localhost:9000/tmp/hdfsout"));
{code}
It works in Flink local mode with the command:
{code:java}
mvn exec:java -Dexec.mainClass=com.joe.FlinkWithHDFS -Pflink-runner -Dexec.args="--runner=FlinkRunner --filesToStage=target/flinkBeam-2.2.0-SNAPSHOT-shaded.jar"
{code}
but it does not work in CLUSTER mode:
{code:java}
mvn exec:java -Dexec.mainClass=com.joe.FlinkWithHDFS -Pflink-runner -Dexec.args="--runner=FlinkRunner --filesToStage=target/flinkBeam-2.2.0-SNAPSHOT-shaded.jar --flinkMaster=localhost:6123 "
{code}
It seems the Flink cluster treats HDFS as the local file system.
The input log from flink-jobmanager.log is:
{code:java}
2017-09-27 20:17:37,962 INFO org.apache.flink.runtime.jobmanager.JobManager - Successfully ran initialization on master in 136 ms.
2017-09-27 20:17:37,968 INFO org.apache.beam.sdk.io.FileBasedSource - {color:red}Filepattern hdfs://localhost:9000/tmp/words2 matched 0 files with total size 0{color}
2017-09-27 20:17:37,968 INFO org.apache.beam.sdk.io.FileBasedSource - Splitting filepattern hdfs://localhost:9000/tmp/words2 into bundles of size 0 took 0 ms and produced 0 files and 0 bundles
{code}
The output error message is:
{code:java}
Caused by: java.lang.ClassCastException: {color:red}org.apache.beam.sdk.io.hdfs.HadoopResourceId cannot be cast to org.apache.beam.sdk.io.LocalResourceId{color}
    at org.apache.beam.sdk.io.LocalFileSystem.create(LocalFileSystem.java:77)
    at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:256)
    at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:243)
    at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:922)
    at org.apache.beam.sdk.io.FileBasedSink$Writer.openUnwindowed(FileBasedSink.java:884)
    at org.apache.beam.sdk.io.WriteFiles.finalizeForDestinationFillEmptyShards(WriteFiles.java:909)
    at org.apache.beam.sdk.io.WriteFiles.access$900(WriteFiles.java:110)
    at org.apache.beam.sdk.io.WriteFiles$2.processElement(WriteFiles.java:858)
{code}
Can somebody help me? I've tried everything I can think of and just can't work it out [cry]
https://issues.apache.org/jira/browse/BEAM-2457