[jira] [Created] (BEAM-2952) How to use KV.OrderByKey
Rick Lin created BEAM-2952:
--
Summary: How to use KV.OrderByKey
Key: BEAM-2952
URL: https://issues.apache.org/jira/browse/BEAM-2952
Project: Beam
Issue Type: New Feature
Components: examples-java
Reporter: Rick Lin
Assignee: Reuven Lax
Fix For: 2.1.0

Hi all,

I have a question about how to use KV.OrderByKey in the Beam Java SDK. My Java code is:

    int[] key = new int[] {2, 1, 3, 4, 5};
    double[] value = new double[] {1.0, 1.0, 1.0, 1.0, 1.0};
    List<KV<Integer, Double>> KVlist = new ArrayList<>();
    List<KV<Integer, Double>> KVtest = new ArrayList<>();
    int n = value.length;
    for (int i = 0; i < n; i++) {
      KVlist.add(KV.of(i, value[i]));
      System.out.println(KVlist.get(i));
    }
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);
    PCollection<KV<Integer, Double>> t1 = p.apply("create data", Create.of(KVlist));
    p.run();

Thanks,
Rick

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
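Beam's `KV.OrderByKey` is a `Comparator` (also `Serializable`) that orders `KV` pairs by their keys. Because a `PCollection` has no defined element order, a comparator like this is normally applied to an in-memory list, or passed to a transform that accepts a comparator, such as `Top.of(count, comparator)`. The JDK-only sketch below illustrates the ordering without Beam on the classpath: `Map.Entry.comparingByKey()` stands in for `KV.OrderByKey`, the class `OrderByKeySketch` is hypothetical, and it keys each pair by `key[i]` (rather than the loop index `i` used in the issue) so that the reordering is visible.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class OrderByKeySketch {
  // Builds (key, value) pairs from two parallel arrays.
  static List<Map.Entry<Integer, Double>> pairs(int[] key, double[] value) {
    List<Map.Entry<Integer, Double>> list = new ArrayList<>();
    for (int i = 0; i < value.length; i++) {
      list.add(new SimpleEntry<>(key[i], value[i]));
    }
    return list;
  }

  // Returns a copy sorted by key in ascending order. Map.Entry.comparingByKey()
  // plays the role that KV.OrderByKey plays for Beam KV pairs.
  static List<Map.Entry<Integer, Double>> sortByKey(List<Map.Entry<Integer, Double>> in) {
    List<Map.Entry<Integer, Double>> out = new ArrayList<>(in);
    out.sort(Map.Entry.comparingByKey());
    return out;
  }

  public static void main(String[] args) {
    int[] key = {2, 1, 3, 4, 5};
    double[] value = {1.0, 1.0, 1.0, 1.0, 1.0};
    for (Map.Entry<Integer, Double> e : sortByKey(pairs(key, value))) {
      System.out.println(e.getKey() + "=" + e.getValue());
    }
  }
}
```

With Beam, the analogous in-memory call on a `List<KV<Integer, Double>>` would be `Collections.sort(list, new KV.OrderByKey<Integer, Double>())`. Sorting a list before `Create.of` does not make downstream transforms see the elements in that order.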
[jira] [Comment Edited] (BEAM-3073) Connect to Apache ignite via JdbcIO sdk
[ https://issues.apache.org/jira/browse/BEAM-3073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16208867#comment-16208867 ]

Rick Lin edited comment on BEAM-3073 at 10/18/17 7:02 AM:
--
Here are the details of my run environment:

*OS:*
Distributor ID: Ubuntu
Description: Ubuntu 14.04.5 LTS
Release: 14.04
Codename: trusty

*Port:* 11211 (LISTEN)

*pom.xml:* org.apache.ignite:ignite-core:${ignite.version}

I have not imported any class from the Ignite package. In addition, Ignite is running.

Thanks,
Rick

> Connect to Apache ignite via JdbcIO sdk
> ---
>
> Key: BEAM-3073
> URL: https://issues.apache.org/jira/browse/BEAM-3073
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-extensions
> Reporter: Rick Lin
> Assignee: Jean-Baptiste Onofré
> Priority: Minor
[jira] [Updated] (BEAM-2952) How to use KV.OrderByKey
[ https://issues.apache.org/jira/browse/BEAM-2952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rick Lin updated BEAM-2952:
---
Priority: Minor (was: Major)

> How to use KV.OrderByKey
>
> Key: BEAM-2952
> URL: https://issues.apache.org/jira/browse/BEAM-2952
> Project: Beam
> Issue Type: New Feature
> Components: examples-java
> Reporter: Rick Lin
> Assignee: Reuven Lax
> Priority: Minor
> Fix For: Not applicable
[jira] [Updated] (BEAM-3073) Connect to Apache ignite via JdbcIO sdk
[ https://issues.apache.org/jira/browse/BEAM-3073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rick Lin updated BEAM-3073:
---
Description:
Hi all,

I tried to connect to Apache Ignite (in-memory) via the Beam SDK's org.apache.beam.sdk.io.jdbc.JdbcIO. I am not sure whether JdbcIO is only provided for specific databases such as MySQL (disk) and PostgreSQL (disk).

My Java test code is as follows:

    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    public class BeamtoJDBC {
      public static void main(String[] args) {
        Integer[] value = new Integer[] {1, 2, 3, 4, 5};
        List<KV<Integer, Integer>> dataList = new ArrayList<>();
        int n = value.length;
        int count = 0;
        for (int i = 0; i < n; i++) {
          dataList.add(KV.of(count, value[i]));
          count = count + 1;
        }

        Pipeline p =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

        PCollection<KV<Integer, Integer>> data =
            p.apply("create data with time", Create.of(dataList));
        data.apply(JdbcIO.<KV<Integer, Integer>>write()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                .create("org.apache.ignite.IgniteJdbcDriver", "jdbc:ignite://localhost:11211/"))
            .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, Integer>>() {
              public void setParameters(KV<Integer, Integer> element, PreparedStatement query)
                  throws SQLException {
                query.setInt(1, element.getKey());
                query.setInt(2, element.getValue());
              }
            }));
        p.run();
      }
    }

My error message is:
"InvocationTargetException: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot create PoolableConnectionFactory (Failed to establish connection.): Failed to get future result due to waiting timed out."

I would like to know whether connecting Beam to Ignite is feasible or not.

Thanks,
Rick
[jira] [Created] (BEAM-3073) Connect to Apache ignite via JdbcIO sdk
Rick Lin created BEAM-3073:
--
Summary: Connect to Apache ignite via JdbcIO sdk
Key: BEAM-3073
URL: https://issues.apache.org/jira/browse/BEAM-3073
Project: Beam
Issue Type: New Feature
Components: sdk-java-extensions
Reporter: Rick Lin
Assignee: Reuven Lax
Priority: Minor

Hi all,

I tried to connect to Apache Ignite (in-memory) via the Beam SDK's org.apache.beam.sdk.io.jdbc.JdbcIO. I am not sure whether JdbcIO is only provided for specific databases such as MySQL (disk) and PostgreSQL (disk).

My Java test code is as follows:

    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    public class BeamtoJDBC {
      public static void main(String[] args) {
        Integer[] value = new Integer[] {1, 2, 3, 4, 5};
        List<KV<Integer, Integer>> dataList = new ArrayList<>();
        int n = value.length;
        int count = 0;
        for (int i = 0; i < n; i++) {
          dataList.add(KV.of(count, value[i]));
          count = count + 1;
        }

        Pipeline p =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

        PCollection<KV<Integer, Integer>> data =
            p.apply("create data with time", Create.of(dataList));
        data.apply(JdbcIO.<KV<Integer, Integer>>write()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                .create("org.apache.ignite.IgniteJdbcDriver", "jdbc:ignite://localhost:11211/"))
            .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, Integer>>() {
              public void setParameters(KV<Integer, Integer> element, PreparedStatement query)
                  throws SQLException {
                query.setInt(1, element.getKey());
                query.setInt(2, element.getValue());
              }
            }));
        p.run();
      }
    }

My error message is:
"InvocationTargetException: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot create PoolableConnectionFactory (Failed to establish connection.): Failed to get future result due to waiting timed out."

I would like to know whether connecting Beam to Ignite is feasible or not.

Thanks,
Rick
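The root exception is a connection timeout, and the DBCP wording ("Cannot create PoolableConnectionFactory") suggests the driver class was found and the failure happened while opening a connection. One hedged first check is whether anything accepts TCP connections at the host and port in the JDBC URL (here `localhost:11211`) from the machine that runs the pipeline. The sketch below uses only `java.net.Socket`; the class `PortCheck` and its method are hypothetical, and a successful connect only shows that the port is open, not that it speaks the protocol the `org.apache.ignite.IgniteJdbcDriver` URL expects.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
  // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
  // Refused connections and timeouts both surface as IOException and yield false.
  static boolean isReachable(String host, int port, int timeoutMillis) {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), timeoutMillis);
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    // Defaults mirror the JDBC URL in the issue: jdbc:ignite://localhost:11211/
    String host = args.length > 0 ? args[0] : "localhost";
    int port = args.length > 1 ? Integer.parseInt(args[1]) : 11211;
    System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
  }
}
```

Separately, the posted write has no `.withStatement(...)` call; `JdbcIO.write()` normally receives the SQL statement whose two `?` placeholders the `setInt(1, ...)` and `setInt(2, ...)` calls fill through `withStatement`.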
[jira] [Commented] (BEAM-3073) Connect to Apache ignite via JdbcIO sdk
[ https://issues.apache.org/jira/browse/BEAM-3073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16208867#comment-16208867 ]

Rick Lin commented on BEAM-3073:

Here are the details of my run environment:

*OS:*
Distributor ID: Ubuntu
Description: Ubuntu 14.04.5 LTS
Release: 14.04
Codename: trusty

*Port:* 11211 (LISTEN)

*pom.xml:* org.apache.ignite:ignite-core:${ignite.version}

I have not imported any class from the Ignite package.

Thanks,
Rick

> Connect to Apache ignite via JdbcIO sdk
> ---
>
> Key: BEAM-3073
> URL: https://issues.apache.org/jira/browse/BEAM-3073
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-extensions
> Reporter: Rick Lin
> Assignee: Jean-Baptiste Onofré
> Priority: Minor
[jira] [Created] (BEAM-3210) The problem about the use of waitUntilFinish() in DirectRunner
Rick Lin created BEAM-3210:
--
Summary: The problem about the use of waitUntilFinish() in DirectRunner
Key: BEAM-3210
URL: https://issues.apache.org/jira/browse/BEAM-3210
Project: Beam
Issue Type: Bug
Components: runner-direct
Affects Versions: 2.1.0
Environment: Ubuntu 14.04.3 LTS, JDK 1.8, Beam 2.1.0, Maven 3.5.0
Reporter: Rick Lin
Assignee: Thomas Groh
Fix For: 2.1.0

Dear sir,

The documentation of waitUntilFinish() says that it "waits until the pipeline finishes and returns the final status."

In my project, a static variable of list type is used to record the contents of a PCollection. I call p.run().waitUntilFinish() so that the pipeline finishes before the list is read, to avoid losing records. Unfortunately, the list *sometimes* records a "null" value instead of the actual value.

To explain clearly, here is my Java code:

    import java.io.IOException;
    import java.util.ArrayList;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    public class BeamTestStatic extends Thread {
      public static ArrayList<Double> myList = new ArrayList<Double>();

      public static class StaticTest extends DoFn<Double, Double> {
        @ProcessElement
        public void test(ProcessContext c) {
          myList.add(c.element());
        }
      }

      public static void main(String[] args) throws IOException {
        StaticTest testa = new StaticTest();
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);
        PCollection<Double> data =
            p.apply("Rawdata", Create.of(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0));
        PCollection<Double> listtest = data.apply(ParDo.of(testa));
        p.run().waitUntilFinish();
        System.out.println("mylist_size_a=" + myList.size());
        for (int i = 0; i < myList.size(); i++) {
          System.out.println("mylist_data=" + myList.get(i));
        }
      }
    }

The output of my code is:

    mylist_size_a=10
    mylist_data=null
    mylist_data=4.0
    mylist_data=5.0
    mylist_data=9.0
    mylist_data=6.0
    mylist_data=1.0
    mylist_data=7.0
    mylist_data=8.0
    mylist_data=10.0
    mylist_data=3.0

If you need any further information, please let me know.

Thanks,
Rick
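A likely explanation, offered as a hedge rather than a confirmed diagnosis: the DirectRunner can process bundles on several threads, and `java.util.ArrayList` is not thread-safe, so concurrent `add` calls from the `DoFn` can leave `null` slots or lose elements even though `waitUntilFinish()` has returned. The JDK-only sketch below imitates that access pattern with an `ExecutorService` and collects into `Collections.synchronizedList` instead; the class `SafeCollect` is hypothetical. Within Beam itself, checking results with `PAssert` or writing them to a sink avoids shared static state altogether, which also matters because static fields are not shared across worker JVMs on distributed runners.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SafeCollect {
  // Adds every input value to a shared list from a pool of worker threads.
  // Collections.synchronizedList guards each add(), unlike a plain ArrayList.
  static List<Double> collectInParallel(List<Double> inputs, int threads)
      throws InterruptedException {
    List<Double> shared = Collections.synchronizedList(new ArrayList<Double>());
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (Double x : inputs) {
      pool.execute(() -> shared.add(x));
    }
    pool.shutdown();
    // Block until all tasks have run, loosely analogous to waitUntilFinish().
    if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
      throw new IllegalStateException("workers did not finish in time");
    }
    return shared;
  }

  public static void main(String[] args) throws InterruptedException {
    List<Double> inputs = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
      inputs.add((double) i);
    }
    List<Double> result = collectInParallel(inputs, 4);
    System.out.println("size=" + result.size() + ", contains null=" + result.contains(null));
  }
}
```

Element order in the result still varies between runs, just as in the reported output; only the loss and `null` corruption are addressed.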
[jira] [Updated] (BEAM-4631) kafkIO should run the streaming mode over spark runner
[ https://issues.apache.org/jira/browse/BEAM-4631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rick Lin updated BEAM-4631:
---
Summary: kafkIO should run the streaming mode over spark runner (was: kafkIO should be the streaming mode over spark runner)

> kafkIO should run the streaming mode over spark runner
> --
>
> Key: BEAM-4631
> URL: https://issues.apache.org/jira/browse/BEAM-4631
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka, runner-spark
> Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
> Reporter: Rick Lin
> Assignee: Raghu Angadi
> Priority: Major
> Fix For: 2.4.0

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-4632) kafkIO should be the streaming mode over spark runner
Rick Lin created BEAM-4632:
--
Summary: kafkIO should be the streaming mode over spark runner
Key: BEAM-4632
URL: https://issues.apache.org/jira/browse/BEAM-4632
Project: Beam
Issue Type: Bug
Components: io-java-kafka, runner-spark
Affects Versions: 2.4.0
Environment: Ubuntu 16.04.4 LTS
Reporter: Rick Lin
Assignee: Raghu Angadi
Fix For: 2.4.0

Dear sir,

My program uses the following versions of the related tools:
==
Beam 2.4.0 (Direct runner and Spark runner)
Spark 2.2.1 (local mode and standalone mode)
Kafka: 2.11-0.10.1.1
scala: 2.11.8
java: 1.8
==

My programs (KafkaToKafka.java and StarterPipeline.java) are on my GitHub: [https://github.com/LinRick/beamkafkaIO]

My situation is as follows.

The Kafka broker is working, and KafkaIO.read (the consumer) is used to capture data from the assigned broker (ubuntu7:9092).

The KafkaIO documentation ([https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that the following parameters need to be set for KafkaIO to work:

    .withBootstrapServers("kafka broker ip:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)

When I run my program with these settings on the Direct runner, it performs well and runs in streaming mode. *However, when I run the same code with the same KafkaIO settings on the Spark runner, my program does not run in streaming mode and shuts down.*

According to [https://beam.apache.org/documentation/runners/spark/], the Spark runner automatically sets streaming mode. Unfortunately, this did not happen for my program.

On the other hand, if I set kafkaIO.read.withMaxNumRecords(1000) or kafkaIO.read.withMaxReadTime(Duration second), my program executes successfully in batch mode (batch processing).

The steps for running StarterPipeline.java are:

    step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"
    step2 mvn clean package
    step3 cp -rf target/beamkafkaIO-0.1.jar /root/
    step4 cd /spark-2.2.1-bin-hadoop2.6/bin
    step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] /root/beamkafkaIO-0.1.jar --runner=SparkRunner

I am not sure whether this is a bug in KafkaIO or whether some of my parameter settings for the Spark runner are wrong. I cannot resolve it myself, so I hope you can help. If any further information is needed, please let me know and I will provide it as soon as possible. I would highly appreciate your help with this issue, and I look forward to hearing from you.

Sincerely yours,
Rick
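One way to separate a broker or topic problem from a runner problem is to read the same topic with a plain Kafka consumer outside Beam. As a JDK-only sketch (no kafka-clients dependency; the class `KafkaSettingsSketch` is hypothetical), the broker and deserializer choices passed to KafkaIO above map onto the standard Kafka consumer keys `bootstrap.servers`, `key.deserializer` and `value.deserializer`. The topic is not a consumer property; it is chosen when subscribing.

```java
import java.util.Properties;

class KafkaSettingsSketch {
  // Expresses the KafkaIO read settings from the issue as standard
  // Kafka consumer configuration entries.
  static Properties consumerProps(String bootstrapServers) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
  }

  public static void main(String[] args) {
    // "ubuntu7:9092" is the broker address mentioned in the issue.
    Properties props = consumerProps("ubuntu7:9092");
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}
```

With kafka-clients on the classpath, these `Properties` could be passed to `new KafkaConsumer<>(props)`; subscribing to a topic such as `kafkasink` by name typically also needs a `group.id`.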
[jira] [Created] (BEAM-4631) kafkIO should be the streaming mode over spark runner
Rick Lin created BEAM-4631:
--
Summary: kafkIO should be the streaming mode over spark runner
Key: BEAM-4631
URL: https://issues.apache.org/jira/browse/BEAM-4631
Project: Beam
Issue Type: Bug
Components: io-java-kafka, runner-spark
Affects Versions: 2.4.0
Environment: Ubuntu 16.04.4 LTS
Reporter: Rick Lin
Assignee: Raghu Angadi
Fix For: 2.4.0
[jira] [Updated] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner
[ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rick Lin updated BEAM-4632:
---
Attachment: the error GeneratedMessageV3.JPG

> KafkaIO seems to fail on streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka, runner-spark
> Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
> Reporter: Rick Lin
> Assignee: Alexey Romanenko
> Priority: Major
> Attachments: the error GeneratedMessageV3.JPG
[jira] [Updated] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner
[ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rick Lin updated BEAM-4632:
---
Attachment: DB_table_kafkabeamdata_count.JPG

> KafkaIO seems to fail on streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka, runner-spark
> Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
> Reporter: Rick Lin
> Assignee: Alexey Romanenko
> Priority: Major
> Attachments: DB_table_kafkabeamdata_count.JPG, the error GeneratedMessageV3.JPG, the error GeneratedMessageV3.JPG
[jira] [Updated] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner
[ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rick Lin updated BEAM-4632:
---
Attachment: the error GeneratedMessageV3.JPG
[jira] [Updated] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner
[ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rick Lin updated BEAM-4632:
---
Attachment: .withMaxNumRecords(50).JPG
[jira] [Commented] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner
[ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524523#comment-16524523 ]

Rick Lin commented on BEAM-4632:


Dear all,

Thank you very much for your attention to this problem.

Hi [~aromanenko],

Yes, I have changed *beam.version from 2.5.0-SNAPSHOT to 2.4.0* and *spark2.jackson.version from 2.9.5 to 2.8.9* in this project (updated on my GitHub).
When I run the StarterPipeline.java pipeline with p.run().waitUntilFinish(), *KafkaIO runs in streaming mode (Spark runner with local[4])*.

The pipeline writes the count of records for each second into PostgreSQL (raw_c42a25f4bd3d74429dbeb6162e60e5c7/kafkabeamdata), as follows:
{quote}countData.apply(JdbcIO.<...>write()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
        "org.postgresql.Driver",
        "jdbc:postgresql://ubuntu7:5432/raw_c42a25f4bd3d74429dbeb6162e60e5c7")
        .withUsername("postgres")
        .withPassword("postgres"))
    .withStatement("insert into kafkabeamdata (count) values(?)")
    .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<...>() {
        @Override
        public void setParameters(KV<...> element, PreparedStatement query)
                throws SQLException {
            double count = element.getValue().doubleValue();
            query.setDouble(1, count);
        }
    }));
{quote}
The following figure shows the counts written into the DB:
!DB_table_kafkabeamdata_count.JPG!
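The setParameters(...) body in the comment binds each count to the single ? in "insert into kafkabeamdata (count) values(?)". The sketch below runs those same two lines without Beam or PostgreSQL. Map.Entry<Integer, Long> is an assumed stand-in for Beam's KV (the real type arguments were lost from the quoted code), and the fake PreparedStatement built with java.lang.reflect.Proxy is a hypothetical test helper that only records what gets bound:

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

final class CountSetterSketch {
    // Same body as setParameters(...) in the comment; Map.Entry<Integer, Long> is an
    // assumed stand-in for Beam's KV with a Long count.
    static void setParameters(Map.Entry<Integer, Long> element, PreparedStatement query)
            throws SQLException {
        double count = element.getValue().doubleValue();
        query.setDouble(1, count); // index 1 = the single '?' in the INSERT statement
    }

    // Fake PreparedStatement (no database): records setDouble(index, value) calls into 'bound'.
    // It returns null for every call, so it is only suitable for void methods like setDouble.
    static PreparedStatement recordingStatement(Map<Integer, Object> bound) {
        return (PreparedStatement) Proxy.newProxyInstance(
                CountSetterSketch.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, args) -> {
                    if (method.getName().equals("setDouble")) {
                        bound.put((Integer) args[0], args[1]);
                    }
                    return null;
                });
    }

    public static void main(String[] args) throws SQLException {
        Map<Integer, Object> bound = new HashMap<>();
        setParameters(new AbstractMap.SimpleEntry<>(7, 42L), recordingStatement(bound));
        System.out.println(bound); // {1=42.0}
    }
}
```

JDBC parameter indexes start at 1, so setDouble(1, ...) targets the first and only placeholder; the key of the element is not written at all.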
In the DB_table_kafkabeamdata_count figure, we can see many zero counts, which means that most windows contain no data under the applied windowing/triggering/watermark settings:
".apply(Window.<...>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())"

In this project, I would like the counts to roughly match the amount of data generated by the Kafka producer. For example, the following figure shows the result when KafkaIO.Read.withMaxNumRecords(50) is set in the pipeline:
!.withMaxNumRecords(50).JPG!
This table shows the expected amount of data in each window.
How can I achieve the same result in streaming mode? Which settings should I change for the Spark runner (standalone mode), the Spark pipeline options (MaxRecordsPerBatch), and KafkaIO.Read?

On the other hand, there is another error: "Caused by: java.lang.ClassNotFoundException: com.google.protobuf.GeneratedMessageV3", as shown in the attachment:
!the error GeneratedMessageV3.JPG|width=1156,height=249!
To deal with this error, I have added the required dependency (also on my GitHub):
"<protobuf-java.version>3.4.0</protobuf-java.version>
<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>${protobuf-java.version}</version>
</dependency>"

Rick
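To check that the added protobuf-java dependency is really visible at runtime (for example inside the jar passed to spark-submit), one can try to load the missing class by name. The helper below is hypothetical, not part of Beam or Spark, and only relies on Class.forName:

```java
final class ClasspathCheck {
    // True if the class can be found by this class's loader; 'false' means static initializers are not run.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // GeneratedMessageV3 belongs to protobuf-java 3.x; "false" here suggests that
        // protobuf-java 3.x is not visible on the runtime classpath of this launch.
        String name = "com.google.protobuf.GeneratedMessageV3";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

Launching this class the same way as the pipeline (same jar, same spark-submit setup) is what makes the result meaningful; a run from the IDE may see a different classpath.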
[jira] [Comment Edited] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner
[ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524523#comment-16524523 ]

Rick Lin edited comment on BEAM-4632 at 6/27/18 6:25 AM:
-

Dear all,

Thank you very much for your attention to this problem.

Hi [~aromanenko],

Yes, I have changed *beam.version from 2.5.0-SNAPSHOT to 2.4.0* and *spark2.jackson.version from 2.9.5 to 2.8.9* in this project (updated on my GitHub).
When I run the StarterPipeline.java pipeline with p.run().waitUntilFinish(), *KafkaIO runs in streaming mode (Spark runner with local[4])*.

The pipeline writes the count of records for each second into PostgreSQL (raw_c42a25f4bd3d74429dbeb6162e60e5c7/kafkabeamdata), as follows:
{quote}countData.apply(JdbcIO.<...>write()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
        "org.postgresql.Driver",
        "jdbc:postgresql://ubuntu7:5432/raw_c42a25f4bd3d74429dbeb6162e60e5c7")
        .withUsername("postgres")
        .withPassword("postgres"))
    .withStatement("insert into kafkabeamdata (count) values(?)")
    .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<...>() {
        @Override
        public void setParameters(KV<...> element, PreparedStatement query)
                throws SQLException {
            double count = element.getValue().doubleValue();
            query.setDouble(1, count);
        }
    }));
{quote}
The following figure shows the counts written into the DB:
!DB_table_kafkabeamdata_count.JPG!
In the above table, we can see many zero counts, which means that most windows contain no data under the applied windowing/triggering/watermark settings:
{quote}".apply(Window.<...>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())"
{quote}
In this project, I would like the counts to roughly match the amount of data generated by the Kafka producer. For example, the following figure shows the result when KafkaIO.Read.withMaxNumRecords(50) is set in the pipeline:
!.withMaxNumRecords(50).JPG!
This table shows the expected amount of data in each window.
How can I achieve the same result in streaming mode? Which settings should I change for the Spark runner (standalone mode), the Spark pipeline options (MaxRecordsPerBatch), and KafkaIO.Read?

On the other hand, there is another error: "Caused by: java.lang.ClassNotFoundException: com.google.protobuf.GeneratedMessageV3", as shown in the attachment:
!the error GeneratedMessageV3.JPG|width=1156,height=249!
To deal with this error, I have added the required dependency (also on my GitHub):
"<protobuf-java.version>3.4.0</protobuf-java.version>
<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>${protobuf-java.version}</version>
</dependency>"

Rick
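The comment counts records per 1-second window (FixedWindows.of(Duration.standardSeconds(1)) in the quoted settings). The sketch below reproduces in plain Java the usual fixed-window assignment rule (window start = timestamp rounded down to a multiple of the window size, zero offset) followed by a per-window count, which may help when comparing the DB rows with the producer's rate. It is an illustration, not Beam's FixedWindows code, and it ignores triggers, watermarks and lateness entirely; the timestamps are made up:

```java
import java.util.Map;
import java.util.TreeMap;

final class FixedWindowCountSketch {
    // Start of the fixed window (size sizeMillis, zero offset) that contains timestampMillis.
    // Math.floorMod keeps negative timestamps in the correct, earlier window.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Number of elements per window, keyed and ordered by window start.
    static Map<Long, Long> countPerWindow(long[] timestampsMillis, long sizeMillis) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMillis) {
            counts.merge(windowStart(ts, sizeMillis), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] timestamps = {100, 900, 999, 1000, 2500};
        System.out.println(countPerWindow(timestamps, 1000)); // {0=3, 1000=1, 2000=1}
    }
}
```

Only windows that received at least one element appear in this result, and which timestamp each Kafka record is given decides which window it falls into.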
[jira] [Commented] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner
[ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524674#comment-16524674 ]

Rick Lin commented on BEAM-4632:


Dear [~aromanenko],

I have tried running my project with the Spark runner in standalone mode to capture more data from Kafka into each window. The driver/master/worker node settings are:

Driver node (ubuntu8), spark-defaults.conf:
{quote}spark.driver.memory 10g
spark.executor.memory 2g
spark.executor.instances 4
{quote}
Master node (ubuntu8):
{quote}export SPARK_MASTER_IP="ubuntu8"
export SPARK_MASTER_WEBUI_PORT=8082
{quote}
The two worker nodes (ubuntu8 and ubuntu9) use the same settings as the master node.
||ExecutorID||Worker||Cores||Memory||State||Logs||
|1|[worker-20180627140534-ubuntu9-39922|http://10.236.1.9:8081/]|4|2048|KILLED|[stdout|http://10.236.1.9:8081/logPage?appId=app-20180627141506-0002=1=stdout] [stderr|http://10.236.1.9:8081/logPage?appId=app-20180627141506-0002=1=stderr]|
|0|[worker-20180627141225-ubuntu8-33234|http://10.236.1.8:8081/]|4|2048|KILLED|[stdout|http://10.236.1.8:8081/logPage?appId=app-20180627141506-0002=0=stdout] [stderr|http://10.236.1.8:8081/logPage?appId=app-20180627141506-0002=0=stderr]|

In addition, my pipeline is set up as:
{quote}Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("spark://ubuntu8:7077");
...
PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withoutMetadata());
...
p.run().waitUntilFinish();
{quote}
and I use the following command line to run the project:
./spark-submit --class com.itri.beam.StarterPipeline --master spark://ubuntu8:7077 /root/beamkafkaIO-0.1.jar --runner=SparkRunner

After a while, the program fails with the following error (see the attachment "error UnboundedDataset.java 81(0) has different number of partitions.JPG"):
"UnboundedDataset.java:81(0) has different number of partitions from original RDD MapPartitionsRDD[698] at updateStateByKey at SparkGroupAlsoByWindowViaWindowSet.java:612(2)"
A similar error still appears even when only one worker node (ubuntu9) is used.

Best,
Rick
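The pipeline above sets options.setMaxRecordsPerBatch(1000L). As a simplified model only (it assumes the whole backlog is already available and ignores any time limit the Spark runner may also apply to each micro-batch read), a per-micro-batch record cap spreads a backlog over several batches as follows; microBatchSizes is a hypothetical helper, not a Beam API:

```java
import java.util.ArrayList;
import java.util.List;

final class MicroBatchCapSketch {
    // Sizes of the micro-batches needed to drain pendingRecords when each batch holds
    // at most maxRecordsPerBatch records. A non-positive cap is treated here as "no cap".
    static List<Long> microBatchSizes(long pendingRecords, long maxRecordsPerBatch) {
        List<Long> sizes = new ArrayList<>();
        long remaining = pendingRecords;
        while (remaining > 0) {
            long take = maxRecordsPerBatch > 0 ? Math.min(remaining, maxRecordsPerBatch) : remaining;
            sizes.add(take);
            remaining -= take;
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(microBatchSizes(2500, 1000)); // [1000, 1000, 500]
        System.out.println(microBatchSizes(2500, -1));   // [2500]
    }
}
```

With a cap of 1000 and 2500 pending records, the backlog needs three micro-batches instead of one, so the cap mainly trades batch size for the number of batches.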
[jira] [Updated] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner
[ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rick Lin updated BEAM-4632:
---
Attachment: error UnboundedDataset.java 81(0) has different number of partitions.JPG
[jira] [Comment Edited] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner
[ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524674#comment-16524674 ]

Rick Lin edited comment on BEAM-4632 at 6/27/18 7:04 AM:
-

Dear [~aromanenko],

I have tried running my project with the Spark runner in standalone mode to capture more data from Kafka into each window. The driver/master/worker node settings are:

Driver node (ubuntu8), spark-defaults.conf:
{quote}spark.driver.memory 10g
spark.executor.memory 2g
spark.executor.instances 4
{quote}
Master node (ubuntu8):
{quote}export SPARK_MASTER_IP="ubuntu8"
export SPARK_MASTER_WEBUI_PORT=8082
{quote}
The two worker nodes (ubuntu8 and ubuntu9) use the same settings as the master node.
||ExecutorID||Worker||Cores||Memory||State||Logs||
|1|ubuntu9|4|2048|KILLED| |
|0|ubuntu8|4|2048|KILLED| |

In addition, my pipeline is set up as:
{quote}Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("spark://ubuntu8:7077");
...
PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withoutMetadata());
...
p.run().waitUntilFinish();
{quote}
and I use the following command line to run the project:
./spark-submit --class com.itri.beam.StarterPipeline --master spark://ubuntu8:7077 /root/beamkafkaIO-0.1.jar --runner=SparkRunner

After a while, the program fails with the following error (see the attachment "error UnboundedDataset.java 81(0) has different number of partitions.JPG"):
"UnboundedDataset.java:81(0) has different number of partitions from original RDD MapPartitionsRDD[698] at updateStateByKey at SparkGroupAlsoByWindowViaWindowSet.java:612(2)"
A similar error still appears even when only one worker node (ubuntu9) is used.
Best,
Rick
[jira] [Commented] (BEAM-3073) Connect to Apache ignite via JdbcIO sdk
[ https://issues.apache.org/jira/browse/BEAM-3073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212010#comment-16212010 ]

Rick Lin commented on BEAM-3073:


Hi,

Apache Ignite is similar to Redis or Hazelcast (an in-memory key-value store). I think that the JdbcIO SDK is not suitable for Ignite, so an in-memory IO SDK for Ignite needs to be developed.

Thanks

> Connect to Apache ignite via JdbcIO sdk
> ---
>
> Key: BEAM-3073
> URL: https://issues.apache.org/jira/browse/BEAM-3073
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-extensions
> Reporter: Rick Lin
> Assignee: Jean-Baptiste Onofré
> Priority: Minor
>
> Hi all,
> I tried to connect to Apache Ignite (in-memory) via Beam's SDK: org.apache.beam.sdk.io.jdbc.JdbcIO
> Here, I am not sure whether the JdbcIO SDK is only provided for some specific databases: MySQL (disk), PostgreSQL (disk)?
> My Java test code is as follows:
> import java.sql.PreparedStatement;
> import java.sql.SQLException;
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.jdbc.JdbcIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> public class BeamtoJDBC {
>   public static void main(String[] args) {
>     Integer[] value = new Integer[] {1, 2, 3, 4, 5};
>     List<KV<Integer, Integer>> dataList = new ArrayList<>();
>     int n = value.length;
>     int count = 0;
>     for (int i = 0; i < n; i++) {
>       dataList.add(KV.of(count, value[i]));
>       count = count + 1;
>     }
>
>     Pipeline p =
>         Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
>
>     PCollection<KV<Integer, Integer>> data = p.apply("create data with time", Create.of(dataList));
>     data.apply(JdbcIO.<KV<Integer, Integer>>write()
>         .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
>             .create("org.apache.ignite.IgniteJdbcDriver", "jdbc:ignite://localhost:11211/"))
>         .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, Integer>>() {
>           public void setParameters(KV<Integer, Integer> element, PreparedStatement query)
>               throws SQLException {
>             query.setInt(1, element.getKey());
>             query.setInt(2, element.getValue());
>           }
>         }));
>     p.run();
>   }
> }
> My error message is:
> "InvocationTargetException: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot create PoolableConnectionFactory (Failed to establish connection.): Failed to get future result due to waiting timed out."
> I would like to know whether the connection between Beam and Ignite is feasible or not?
> Thanks
> Rick

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
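One way to narrow down the "Cannot create PoolableConnectionFactory (Failed to establish connection.)" error is to open a plain JDBC connection with the same driver class and URL outside of Beam. The sketch below is a hypothetical helper (tryConnect is not part of Beam or Ignite) that uses only java.sql.DriverManager; running its main against the report's Ignite URL assumes the Ignite JDBC driver jar is on the classpath:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

final class JdbcConnectivityCheck {
    // Hypothetical helper: opens and immediately closes a plain JDBC connection outside Beam.
    // Returns null on success, otherwise a short description of the failure.
    static String tryConnect(String driverClass, String url, int loginTimeoutSeconds) {
        try {
            Class.forName(driverClass); // loads (and, for JDBC drivers, registers) the driver class
        } catch (ClassNotFoundException e) {
            return "driver not on classpath: " + driverClass;
        }
        DriverManager.setLoginTimeout(loginTimeoutSeconds);
        try (Connection connection = DriverManager.getConnection(url)) {
            return null;
        } catch (SQLException e) {
            return "connection failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Driver class and URL taken from the report above.
        String error = tryConnect("org.apache.ignite.IgniteJdbcDriver", "jdbc:ignite://localhost:11211/", 10);
        System.out.println(error == null ? "connected" : error);
    }
}
```

If this plain connection also fails or times out, the problem more likely lies in the driver, URL or port combination than in JdbcIO itself.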
[jira] [Commented] (BEAM-3073) Connect to Apache ignite via JdbcIO sdk
[ https://issues.apache.org/jira/browse/BEAM-3073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212248#comment-16212248 ]

Rick Lin commented on BEAM-3073:

Hi,

In my opinion:
* The JDBC driver org.apache.ignite.IgniteJdbcDriver is only used to store data in an on-disk database (MySQL, PostgreSQL, ...) via Ignite. It is not meant for Beam to output data into Ignite.
* Yes, I am looking forward to an SDK for Ignite I/O.

Thanks for the update on the feature :).

Rick
[jira] [Commented] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner
[ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561540#comment-16561540 ]

Rick Lin commented on BEAM-4632:

By the way, my Kafka broker configuration is:
{code:java}
/kafka_broker/bin/kafka-producer-perf-test.sh \
--num-records 1000 \
--record-size 100 \
--topic kafkasink \
--throughput 1 \
--producer-props acks=0 \
bootstrap.servers=ubuntu7:9092 \
batch.size=1000{code}
The Kafka broker console output is:
...
49992 records sent, {color:#d04437}9998.4 records/sec{color} (0.95 MB/sec), 1.0 ms avg latency, 146.0 max latency.
50040 records sent, {color:#d04437}10008.0 records/sec{color} (0.95 MB/sec), 0.2 ms avg latency, 5.0 max latency.
50019 records sent, {color:#d04437}10001.8 records/sec{color} (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
50011 records sent, {color:#d04437}10002.2 records/sec{color} (0.95 MB/sec), 0.2 ms avg latency, 3.0 max latency.
50020 records sent, {color:#d04437}10002.0 records/sec{color} (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
...

Rick

> KafkaIO seems to fail on streaming mode over spark runner
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka, runner-spark
> Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
> Reporter: Rick Lin
> Assignee: Alexey Romanenko
> Priority: Major
> Attachments: .withMaxNumRecords(50).JPG, DB_table_kafkabeamdata_count.JPG, error UnboundedDataset.java 81(0) has different number of partitions.JPG, the error GeneratedMessageV3.JPG, the error GeneratedMessageV3.JPG
>
> Dear sir,
> The following versions of related tools are used in my program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are on my GitHub: [https://github.com/LinRick/beamkafkaIO]
> My situation is as follows:
> {color:#14892c}The Kafka broker is working, and KafkaIO.read (consumer) is used to capture data from the assigned broker (ubuntu7:9092).{color}
> {color:#14892c}The KafkaIO user manual (web: [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that the following parameters need to be set for KafkaIO to work:{color}
> .withBootstrapServers("kafka broker ip:9092")
> .withTopic("kafkasink")
> .withKeyDeserializer(IntegerDeserializer.class)
> .withValueDeserializer(StringDeserializer.class)
> When I run my program with these settings on the Direct runner, it works well and runs in streaming mode. *However, when I run the same code with the same KafkaIO settings on the Spark runner, my program does not run in streaming mode and shuts down*.
> As mentioned on [https://beam.apache.org/documentation/runners/spark/], the program should automatically be set to streaming mode. Unfortunately, this did not happen for my program.
> On the other hand, if I set KafkaIO.read.withMaxNumRecords(1000) or KafkaIO.read.withMaxReadTime(Duration second), my program executes successfully in batch mode (batch processing).
> The steps for running StarterPipeline.java are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"
> step2 mvn clean package
> step3 cp -rf target/beamkafkaIO-0.1.jar /root/
> step4 cd /spark-2.2.1-bin-hadoop2.6/bin
> step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] /root/beamkafkaIO-0.1.jar --runner=SparkRunner
> I am not sure whether this is a KafkaIO bug or whether some of my parameter settings for the Spark runner are wrong.
> I really can't resolve it, so I hope to get help from you.
> If any further information is needed, please let me know and I will provide it as soon as possible.
> I would highly appreciate your help with this issue.
> I am looking forward to hearing from you.
>
> Sincerely yours,
>
> Rick

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
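As a sanity check on the perf-test console output in this thread, the MB/sec column can be recomputed from the records/sec column and the configured `--record-size 100`. A minimal sketch, assuming the tool counts 1 MB as 1024 × 1024 bytes (with 10^6 bytes the first line would come out as 1.00 rather than 0.95); the class and method names are illustrative only.

```java
import java.util.Locale;

/** Illustrative helper: converts a record rate into MB/sec, with 1 MB = 1024 * 1024 bytes. */
public class ThroughputCheck {

    static double mbPerSec(double recordsPerSec, int recordSizeBytes) {
        return recordsPerSec * recordSizeBytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        // records/sec values copied from the kafka-producer-perf-test console output
        double[] rates = {9998.4, 10008.0, 10001.8, 10002.2, 10002.0};
        int recordSize = 100; // --record-size 100
        for (double rate : rates) {
            System.out.println(String.format(Locale.ROOT, "%.1f records/sec -> %.2f MB/sec",
                    rate, mbPerSec(rate, recordSize)));
        }
    }
}
```

Every line prints 0.95 MB/sec, matching the console, so the producer was sustaining roughly 10,000 records of 100 bytes per second.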
[jira] [Comment Edited] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner
[ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561540#comment-16561540 ]

Rick Lin edited comment on BEAM-4632 at 7/30/18 6:56 AM:

By the way, my Kafka broker configuration is:
{code:java}
/kafka_broker/bin/kafka-producer-perf-test.sh \
--num-records 1000 \
--record-size 100 \
--topic kafkasink \
--throughput 1 \
--producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000{code}
The Kafka broker console output is:
...
49992 records sent, {color:#d04437}9998.4 records/sec{color} (0.95 MB/sec), 1.0 ms avg latency, 146.0 max latency.
50040 records sent, {color:#d04437}10008.0 records/sec{color} (0.95 MB/sec), 0.2 ms avg latency, 5.0 max latency.
50019 records sent, {color:#d04437}10001.8 records/sec{color} (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
50011 records sent, {color:#d04437}10002.2 records/sec{color} (0.95 MB/sec), 0.2 ms avg latency, 3.0 max latency.
50020 records sent, {color:#d04437}10002.0 records/sec{color} (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
...

Rick
[jira] [Commented] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner
[ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561536#comment-16561536 ]

Rick Lin commented on BEAM-4632:

Hi [~aromanenko],

Regarding your last question, I only changed the SparkPipeline settings in my program, as follows:
{code:java}
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("spark://ubuntu8:7077");
{code}
and then I ran the following command:
{noformat}
mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"{noformat}
This produces the following error:
{panel:title=The error is}
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project beamkafkaIO2: An exception occured while executing the Java class. org.apache.spark.SparkException: Checkpoint RDD has a different number of partitions from original RDD. Original RDD [ID: 321, num of partitions: 1]; Checkpoint RDD [ID: 324, num of partitions: 0]. -> [Help 1]
{panel}

Thanks
Rick
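The `setMaxRecordsPerBatch(1000L)` cap can be compared with the producer rate shown in the perf-test output in this thread (roughly 10,000 records/sec). The back-of-the-envelope sketch below assumes that each streaming micro-batch reads at most `maxRecordsPerBatch` records and that one batch runs per batch interval; the 500 ms interval is a hypothetical value, not taken from the report. It only checks whether the cap can keep up with the input; it does not explain the checkpoint-partition error itself.

```java
/** Illustrative arithmetic only: can a capped micro-batch keep up with a producer? */
public class BatchCapCheck {

    /** Largest batch interval (ms) at which maxRecordsPerBatch still matches the input rate. */
    static double maxIntervalMillis(double recordsPerSec, long maxRecordsPerBatch) {
        return 1000.0 * maxRecordsPerBatch / recordsPerSec;
    }

    /** Records per second left unread if one capped batch runs every intervalMillis. */
    static double backlogGrowthPerSec(double recordsPerSec, long maxRecordsPerBatch, long intervalMillis) {
        double consumedPerSec = maxRecordsPerBatch * (1000.0 / intervalMillis);
        return Math.max(0.0, recordsPerSec - consumedPerSec);
    }

    public static void main(String[] args) {
        double rate = 10000.0; // approx. records/sec from the perf-test output
        long cap = 1000L;      // options.setMaxRecordsPerBatch(1000L)
        System.out.println("max interval to keep up: " + maxIntervalMillis(rate, cap) + " ms");
        long interval = 500L;  // hypothetical batch interval, not from the report
        System.out.println("backlog growth at " + interval + " ms: "
                + backlogGrowthPerSec(rate, cap, interval) + " records/sec");
    }
}
```

Under these assumptions, a 1000-record cap only keeps pace with about 10,000 records/sec if a batch completes at least every 100 ms.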
[jira] [Comment Edited] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner
[ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561536#comment-16561536 ]

Rick Lin edited comment on BEAM-4632 at 7/30/18 7:00 AM:

Hi [~aromanenko],

Regarding your last question, I only changed the SparkPipeline settings in my program, as follows:
{code:java}
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("spark://ubuntu8:7077");
{code}
{code:java}
PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    //.withMaxNumRecords(50)
    .withoutMetadata());{code}
{code:java}
...
...
...
p.run().waitUntilFinish();{code}
and then I ran the following command:
{noformat}
mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"{noformat}
This produces the following error:
{panel:title=The error is}
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project beamkafkaIO2: An exception occured while executing the Java class. org.apache.spark.SparkException: Checkpoint RDD has a different number of partitions from original RDD. Original RDD [ID: 321, num of partitions: 1]; Checkpoint RDD [ID: 324, num of partitions: 0]. -> [Help 1]
{panel}

Thanks
Rick
[jira] [Comment Edited] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner
[ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561536#comment-16561536 ]

Rick Lin edited comment on BEAM-4632 at 7/30/18 7:00 AM:

Hi [~aromanenko],

Regarding your last question, I only changed the SparkPipeline settings in my program, as follows:
{code:java}
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("spark://ubuntu8:7077");
{code}
{code:java}
PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    //.withMaxNumRecords(50)
    .withoutMetadata());{code}
{code:java}
...
...
...
p.run().waitUntilFinish();{code}
and then I ran the following command:
{noformat}
mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"{noformat}
This produces the following error:
{panel:title=The error is}
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project beamkafkaIO2: An exception occured while executing the Java class. org.apache.spark.SparkException: Checkpoint RDD has a different number of partitions from original RDD. Original RDD [ID: 321, num of partitions: 1]; Checkpoint RDD [ID: 324, num of partitions: 0]. -> [Help 1]
{panel}

Thanks
Rick
[jira] [Comment Edited] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner
[ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561536#comment-16561536 ]

Rick Lin edited comment on BEAM-4632 at 7/30/18 8:16 AM:

Hi [~aromanenko],

Regarding your last question, I only changed the SparkPipeline settings in my program, as follows:
{code:java}
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("spark://ubuntu8:7077");
{code}
{code:java}
PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    //.withMaxNumRecords(50)
    .withoutMetadata());{code}
{code:java}
...
...
...
p.run().waitUntilFinish();{code}
and then I ran the following command:
{noformat}
mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"{noformat}
This produces the following error:
{panel:title=The error is}
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project beamkafkaIO: An exception occured while executing the Java class. org.apache.spark.SparkException: Checkpoint RDD has a different number of partitions from original RDD. Original RDD [ID: 321, num of partitions: 1]; Checkpoint RDD [ID: 324, num of partitions: 0]. -> [Help 1]
{panel}

Thanks
Rick
[jira] [Created] (BEAM-3770) The problem of kafkaIO sdk for data latency
Rick Lin created BEAM-3770:
--

Summary: The problem of kafkaIO sdk for data latency
Key: BEAM-3770
URL: https://issues.apache.org/jira/browse/BEAM-3770
Project: Beam
Issue Type: Improvement
Components: io-java-kafka
Affects Versions: 2.0.0
Environment: To reproduce my situation, my running environment is:
OS: Ubuntu 14.04.3 LTS
JAVA: JDK 1.7
Beam 2.0.0 (with Direct runner)
Kafka 2.10-0.10.1.1
Maven 3.5.0, with the following dependencies listed in pom.xml:
  org.apache.beam beam-sdks-java-core 2.0.0
  org.apache.beam beam-runners-direct-java 2.0.0 (runtime)
  org.apache.beam beam-sdks-java-io-kafka 2.0.0
  org.apache.kafka kafka-clients 0.10.0.1
Reporter: Rick Lin
Assignee: Raghu Angadi
Fix For: 2.0.0

Dear all,

I am using the KafkaIO SDK in my project (Beam 2.0.0 with the Direct runner). While using it, I ran into a *data* *latency* problem, described below.

The data comes from Kafka at a fixed rate of 100 items per second.
I create a 1-second fixed window with no delay.
The number of items per window is 70, 80, 104, or at least 104.
After one day of running, data latency appears, and each window contains only 10 items.

*To explain it clearly, I also provide my code below.*

"
PipelineOptions readOptions = PipelineOptionsFactory.create();
final Pipeline p = Pipeline.create(readOptions);

PCollection<TimestampedValue<KV<String, String>>> readData =
    p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("127.0.0.1:9092")
            .withTopic("kafkasink")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
     .apply(ParDo.of(new DoFn<KV<String, String>, TimestampedValue<KV<String, String>>>() {
         @ProcessElement
         public void test(ProcessContext c) throws ParseException {
             String element = c.element().getValue();
             try {
                 // Read the "Timestamp" field from the JSON payload.
                 JsonNode arrNode = new ObjectMapper().readTree(element);
                 String t = arrNode.path("v").findValue("Timestamp").textValue();
                 DateTimeFormatter formatter = DateTimeFormatter.ofPattern("MM/dd/ HH:mm:ss.");
                 LocalDateTime dateTime = LocalDateTime.parse(t, formatter);
                 // Convert java.time to a Joda Instant, as used by Beam.
                 java.time.Instant java_instant = dateTime.atZone(ZoneId.systemDefault()).toInstant();
                 Instant timestamp = new Instant(java_instant.toEpochMilli());
                 c.output(TimestampedValue.of(c.element(), timestamp));
             } catch (JsonGenerationException e) {
                 e.printStackTrace();
             } catch (JsonMappingException e) {
                 e.printStackTrace();
             } catch (IOException e) {
                 e.printStackTrace();
             }
         }
     }));

PCollection<TimestampedValue<KV<String, String>>> readDivideData = readData.apply(
    Window.<TimestampedValue<KV<String, String>>>into(
            FixedWindows.of(Duration.standardSeconds(1)).withOffset(Duration.ZERO))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());
"

*In addition, the running result is shown below.*

"
data-size=104 coming-data-time=2018-02-27 02:00:49.117 window-time=2018-02-27 02:00:49.999
data-size=78 coming-data-time=2018-02-27 02:00:50.318 window-time=2018-02-27 02:00:50.999
data-size=104 coming-data-time=2018-02-27 02:00:51.102 window-time=2018-02-27 02:00:51.999

After one day:
data-size=10 coming-data-time=2018-02-28 02:05:48.217 window-time=2018-03-01 10:35:16.999
"

If you have any idea about this problem (data latency), I look forward to hearing from you.
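The logged window-time values can be cross-checked against fixed-window arithmetic. Below is a minimal plain-JDK sketch (no Beam dependency) of how a 1-second fixed window with zero offset maps a timestamp to a window. The formula is my reading of Beam's FixedWindows, and treating window-time as the window's maxTimestamp() (window end minus 1 ms) is an assumption, since the printing code is not shown. The class and method names are hypothetical, and UTC is used only so the arithmetic is deterministic (the log does not state a time zone).

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: fixed-window arithmetic for size 1 s, offset 0,
// modeled on (not copied from) Beam's FixedWindows.
public class FixedWindowSketch {
    // Same layout as the timestamps printed in the log above.
    static final DateTimeFormatter LOG_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Epoch millis for a logged timestamp, interpreted as UTC (assumption).
    static long toMillis(String logged) {
        return LocalDateTime.parse(logged, LOG_FORMAT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Inverse of toMillis, for printing.
    static String format(long millis) {
        return Instant.ofEpochMilli(millis).atOffset(ZoneOffset.UTC)
                .toLocalDateTime().format(LOG_FORMAT);
    }

    // Start of the window that contains ts; floorMod keeps this correct
    // for timestamps before the epoch as well.
    static long windowStart(long ts, long size, long offset) {
        return ts - Math.floorMod(ts - offset, size);
    }

    // Last millisecond inside the window (end - 1 ms).
    static long windowMaxTimestamp(long ts, long size, long offset) {
        return windowStart(ts, size, offset) + size - 1;
    }

    public static void main(String[] args) {
        long size = 1000L; // Duration.standardSeconds(1)
        long offset = 0L;  // Duration.ZERO
        long arrival = toMillis("2018-02-27 02:00:49.117");
        System.out.println("window-time=" + format(windowMaxTimestamp(arrival, size, offset)));
        // Gap between the two timestamps in the "after one day" log line.
        long lagMillis = toMillis("2018-03-01 10:35:16.999") - toMillis("2018-02-28 02:05:48.217");
        System.out.println("lag-ms=" + lagMillis);
    }
}
```

For the first three log lines the computed window matches the logged window-time (for example, 02:00:49.117 falls into the window whose last millisecond is 02:00:49.999). For the after-one-day line it does not: the logged window-time is 116,968,782 ms (about 32.5 hours) after the coming-data-time, so the window being fired is not the one that contains the coming-data-time.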
Thanks
Rick