[jira] [Created] (BEAM-2952) How to use KV.OrderByKey

2017-09-12 Thread Rick Lin (JIRA)
Rick Lin created BEAM-2952:
--

 Summary: How to use KV.OrderByKey
 Key: BEAM-2952
 URL: https://issues.apache.org/jira/browse/BEAM-2952
 Project: Beam
  Issue Type: New Feature
  Components: examples-java
Reporter: Rick Lin
Assignee: Reuven Lax
 Fix For: 2.1.0


Hi all,
I have a question about how to use the Beam Java SDK class KV.OrderByKey.
My Java code is as follows:
int[] key = new int[] {2, 1, 3, 4, 5};
double[] value = new double[] {1.0, 1.0, 1.0, 1.0, 1.0};
List<KV<Integer, Double>> KVlist = new ArrayList<>();
List<KV<Integer, Double>> KVtest = new ArrayList<>();
int n = value.length;
for (int i = 0; i < n; i++) {
  KVlist.add(KV.of(key[i], value[i]));
  System.out.println(KVlist.get(i));
}
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<KV<Integer, Double>> t1 = p.apply("create data", Create.of(KVlist));
p.run();
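
For reference, a minimal sketch of one way to use it (assuming the Beam 2.x API): KV.OrderByKey is not a transform but a serializable Comparator over KV with a Comparable key, so it can be passed to transforms that accept a comparator, such as Top:

// Sketch only: picks the 3 elements with the largest keys, using
// KV.OrderByKey as the comparator. Requires
// org.apache.beam.sdk.transforms.Top and java.util.List.
PCollection<List<KV<Integer, Double>>> top3 =
    t1.apply("top 3 by key", Top.of(3, new KV.OrderByKey<Integer, Double>()));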

Thanks

Rick




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (BEAM-3073) Connect to Apache ignite via JdbcIO sdk

2017-10-18 Thread Rick Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16208867#comment-16208867
 ] 

Rick Lin edited comment on BEAM-3073 at 10/18/17 7:02 AM:
--

Here are the details of my runtime environment:

*OS:*
Distributor ID: Ubuntu
Description: Ubuntu 14.04.5 LTS
Release: 14.04
Codename: trusty
*Port:*
11211 (LISTEN)
*pom.xml:*

<dependency>
  <groupId>org.apache.ignite</groupId>
  <artifactId>ignite-core</artifactId>
  <version>${ignite.version}</version>
</dependency>


Here, I have not imported any classes from the Ignite package.
In addition, Ignite is running.
Thanks
Rick




was (Author: ricklin):
Here are the details of my runtime environment:

*OS:*
Distributor ID: Ubuntu
Description: Ubuntu 14.04.5 LTS
Release: 14.04
Codename: trusty
*Port:*
11211 (LISTEN)
*pom.xml:*

<dependency>
  <groupId>org.apache.ignite</groupId>
  <artifactId>ignite-core</artifactId>
  <version>${ignite.version}</version>
</dependency>


Here, I have not imported any classes from the Ignite package.

Thanks
Rick



> Connect to Apache ignite via JdbcIO sdk
> ---
>
> Key: BEAM-3073
> URL: https://issues.apache.org/jira/browse/BEAM-3073
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-extensions
>Reporter: Rick Lin
>Assignee: Jean-Baptiste Onofré
>Priority: Minor
>
> Hi all,
> {color:#14892c}I tried to connect to Apache Ignite (in-memory) via Beam's 
> JdbcIO SDK: org.apache.beam.sdk.io.jdbc.JdbcIO.
> Here, I am not sure whether the JdbcIO SDK is only provided for specific 
> disk-based databases such as MySQL and PostgreSQL?{color}
> My Java test code is as follows:
> import java.sql.PreparedStatement;
> import java.sql.SQLException;
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.jdbc.JdbcIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> public class BeamtoJDBC {
>   public static void main(String[] args) {
>     Integer[] value = new Integer[] {1, 2, 3, 4, 5};
>     List<KV<Integer, Integer>> dataList = new ArrayList<>();
>     int n = value.length;
>     int count = 0;
>     for (int i = 0; i < n; i++) {
>       dataList.add(KV.of(count, value[i]));
>       count = count + 1;
>     }
>     Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
>     PCollection<KV<Integer, Integer>> data = p.apply("create data with time", Create.of(dataList));
>     data.apply(JdbcIO.<KV<Integer, Integer>>write()
>         .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
>             .create("org.apache.ignite.IgniteJdbcDriver", "jdbc:ignite://localhost:11211/"))
>         .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, Integer>>() {
>           public void setParameters(KV<Integer, Integer> element, PreparedStatement query)
>               throws SQLException {
>             query.setInt(1, element.getKey());
>             query.setInt(2, element.getValue());
>           }
>         }));
>     p.run();
>   }
> }
> {color:#d04437}My error message is:
> "InvocationTargetException: org.apache.beam.sdk.util.UserCodeException: 
> java.sql.SQLException: Cannot create PoolableConnectionFactory 
> (Failed to establish connection.): Failed to get future result due to waiting 
> timed out."{color}
> {color:#14892c}I would like to know whether the connection between Beam and 
> Ignite is feasible or not?{color}
> Thanks
> Rick
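
For comparison, a minimal JdbcIO write sketch (an unverified sketch, not a confirmed fix): JdbcIO is driver-agnostic, so any database with a JDBC driver on the classpath should be reachable, but JdbcIO.write() also requires an insert statement via withStatement(), which the code above omits; the table name "kvtable" below is hypothetical.

// Sketch: the same write as above, with the required withStatement(...) added.
data.apply(JdbcIO.<KV<Integer, Integer>>write()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
        .create("org.apache.ignite.IgniteJdbcDriver", "jdbc:ignite://localhost:11211/"))
    .withStatement("insert into kvtable (key, value) values (?, ?)")
    .withPreparedStatementSetter(
        (KV<Integer, Integer> element, PreparedStatement query) -> {
          query.setInt(1, element.getKey());   // first placeholder: the key
          query.setInt(2, element.getValue()); // second placeholder: the value
        }));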



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2952) How to use KV.OrderByKey

2017-10-17 Thread Rick Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rick Lin updated BEAM-2952:
---
Priority: Minor  (was: Major)

> How to use KV.OrderByKey
> 
>
> Key: BEAM-2952
> URL: https://issues.apache.org/jira/browse/BEAM-2952
> Project: Beam
>  Issue Type: New Feature
>  Components: examples-java
>Reporter: Rick Lin
>Assignee: Reuven Lax
>Priority: Minor
> Fix For: Not applicable
>
>
> Hi all,
> I have a question about how to use the Beam Java SDK class KV.OrderByKey.
> My Java code is as follows:
> int[] key = new int[] {2, 1, 3, 4, 5};
> double[] value = new double[] {1.0, 1.0, 1.0, 1.0, 1.0};
> List<KV<Integer, Double>> KVlist = new ArrayList<>();
> List<KV<Integer, Double>> KVtest = new ArrayList<>();
> int n = value.length;
> for (int i = 0; i < n; i++) {
>   KVlist.add(KV.of(key[i], value[i]));
>   System.out.println(KVlist.get(i));
> }
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
> PCollection<KV<Integer, Double>> t1 = p.apply("create data", Create.of(KVlist));
> p.run();
> Thanks
> Rick



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-3073) Connect to Apache ignite via JdbcIO sdk

2017-10-17 Thread Rick Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rick Lin updated BEAM-3073:
---
Description: 
Hi all,
{color:#14892c}I tried to connect to Apache Ignite (in-memory) via Beam's 
JdbcIO SDK: org.apache.beam.sdk.io.jdbc.JdbcIO.
Here, I am not sure whether the JdbcIO SDK is only provided for specific 
disk-based databases such as MySQL and PostgreSQL?{color}
My Java test code is as follows:

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class BeamtoJDBC {
  public static void main(String[] args) {
    Integer[] value = new Integer[] {1, 2, 3, 4, 5};
    List<KV<Integer, Integer>> dataList = new ArrayList<>();
    int n = value.length;
    int count = 0;
    for (int i = 0; i < n; i++) {
      dataList.add(KV.of(count, value[i]));
      count = count + 1;
    }

    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

    PCollection<KV<Integer, Integer>> data = p.apply("create data with time", Create.of(dataList));

    data.apply(JdbcIO.<KV<Integer, Integer>>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
            .create("org.apache.ignite.IgniteJdbcDriver", "jdbc:ignite://localhost:11211/"))
        .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, Integer>>() {
          public void setParameters(KV<Integer, Integer> element, PreparedStatement query)
              throws SQLException {
            query.setInt(1, element.getKey());
            query.setInt(2, element.getValue());
          }
        }));

    p.run();
  }
}

{color:#d04437}My error message is:
"InvocationTargetException: org.apache.beam.sdk.util.UserCodeException: 
java.sql.SQLException: Cannot create PoolableConnectionFactory 
(Failed to establish connection.): Failed to get future result due to waiting 
timed out."{color}

{color:#14892c}I would like to know whether the connection between Beam and 
Ignite is feasible or not?{color}

Thanks

Rick


  was:
Hi all,
{color:#14892c}I tried to connect to Apache Ignite (in-memory) via Beam's 
JdbcIO SDK: org.apache.beam.sdk.io.jdbc.JdbcIO.
Here, I am not sure whether the JdbcIO SDK is only provided for specific 
disk-based databases such as MySQL and PostgreSQL?{color}
My Java test code is as follows:

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class BeamtoJDBC {
  public static void main(String[] args) {
    Integer[] value = new Integer[] {1, 2, 3, 4, 5};
    List<KV<Integer, Integer>> dataList = new ArrayList<>();
    int n = value.length;
    int count = 0;
    for (int i = 0; i < n; i++) {
      dataList.add(KV.of(count, value[i]));
      count = count + 1;
    }

    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

    PCollection<KV<Integer, Integer>> data = p.apply("create data with time", Create.of(dataList));

    data.apply(JdbcIO.<KV<Integer, Integer>>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
            .create("org.apache.ignite.IgniteJdbcDriver", "jdbc:ignite://localhost:11211/"))
        .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, Integer>>() {
          public void setParameters(KV<Integer, Integer> element, PreparedStatement query)

[jira] [Created] (BEAM-3073) Connect to Apache ignite via JdbcIO sdk

2017-10-17 Thread Rick Lin (JIRA)
Rick Lin created BEAM-3073:
--

 Summary: Connect to Apache ignite via JdbcIO sdk
 Key: BEAM-3073
 URL: https://issues.apache.org/jira/browse/BEAM-3073
 Project: Beam
  Issue Type: New Feature
  Components: sdk-java-extensions
Reporter: Rick Lin
Assignee: Reuven Lax
Priority: Minor


Hi all,
{color:#14892c}I tried to connect to Apache Ignite (in-memory) via Beam's 
JdbcIO SDK: org.apache.beam.sdk.io.jdbc.JdbcIO.
Here, I am not sure whether the JdbcIO SDK is only provided for specific 
disk-based databases such as MySQL and PostgreSQL?{color}
My Java test code is as follows:

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class BeamtoJDBC {
  public static void main(String[] args) {
    Integer[] value = new Integer[] {1, 2, 3, 4, 5};
    List<KV<Integer, Integer>> dataList = new ArrayList<>();
    int n = value.length;
    int count = 0;
    for (int i = 0; i < n; i++) {
      dataList.add(KV.of(count, value[i]));
      count = count + 1;
    }

    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

    PCollection<KV<Integer, Integer>> data = p.apply("create data with time", Create.of(dataList));

    data.apply(JdbcIO.<KV<Integer, Integer>>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
            .create("org.apache.ignite.IgniteJdbcDriver", "jdbc:ignite://localhost:11211/"))
        .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, Integer>>() {
          public void setParameters(KV<Integer, Integer> element, PreparedStatement query)
              throws SQLException {
            query.setInt(1, element.getKey());
            query.setInt(2, element.getValue());
          }
        }));

    p.run();
  }
}

{color:#d04437}My error message is:
"InvocationTargetException: org.apache.beam.sdk.util.UserCodeException: 
java.sql.SQLException: Cannot create PoolableConnectionFactory 
(Failed to establish connection.): Failed to get future result due to waiting 
timed out."{color}

{color:#14892c}I would like to know whether the connection between Beam and 
Ignite is feasible or not?{color}

Thanks

Rick




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3073) Connect to Apache ignite via JdbcIO sdk

2017-10-18 Thread Rick Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16208867#comment-16208867
 ] 

Rick Lin commented on BEAM-3073:


Here are the details of my runtime environment:

*OS:*
Distributor ID: Ubuntu
Description: Ubuntu 14.04.5 LTS
Release: 14.04
Codename: trusty
*Port:*
11211 (LISTEN)
*pom.xml:*

<dependency>
  <groupId>org.apache.ignite</groupId>
  <artifactId>ignite-core</artifactId>
  <version>${ignite.version}</version>
</dependency>


Here, I have not imported any classes from the Ignite package.

Thanks
Rick



> Connect to Apache ignite via JdbcIO sdk
> ---
>
> Key: BEAM-3073
> URL: https://issues.apache.org/jira/browse/BEAM-3073
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-extensions
>Reporter: Rick Lin
>Assignee: Jean-Baptiste Onofré
>Priority: Minor
>
> Hi all,
> {color:#14892c}I tried to connect to Apache Ignite (in-memory) via Beam's 
> JdbcIO SDK: org.apache.beam.sdk.io.jdbc.JdbcIO.
> Here, I am not sure whether the JdbcIO SDK is only provided for specific 
> disk-based databases such as MySQL and PostgreSQL?{color}
> My Java test code is as follows:
> import java.sql.PreparedStatement;
> import java.sql.SQLException;
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.jdbc.JdbcIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> public class BeamtoJDBC {
>   public static void main(String[] args) {
>     Integer[] value = new Integer[] {1, 2, 3, 4, 5};
>     List<KV<Integer, Integer>> dataList = new ArrayList<>();
>     int n = value.length;
>     int count = 0;
>     for (int i = 0; i < n; i++) {
>       dataList.add(KV.of(count, value[i]));
>       count = count + 1;
>     }
>     Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
>     PCollection<KV<Integer, Integer>> data = p.apply("create data with time", Create.of(dataList));
>     data.apply(JdbcIO.<KV<Integer, Integer>>write()
>         .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
>             .create("org.apache.ignite.IgniteJdbcDriver", "jdbc:ignite://localhost:11211/"))
>         .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, Integer>>() {
>           public void setParameters(KV<Integer, Integer> element, PreparedStatement query)
>               throws SQLException {
>             query.setInt(1, element.getKey());
>             query.setInt(2, element.getValue());
>           }
>         }));
>     p.run();
>   }
> }
> {color:#d04437}My error message is:
> "InvocationTargetException: org.apache.beam.sdk.util.UserCodeException: 
> java.sql.SQLException: Cannot create PoolableConnectionFactory 
> (Failed to establish connection.): Failed to get future result due to waiting 
> timed out."{color}
> {color:#14892c}I would like to know whether the connection between Beam and 
> Ignite is feasible or not?{color}
> Thanks
> Rick



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3210) The problem about the use of waitUntilFinish() in DirectRunner

2017-11-16 Thread Rick Lin (JIRA)
Rick Lin created BEAM-3210:
--

 Summary: The problem about the use of waitUntilFinish() in 
DirectRunner
 Key: BEAM-3210
 URL: https://issues.apache.org/jira/browse/BEAM-3210
 Project: Beam
  Issue Type: Bug
  Components: runner-direct
Affects Versions: 2.1.0
 Environment: Ubuntu 14.04.3 LTS
JDK 1.8
Beam 2.1.0
Maven 3.5.0
Reporter: Rick Lin
Assignee: Thomas Groh
 Fix For: 2.1.0


Dear sir,

The description of waitUntilFinish() is "waits until the pipeline finishes and 
returns the final status."

In my project, a static variable is used to record the PCollection contents, 
where the static variable is a data list.

For this, I used "p.run().waitUntilFinish()" to wait until the pipeline 
finishes, to avoid losing records from the data list.

Unfortunately, there is a problem: the data list {color:#d04437}*sometimes*{color} 
records a "null" value instead of the real value.

To explain clearly, I provide my Java code below.
{color:#14892c}"import java.io.IOException;
import java.util.ArrayList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
public class BeamTestStatic extends Thread {
  public static ArrayList myList = new ArrayList();

  public static class StaticTest extends DoFn {
@ProcessElement  
    public void test(ProcessContext c) {
  myList.add(c.element());  
    }   
  }
 public static void main(String[] args) throws IOException {
StaticTest testa=new StaticTest();
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
  PCollection data=p.apply("Rawdata",
Create.of(1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,));
PCollection listtest= data.apply(ParDo.of(testa));
  p.run().waitUntilFinish();

  System.out.println("mylist_size_a="+myList.size());
 
for (int i = 0; i < myList.size(); i++) {
System.out.println("mylist_data="+myList.get(i));
}
"{color}
The result of running my code is:
{color:#205081}"mylist_size_a=10
mylist_data=null
mylist_data=4.0
mylist_data=5.0
mylist_data=9.0
mylist_data=6.0
mylist_data=1.0
mylist_data=7.0
mylist_data=8.0
mylist_data=10.0
mylist_data=3.0"{color}
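
A note on a likely cause (an assumption, not verified against the report): the direct runner processes bundles on several threads, and java.util.ArrayList is not thread-safe, so concurrent add() calls can corrupt its backing array and leave null slots. A minimal sketch of a thread-safe variant of the DoFn:

// Sketch: wrap the shared static list so parallel bundles cannot
// interleave inside add(). Needs java.util.Collections and
// java.util.List in addition to the imports shown above.
public static class SafeStaticTest extends DoFn<Double, Double> {
  public static final List<Double> myList =
      Collections.synchronizedList(new ArrayList<Double>());

  @ProcessElement
  public void test(ProcessContext c) {
    myList.add(c.element());  // serialized by the synchronized wrapper
  }
}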

If any further information is needed, I am glad to provide it.

Thanks

Rick




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-4631) kafkIO should run the streaming mode over spark runner

2018-06-24 Thread Rick Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rick Lin updated BEAM-4631:
---
Summary: kafkIO should run the streaming mode over spark runner  (was: 
kafkIO should be the streaming mode over spark runner)

> kafkIO should run the streaming mode over spark runner
> --
>
> Key: BEAM-4631
> URL: https://issues.apache.org/jira/browse/BEAM-4631
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Raghu Angadi
>Priority: Major
> Fix For: 2.4.0
>
>
> Dear sir,
> The following versions of related tools are set in my running program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my 
> github: [https://github.com/LinRick/beamkafkaIO].
> The description of my situation is as follows:
> {color:#14892c}The kafka broker is working, and kafkaIO.read (the consumer) is 
> used to capture data from the assigned broker ip (http://ubuntu7:9092).{color}
> {color:#14892c}The user manual of the kafkaIO SDK (on the web: 
> [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that 
> the following parameters need to be set, and then kafkaIO can work well.{color}
> {color:#FF0000}.withBootstrapServers("kafka broker ip:9092"){color}
> {color:#FF0000}.withTopic("kafkasink"){color}
> {color:#FF0000}.withKeyDeserializer(IntegerDeserializer.class){color}
> {color:#FF0000}.withValueDeserializer(StringDeserializer.class){color}
> When I run my program with these settings on the direct runner, it performs 
> well and runs in streaming mode. *However, when I run the same code with the 
> same kafkaIO settings on the spark runner, my program does not run in 
> streaming mode and is shut down.* As mentioned on the website 
> [https://beam.apache.org/documentation/runners/spark/], the program should 
> automatically be set to streaming mode; unfortunately, that failed for my 
> program.
> On the other hand, if I set the parameter kafkaIO.read.withMaxNumRecords(1000) 
> or kafkaIO.read.withMaxReadTime(Duration), my program executes successfully 
> in batch mode (batch processing).
> The steps for running StarterPipeline.java in my program are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"
> step2 mvn clean package
> step3 cp -rf target/beamkafkaIO-0.1.jar /root/
> step4 cd /spark-2.2.1-bin-hadoop2.6/bin
> step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] /root/beamkafkaIO-0.1.jar --runner=SparkRunner
> I am not sure whether this issue is a bug in kafkaIO or whether I got some 
> parameter settings wrong on the spark runner.
> I really can't handle it, so I hope to get help from you.
> If any further information is needed, I am glad to provide it as soon as 
> possible.
> I will highly appreciate it if you can help me deal with this issue.
> I am looking forward to hearing from you.
>  
> Sincerely yours,
>  
> Rick
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-4632) kafkIO should be the streaming mode over spark runner

2018-06-23 Thread Rick Lin (JIRA)
Rick Lin created BEAM-4632:
--

 Summary: kafkIO should be the streaming mode over spark runner
 Key: BEAM-4632
 URL: https://issues.apache.org/jira/browse/BEAM-4632
 Project: Beam
  Issue Type: Bug
  Components: io-java-kafka, runner-spark
Affects Versions: 2.4.0
 Environment: Ubuntu 16.04.4 LTS
Reporter: Rick Lin
Assignee: Raghu Angadi
 Fix For: 2.4.0


Dear sir,

The following versions of related tools are set in my running program:

==

Beam 2.4.0 (Direct runner and Spark runner)

Spark 2.2.1 (local mode and standalone mode)

Kafka: 2.11-0.10.1.1

scala: 2.11.8

java: 1.8

==

My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my 
github: [https://github.com/LinRick/beamkafkaIO].

The description of my situation is as follows:

{color:#14892c}The kafka broker is working, and kafkaIO.read (the consumer) is 
used to capture data from the assigned broker ip (http://ubuntu7:9092).{color}

{color:#14892c}The user manual of the kafkaIO SDK (on the web: 
[https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that 
the following parameters need to be set, and then kafkaIO can work well.{color}

{color:#FF0000}.withBootstrapServers("kafka broker ip:9092"){color}
{color:#FF0000}.withTopic("kafkasink"){color}
{color:#FF0000}.withKeyDeserializer(IntegerDeserializer.class){color}
{color:#FF0000}.withValueDeserializer(StringDeserializer.class){color}

When I run my program with these settings on the direct runner, it performs 
well and runs in streaming mode. *However, when I run the same code with the 
same kafkaIO settings on the spark runner, my program does not run in 
streaming mode and is shut down.* As mentioned on the website 
[https://beam.apache.org/documentation/runners/spark/], the program should 
automatically be set to streaming mode; unfortunately, that failed for my 
program.

On the other hand, if I set the parameter kafkaIO.read.withMaxNumRecords(1000) 
or kafkaIO.read.withMaxReadTime(Duration), my program executes successfully 
in batch mode (batch processing).

The steps for running StarterPipeline.java in my program are:

step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"
step2 mvn clean package
step3 cp -rf target/beamkafkaIO-0.1.jar /root/
step4 cd /spark-2.2.1-bin-hadoop2.6/bin
step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] /root/beamkafkaIO-0.1.jar --runner=SparkRunner

I am not sure whether this issue is a bug in kafkaIO or whether I got some 
parameter settings wrong on the spark runner.

I really can't handle it, so I hope to get help from you.

If any further information is needed, I am glad to provide it as soon as 
possible.

I will highly appreciate it if you can help me deal with this issue.

I am looking forward to hearing from you.
 
Sincerely yours,
 
Rick
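
A note on the batch/streaming switch (the sketch below is an illustration under stated assumptions, not a verified fix): withMaxNumRecords and withMaxReadTime convert KafkaIO's unbounded source into a bounded one, which is why the pipeline then runs as batch; leaving both out keeps the source unbounded, which is what a runner needs to enter streaming mode.

// Sketch (Beam 2.4.0 API): an unbounded KafkaIO read; no
// withMaxNumRecords/withMaxReadTime, so the source stays unbounded.
PCollection<KV<Integer, String>> stream = p.apply(
    KafkaIO.<Integer, String>read()
        .withBootstrapServers("ubuntu7:9092")
        .withTopic("kafkasink")
        .withKeyDeserializer(IntegerDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata());  // drop Kafka metadata, keep the KV pairs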

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-4631) kafkIO should be the streaming mode over spark runner

2018-06-23 Thread Rick Lin (JIRA)
Rick Lin created BEAM-4631:
--

 Summary: kafkIO should be the streaming mode over spark runner
 Key: BEAM-4631
 URL: https://issues.apache.org/jira/browse/BEAM-4631
 Project: Beam
  Issue Type: Bug
  Components: io-java-kafka, runner-spark
Affects Versions: 2.4.0
 Environment: Ubuntu 16.04.4 LTS
Reporter: Rick Lin
Assignee: Raghu Angadi
 Fix For: 2.4.0


Dear sir,

The following versions of related tools are set in my running program:

==

Beam 2.4.0 (Direct runner and Spark runner)

Spark 2.2.1 (local mode and standalone mode)

Kafka: 2.11-0.10.1.1

scala: 2.11.8

java: 1.8

==

My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my 
github: [https://github.com/LinRick/beamkafkaIO].

The description of my situation is as follows:

{color:#14892c}The kafka broker is working, and kafkaIO.read (the consumer) is 
used to capture data from the assigned broker ip (http://ubuntu7:9092).{color}

{color:#14892c}The user manual of the kafkaIO SDK (on the web: 
[https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that 
the following parameters need to be set, and then kafkaIO can work well.{color}

{color:#FF0000}.withBootstrapServers("kafka broker ip:9092"){color}
{color:#FF0000}.withTopic("kafkasink"){color}
{color:#FF0000}.withKeyDeserializer(IntegerDeserializer.class){color}
{color:#FF0000}.withValueDeserializer(StringDeserializer.class){color}

When I run my program with these settings on the direct runner, it performs 
well and runs in streaming mode. *However, when I run the same code with the 
same kafkaIO settings on the spark runner, my program does not run in 
streaming mode and is shut down.* As mentioned on the website 
[https://beam.apache.org/documentation/runners/spark/], the program should 
automatically be set to streaming mode; unfortunately, that failed for my 
program.

On the other hand, if I set the parameter kafkaIO.read.withMaxNumRecords(1000) 
or kafkaIO.read.withMaxReadTime(Duration), my program executes successfully 
in batch mode (batch processing).

The steps for running StarterPipeline.java in my program are:

step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"
step2 mvn clean package
step3 cp -rf target/beamkafkaIO-0.1.jar /root/
step4 cd /spark-2.2.1-bin-hadoop2.6/bin
step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] /root/beamkafkaIO-0.1.jar --runner=SparkRunner

I am not sure whether this issue is a bug in kafkaIO or whether I got some 
parameter settings wrong on the spark runner.

I really can't handle it, so I hope to get help from you.

If any further information is needed, I am glad to provide it as soon as 
possible.

I will highly appreciate it if you can help me deal with this issue.

I am looking forward to hearing from you.
 
Sincerely yours,
 
Rick

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner

2018-06-26 Thread Rick Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rick Lin updated BEAM-4632:
---
Attachment: the error GeneratedMessageV3.JPG

> KafkaIO seems to fail on streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Alexey Romanenko
>Priority: Major
> Attachments: the error GeneratedMessageV3.JPG
>
>
> Dear sir,
> The following versions of related tools are set in my running program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my 
> github: [https://github.com/LinRick/beamkafkaIO].
> The description of my situation is as follows:
> {color:#14892c}The kafka broker is working, and kafkaIO.read (the consumer) is 
> used to capture data from the assigned broker ip (http://ubuntu7:9092).{color}
> {color:#14892c}The user manual of the kafkaIO SDK (on the web: 
> [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that 
> the following parameters need to be set, and then kafkaIO can work well.{color}
> {color:#FF0000}.withBootstrapServers("kafka broker ip:9092"){color}
> {color:#FF0000}.withTopic("kafkasink"){color}
> {color:#FF0000}.withKeyDeserializer(IntegerDeserializer.class){color}
> {color:#FF0000}.withValueDeserializer(StringDeserializer.class){color}
> When I run my program with these settings on the direct runner, it performs 
> well and runs in streaming mode. *However, when I run the same code with the 
> same kafkaIO settings on the spark runner, my program does not run in 
> streaming mode and is shut down.* As mentioned on the website 
> [https://beam.apache.org/documentation/runners/spark/], the program should 
> automatically be set to streaming mode; unfortunately, that failed for my 
> program.
> On the other hand, if I set the parameter kafkaIO.read.withMaxNumRecords(1000) 
> or kafkaIO.read.withMaxReadTime(Duration), my program executes successfully 
> in batch mode (batch processing).
> The steps for running StarterPipeline.java in my program are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"
> step2 mvn clean package
> step3 cp -rf target/beamkafkaIO-0.1.jar /root/
> step4 cd /spark-2.2.1-bin-hadoop2.6/bin
> step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] /root/beamkafkaIO-0.1.jar --runner=SparkRunner
> I am not sure whether this issue is a bug in kafkaIO or whether I got some 
> parameter settings wrong on the spark runner.
> I really can't handle it, so I hope to get help from you.
> If any further information is needed, I am glad to provide it as soon as 
> possible.
> I will highly appreciate it if you can help me deal with this issue.
> I am looking forward to hearing from you.
>  
> Sincerely yours,
>  
> Rick
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner

2018-06-26 Thread Rick Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rick Lin updated BEAM-4632:
---
Attachment: DB_table_kafkabeamdata_count.JPG

> KafkaIO seems to fail on streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Alexey Romanenko
>Priority: Major
> Attachments: DB_table_kafkabeamdata_count.JPG, the error 
> GeneratedMessageV3.JPG, the error GeneratedMessageV3.JPG
>
>
> Dear sir,
> The following versions of related tools are set in my running program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my 
> github: [https://github.com/LinRick/beamkafkaIO].
> The description of my situation is as follows:
> {color:#14892c}The kafka broker is working, and kafkaIO.read (the consumer) is 
> used to capture data from the assigned broker ip (http://ubuntu7:9092).{color}
> {color:#14892c}The user manual of the kafkaIO SDK (on the web: 
> [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that 
> the following parameters need to be set, and then kafkaIO can work well.{color}
> {color:#FF0000}.withBootstrapServers("kafka broker ip:9092"){color}
> {color:#FF0000}.withTopic("kafkasink"){color}
> {color:#FF0000}.withKeyDeserializer(IntegerDeserializer.class){color}
> {color:#FF0000}.withValueDeserializer(StringDeserializer.class){color}
> When I run my program with these settings on the direct runner, it performs 
> well and runs in streaming mode. *However, when I run the same code with the 
> same kafkaIO settings on the spark runner, my program does not run in 
> streaming mode and is shut down.* As mentioned on the website 
> [https://beam.apache.org/documentation/runners/spark/], the program should 
> automatically be set to streaming mode; unfortunately, that failed for my 
> program.
> On the other hand, if I set the parameter kafkaIO.read.withMaxNumRecords(1000) 
> or kafkaIO.read.withMaxReadTime(Duration), my program executes successfully 
> in batch mode (batch processing).
> The steps for running StarterPipeline.java in my program are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"
> step2 mvn clean package
> step3 cp -rf target/beamkafkaIO-0.1.jar /root/
> step4 cd /spark-2.2.1-bin-hadoop2.6/bin
> step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] /root/beamkafkaIO-0.1.jar --runner=SparkRunner
> I am not sure whether this issue is a bug in kafkaIO or whether I got some 
> parameter settings wrong on the spark runner.
> I really can't handle it, so I hope to get help from you.
> If any further information is needed, I am glad to provide it as soon as 
> possible.
> I will highly appreciate it if you can help me deal with this issue.
> I am looking forward to hearing from you.
>  
> Sincerely yours,
>  
> Rick
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner

2018-06-26 Thread Rick Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rick Lin updated BEAM-4632:
---
Attachment: the error GeneratedMessageV3.JPG

> KafkaIO seems to fail on streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Alexey Romanenko
>Priority: Major
> Attachments: the error GeneratedMessageV3.JPG, the error 
> GeneratedMessageV3.JPG
>
>
> Dear sir,
> The following versions of related tools are set in my running program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my 
> github: [https://github.com/LinRick/beamkafkaIO].
> The description of my situation is as follows:
> {color:#14892c}The kafka broker is working, and kafkaIO.read (the consumer) is 
> used to capture data from the assigned broker ip (http://ubuntu7:9092).{color}
> {color:#14892c}The user manual of the kafkaIO SDK (on the web: 
> [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that 
> the following parameters need to be set, and then kafkaIO can work well.{color}
> {color:#FF0000}.withBootstrapServers("kafka broker ip:9092"){color}
> {color:#FF0000}.withTopic("kafkasink"){color}
> {color:#FF0000}.withKeyDeserializer(IntegerDeserializer.class){color}
> {color:#FF0000}.withValueDeserializer(StringDeserializer.class){color}
> When I run my program with these settings on the direct runner, it performs 
> well and runs in streaming mode. *However, when I run the same code with the 
> same kafkaIO settings on the spark runner, my program does not run in 
> streaming mode and is shut down.* As mentioned on the website 
> [https://beam.apache.org/documentation/runners/spark/], the program should 
> automatically be set to streaming mode; unfortunately, that failed for my 
> program.
> On the other hand, if I set the parameter kafkaIO.read.withMaxNumRecords(1000) 
> or kafkaIO.read.withMaxReadTime(Duration), my program executes successfully 
> in batch mode (batch processing).
> The steps for running StarterPipeline.java in my program are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"
> step2 mvn clean package
> step3 cp -rf target/beamkafkaIO-0.1.jar /root/
> step4 cd /spark-2.2.1-bin-hadoop2.6/bin
> step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] /root/beamkafkaIO-0.1.jar --runner=SparkRunner
> I am not sure whether this issue is a bug in kafkaIO or whether I got some 
> parameter settings wrong on the spark runner.
> I really can't handle it, so I hope to get help from you.
> If any further information is needed, I am glad to provide it as soon as 
> possible.
> I will highly appreciate it if you can help me deal with this issue.
> I am looking forward to hearing from you.
>  
> Sincerely yours,
>  
> Rick
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner

2018-06-26 Thread Rick Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rick Lin updated BEAM-4632:
---
Attachment: .withMaxNumRecords(50).JPG

> KafkaIO seems to fail on streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Alexey Romanenko
>Priority: Major
> Attachments: .withMaxNumRecords(50).JPG, 
> DB_table_kafkabeamdata_count.JPG, the error GeneratedMessageV3.JPG, the error 
> GeneratedMessageV3.JPG
>
>
> Dear sir,
> The following versions of related tools are set in my running program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my 
> github: [https://github.com/LinRick/beamkafkaIO].
> The description of my situation is as follows:
> {color:#14892c}The kafka broker is working, and kafkaIO.read (the consumer) is 
> used to capture data from the assigned broker ip (http://ubuntu7:9092).{color}
> {color:#14892c}The user manual of the kafkaIO SDK (on the web: 
> [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that 
> the following parameters need to be set, and then kafkaIO can work well.{color}
> {color:#FF0000}.withBootstrapServers("kafka broker ip:9092"){color}
> {color:#FF0000}.withTopic("kafkasink"){color}
> {color:#FF0000}.withKeyDeserializer(IntegerDeserializer.class){color}
> {color:#FF0000}.withValueDeserializer(StringDeserializer.class){color}
> When I run my program with these settings on the direct runner, it performs 
> well and runs in streaming mode. *However, when I run the same code with the 
> same kafkaIO settings on the spark runner, my program does not run in 
> streaming mode and is shut down.* As mentioned on the website 
> [https://beam.apache.org/documentation/runners/spark/], the program should 
> automatically be set to streaming mode; unfortunately, that failed for my 
> program.
> On the other hand, if I set the parameter kafkaIO.read.withMaxNumRecords(1000) 
> or kafkaIO.read.withMaxReadTime(Duration), my program executes successfully 
> in batch mode (batch processing).
> The steps for running StarterPipeline.java in my program are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"
> step2 mvn clean package
> step3 cp -rf target/beamkafkaIO-0.1.jar /root/
> step4 cd /spark-2.2.1-bin-hadoop2.6/bin
> step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] /root/beamkafkaIO-0.1.jar --runner=SparkRunner
> I am not sure whether this issue is a bug in kafkaIO or whether I got some 
> parameter settings wrong on the spark runner.
> I really can't handle it, so I hope to get help from you.
> If any further information is needed, I am glad to provide it as soon as 
> possible.
> I will highly appreciate it if you can help me deal with this issue.
> I am looking forward to hearing from you.
>  
> Sincerely yours,
>  
> Rick
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner

2018-06-26 Thread Rick Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524523#comment-16524523
 ] 

Rick Lin commented on BEAM-4632:


Dear all,

Thanks very much for your attention to this problem.

 

Hi [~aromanenko],

Yes, this project's *beam.version* has been changed *from 2.5.0-SNAPSHOT to 
2.4.0* and *spark2.jackson.version from 2.9.5 to 2.8.9* to run my project 
(updated on my Github).

When running my pipeline with p.run().waitUntilFinish() in the 
StarterPipeline.java program, {color:#d04437}*KafkaIO can run in streaming 
mode (spark runner with local[4])*.{color}

The output of my pipeline writes the count of the data into PostgreSQL 
(raw_c42a25f4bd3d74429dbeb6162e60e5c7/kafkabeamdata) each second, as follows:
{quote}{color:#205081}countData.apply(JdbcIO.<KV<Integer, Long>>write()
  .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
      "org.postgresql.Driver",
      "jdbc:postgresql://ubuntu7:5432/raw_c42a25f4bd3d74429dbeb6162e60e5c7")
    .withUsername("postgres")
    .withPassword("postgres"))
  .withStatement("insert into kafkabeamdata (count) values(?)")
  .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, Long>>() {
    @Override
    public void setParameters(KV<Integer, Long> element, PreparedStatement query)
        throws SQLException {
      double count = element.getValue().doubleValue();
      query.setDouble(1, count);
    }
  }));{color}
{quote}
The following figure shows the counts that were written into the DB:

!DB_table_kafkabeamdata_count.JPG!

In the above table, we can see many zero values (count), which means there is 
no data in most windows with the applied window/triggering/watermark settings:

".apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
 .triggering(AfterWatermark.pastEndOfWindow()
 .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
 .withAllowedLateness(Duration.ZERO)
 .discardingFiredPanes())".

In this project, I hope that the counted data can be roughly equal to the 
amount of data generated by the kafka producer. For example, the figure shows 
the result when using the setting "kafkaIO.Read.withMaxNumRecords(50)" in the 
pipeline:

!.withMaxNumRecords(50).JPG!

{color:#d04437}The table shows that there is an expected quantity of streaming 
data in each window.{color}

{color:#d04437}If I would like to achieve this in streaming mode, what related 
settings should I use for the spark runner (standalone mode), the spark 
pipeline (MaxRecordsPerBatch), and kafkaIO.Read?{color}
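
For reference, a sketch of where such settings could live (assuming the SparkPipelineOptions setters already used in this comment plus the batch-interval option; an illustration, not a verified tuning recipe):

// Sketch: configure the Spark runner for standalone mode and bound the
// size of each micro-batch.
SparkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("spark://ubuntu8:7077"); // standalone master
options.setBatchIntervalMillis(1000L);          // one micro-batch per second
options.setMaxRecordsPerBatch(1000L);           // cap records per micro-batch
Pipeline p = Pipeline.create(options);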

 

On the other hand, there is another error:

"Caused by: java.lang.ClassNotFoundException: 
com.google.protobuf.GeneratedMessageV3"

as shown in the attachment:

!the error GeneratedMessageV3.JPG|width=1156,height=249!

To deal with this error, I have added the required dependency (added on my 
Github):

{color:#14892c}<protobuf-java.version>3.4.0</protobuf-java.version>

<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>${protobuf-java.version}</version>
</dependency>{color}

Rick

> KafkaIO seems to fail on streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Alexey Romanenko
>Priority: Major
> Attachments: .withMaxNumRecords(50).JPG, 
> DB_table_kafkabeamdata_count.JPG, the error GeneratedMessageV3.JPG, the error 
> GeneratedMessageV3.JPG
>
>
> Dear sir,
> The following versions of related tools are set in my running program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my 
> github: [https://github.com/LinRick/beamkafkaIO].
> The description of my situation is as follows:
> {color:#14892c}The kafka broker is working, and kafkaIO.read (the consumer) is 
> used to capture data from the assigned broker ip (http://ubuntu7:9092).{color}
> {color:#14892c}The user manual of the kafkaIO SDK (on the web: 
> [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates 
> that the following parameters need to be set, and then 

[jira] [Comment Edited] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner

2018-06-27 Thread Rick Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524523#comment-16524523
 ] 

Rick Lin edited comment on BEAM-4632 at 6/27/18 6:25 AM:
-

Dear all,

Thanks very much for your attention to this problem.

 

Hi [~aromanenko],

Yes, this project's *beam.version* has been changed *from 2.5.0-SNAPSHOT to 
2.4.0* and *spark2.jackson.version from 2.9.5 to 2.8.9* to run my project 
(updated on my Github).

When running my pipeline with p.run().waitUntilFinish() in the 
StarterPipeline.java program, {color:#d04437}*KafkaIO can run in streaming 
mode (spark runner with local[4])*.{color}

The output of my pipeline writes the count of the data into PostgreSQL 
(raw_c42a25f4bd3d74429dbeb6162e60e5c7/kafkabeamdata) each second, as follows:
{quote}{color:#205081}countData.apply(JdbcIO.<KV<Integer, Long>>write()
  .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
      "org.postgresql.Driver",
      "jdbc:postgresql://ubuntu7:5432/raw_c42a25f4bd3d74429dbeb6162e60e5c7")
    .withUsername("postgres")
    .withPassword("postgres"))
  .withStatement("insert into kafkabeamdata (count) values(?)")
  .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, Long>>() {
    @Override
    public void setParameters(KV<Integer, Long> element, PreparedStatement query)
        throws SQLException {
      double count = element.getValue().doubleValue();
      query.setDouble(1, count);
    }
  }));{color}
{quote}
The following figure shows the counts that were written into the DB:

!DB_table_kafkabeamdata_count.JPG!

In the above table, we can see many zero values (count), which means there is 
no data in most windows with the applied window/triggering/watermark settings:
{quote}".apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
 .triggering(AfterWatermark.pastEndOfWindow()
 .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
 .withAllowedLateness(Duration.ZERO)
 .discardingFiredPanes())".
{quote}
In this project, I hope that the counted data can be roughly equal to the 
amount of data generated by the kafka producer. For example, the figure shows 
the result when using the setting "kafkaIO.Read.withMaxNumRecords(50)" in the 
pipeline:

!.withMaxNumRecords(50).JPG!

{color:#d04437}The table shows that there is an expected quantity of streaming 
data in each window.{color}

{color:#d04437}If I would like to achieve this in streaming mode, what related 
settings should I use for the spark runner (standalone mode), the spark 
pipeline (MaxRecordsPerBatch), and kafkaIO.Read?{color}

 

On the other hand, there is another error:

"Caused by: java.lang.ClassNotFoundException: 
com.google.protobuf.GeneratedMessageV3"

as shown in the attachment:

!the error GeneratedMessageV3.JPG|width=1156,height=249!

To deal with this error, I have added the required dependency (added on my 
Github):

{color:#14892c}<protobuf-java.version>3.4.0</protobuf-java.version>

<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>${protobuf-java.version}</version>
</dependency>{color}

Rick


was (Author: ricklin):
Dear all,

Thanks very much for your attention to this problem.

 

Hi [~aromanenko],

Yes, this project's *beam.version* has been changed *from 2.5.0-SNAPSHOT to 
2.4.0* and *spark2.jackson.version from 2.9.5 to 2.8.9* to run my project 
(updated on my Github).

When running my pipeline with p.run().waitUntilFinish() in the 
StarterPipeline.java program, {color:#d04437}*KafkaIO can run in streaming 
mode (spark runner with local[4])*.{color}

The output of my pipeline writes the count of the data into PostgreSQL 
(raw_c42a25f4bd3d74429dbeb6162e60e5c7/kafkabeamdata) each second, as follows:
{quote}{color:#205081}countData.apply(JdbcIO.<KV<Integer, Long>>write()
  .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
      "org.postgresql.Driver",
      "jdbc:postgresql://ubuntu7:5432/raw_c42a25f4bd3d74429dbeb6162e60e5c7")
    .withUsername("postgres")
    .withPassword("postgres"))
  .withStatement("insert into kafkabeamdata (count) values(?)")
  .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, Long>>() {
    @Override
    public void setParameters(KV<Integer, Long> element, PreparedStatement query)
        throws SQLException {
      double count = element.getValue().doubleValue();{color}

[jira] [Commented] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner

2018-06-27 Thread Rick Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524674#comment-16524674
 ] 

Rick Lin commented on BEAM-4632:


Dear [~aromanenko],

I have tried running my project with the Spark runner (standalone mode) to 
{color:#d04437}capture more data from kafka{color} into each window, where the 
settings of the driver/master/worker nodes are:

Driver node (ubuntu8):

spark-defaults.conf
{quote}spark.driver.memory 10g
spark.executor.memory 2g
spark.executor.instances 4
{quote}
master node (ubuntu8):
{quote}export SPARK_MASTER_IP="ubuntu8"
export SPARK_MASTER_WEBUI_PORT=8082
{quote}
The settings of the two worker nodes (ubuntu8 and ubuntu9) are the same as on 
the master node.
||ExecutorID||Worker||Cores||Memory||State||Logs||
|1|[worker-20180627140534-ubuntu9-39922|http://10.236.1.9:8081/]|4|2048|KILLED|[stdout|http://10.236.1.9:8081/logPage?appId=app-20180627141506-0002=1=stdout]
 
[stderr|http://10.236.1.9:8081/logPage?appId=app-20180627141506-0002=1=stderr]|
|0|[worker-20180627141225-ubuntu8-33234|http://10.236.1.8:8081/]|4|2048|KILLED|[stdout|http://10.236.1.8:8081/logPage?appId=app-20180627141506-0002=0=stdout]
 
[stderr|http://10.236.1.8:8081/logPage?appId=app-20180627141506-0002=0=stderr]|

 

In addition, my pipeline is set as:
{quote}Pipeline p = Pipeline.create(options);
 {color:#d04437}options.setMaxRecordsPerBatch(1000L);{color}
 {color:#d04437}options.setSparkMaster("spark://ubuntu8:7077");{color}

...

PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
 .withBootstrapServers("ubuntu7:9092")
 .withTopic("kafkasink")
 .withKeyDeserializer(IntegerDeserializer.class)
 .withValueDeserializer(StringDeserializer.class) 
 .withoutMetadata());

...

p.run().waitUntilFinish();
{quote}
and I use the following command line to run the project:

./spark-submit --class com.itri.beam.StarterPipeline --master 
spark://ubuntu8:7077 /root/beamkafkaIO-0.1.jar --runner=SparkRunner

After a while, my program breaks, with the error shown in the attachment 
(error UnboundedDataset.java 81(0) has different number of partitions.JPG):

"UnboundedDataset.java:81(0) has different number of partitions from original 
RDD MapPartitionsRDD[698] at updateStateByKey at 
SparkGroupAlsoByWindowViaWindowSet.java:612(2)"

Even when using only one worker node (ubuntu9), a similar error still appears.

 

Best,

Rick

 

 

> KafkaIO seems to fail on streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Alexey Romanenko
>Priority: Major
> Attachments: .withMaxNumRecords(50).JPG, 
> DB_table_kafkabeamdata_count.JPG, error UnboundedDataset.java 81(0) has 
> different number of partitions.JPG, the error GeneratedMessageV3.JPG, the 
> error GeneratedMessageV3.JPG
>
>
> Dear sir,
> The following versions of related tools are set in my running program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my 
> github: [https://github.com/LinRick/beamkafkaIO].
> The description of my situation is as follows:
> {color:#14892c}The kafka broker is working, and kafkaIO.read (the consumer) is 
> used to capture data from the assigned broker ip (http://ubuntu7:9092).{color}
> {color:#14892c}The user manual of the kafkaIO SDK (on the web: 
> [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that 
> the following parameters need to be set, and then kafkaIO can work well.{color}
> {color:#FF0000}.withBootstrapServers("kafka broker ip:9092"){color}
> {color:#FF0000}.withTopic("kafkasink"){color}
> {color:#FF0000}.withKeyDeserializer(IntegerDeserializer.class){color}
> {color:#FF0000}.withValueDeserializer(StringDeserializer.class){color}
> When I run my program with these settings on the direct runner, it performs 
> well and runs in streaming mode. *However, when I run the same code with the 
> same kafkaIO settings on the spark runner, my program does not run in 
> streaming mode and is shut down.* As mentioned on the website 
> [https://beam.apache.org/documentation/runners/spark/], the program should 
> automatically be set to streaming mode; unfortunately, that failed for my 
> program.
> On the other hand, if I set the parameter kafkaIO.read.withMaxNumRecords(1000) 
> or kafkaIO.read.withMaxReadTime (Duration 

[jira] [Updated] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner

2018-06-27 Thread Rick Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rick Lin updated BEAM-4632:
---
Attachment: error UnboundedDataset.java 81(0) has different number of 
partitions.JPG

> KafkaIO seems to fail on streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Alexey Romanenko
>Priority: Major
> Attachments: .withMaxNumRecords(50).JPG, 
> DB_table_kafkabeamdata_count.JPG, error UnboundedDataset.java 81(0) has 
> different number of partitions.JPG, the error GeneratedMessageV3.JPG, the 
> error GeneratedMessageV3.JPG
>
>
> Dear sir,
> The following versions of related tools are set in my running program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my 
> github: [https://github.com/LinRick/beamkafkaIO].
> The description of my situation is as follows:
> {color:#14892c}The kafka broker is working, and kafkaIO.read (the consumer) is 
> used to capture data from the assigned broker ip (http://ubuntu7:9092).{color}
> {color:#14892c}The user manual of the kafkaIO SDK (on the web: 
> [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that 
> the following parameters need to be set, and then kafkaIO can work well.{color}
> {color:#FF0000}.withBootstrapServers("kafka broker ip:9092"){color}
> {color:#FF0000}.withTopic("kafkasink"){color}
> {color:#FF0000}.withKeyDeserializer(IntegerDeserializer.class){color}
> {color:#FF0000}.withValueDeserializer(StringDeserializer.class){color}
> When I run my program with these settings on the direct runner, it performs 
> well and runs in streaming mode. *However, when I run the same code with the 
> same kafkaIO settings on the spark runner, my program does not run in 
> streaming mode and is shut down.* As mentioned on the website 
> [https://beam.apache.org/documentation/runners/spark/], the program should 
> automatically be set to streaming mode; unfortunately, that failed for my 
> program.
> On the other hand, if I set the parameter kafkaIO.read.withMaxNumRecords(1000) 
> or kafkaIO.read.withMaxReadTime(Duration), my program executes successfully 
> in batch mode (batch processing).
> The steps for running StarterPipeline.java in my program are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"
> step2 mvn clean package
> step3 cp -rf target/beamkafkaIO-0.1.jar /root/
> step4 cd /spark-2.2.1-bin-hadoop2.6/bin
> step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] /root/beamkafkaIO-0.1.jar --runner=SparkRunner
> I am not sure whether this issue is a bug in kafkaIO or whether I got some 
> parameter settings wrong on the spark runner.
> I really can't handle it, so I hope to get help from you.
> If any further information is needed, I am glad to provide it as soon as 
> possible.
> I will highly appreciate it if you can help me deal with this issue.
> I am looking forward to hearing from you.
>  
> Sincerely yours,
>  
> Rick
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner

2018-06-27 Thread Rick Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524674#comment-16524674
 ] 

Rick Lin edited comment on BEAM-4632 at 6/27/18 7:04 AM:
-

Dear [~aromanenko],

I have tried running my project with the Spark runner (standalone mode) to
{color:#d04437}capture more data from kafka{color} into each window. The
settings of the driver/master/worker nodes are:

Driver node (ubuntu8):

spark-defaults.conf
{quote}spark.driver.memory 10g
 spark.executor.memory 2g
 spark.executor.instances 4
{quote}
master node (ubuntu8):
{quote}export SPARK_MASTER_IP="ubuntu8"
 export SPARK_MASTER_WEBUI_PORT=8082
{quote}
The settings of the two worker nodes (ubuntu8 and ubuntu9) are the same as
the master node's.
||ExecutorID||Worker||Cores||Memory||State||Logs||
|1|ubuntu9|4|2048|KILLED| |
|0|ubuntu8|4|2048|KILLED| |

 

In addition, my pipeline is set as:
{quote}Pipeline p = Pipeline.create(options);
 {color:#d04437}options.setMaxRecordsPerBatch(1000L);{color}
 {color:#d04437}options.setSparkMaster("spark://ubuntu8:7077");{color}

...

PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
 .withBootstrapServers("ubuntu7:9092")
 .withTopic("kafkasink")
 .withKeyDeserializer(IntegerDeserializer.class)
 .withValueDeserializer(StringDeserializer.class) 
 .withoutMetadata());

...

p.run().waitUntilFinish();
{quote}
and I use the following command line to run the project:

./spark-submit --class com.itri.beam.StarterPipeline --master 
spark://ubuntu8:7077 /root/beamkafkaIO-0.1.jar --runner=SparkRunner

After a while, my program breaks with the error shown in the attachment
(error UnboundedDataset.java 81(0) has different number of partitions.JPG):

"UnboundedDataset.java:81(0) has different number of partitions from original 
RDD MapPartitionsRDD[698] at updateStateByKey at 
SparkGroupAlsoByWindowViaWindowSet.java:612(2)"

Even when using only one worker node (ubuntu9), a similar error still appears.
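
(Editorial aside, not part of the original comment: one knob worth pinning down for this partition-mismatch error is the Spark runner's micro-batching and checkpointing configuration. A minimal sketch, assuming Beam 2.4's SparkPipelineOptions; the setters below exist on that interface, but the checkpoint directory path is hypothetical.)
{code:java}
// Sketch only: fix the micro-batch interval and checkpoint location up front,
// before the pipeline is created, so the restored streaming graph is built
// from the same configuration.
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("spark://ubuntu8:7077");
options.setMaxRecordsPerBatch(1000L);
options.setBatchIntervalMillis(1000L);                           // fixed micro-batch interval
options.setCheckpointDir("hdfs://ubuntu8:9000/beam-checkpoint"); // hypothetical path
Pipeline p = Pipeline.create(options);
{code}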

 

Best,

Rick

 

 


> KafkaIO seems to fail on streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Alexey Romanenko
>Priority: Major
> Attachments: .withMaxNumRecords(50).JPG, 
> DB_table_kafkabeamdata_count.JPG, error UnboundedDataset.java 81(0) has 
> different number of partitions.JPG, the error 

[jira] [Commented] (BEAM-3073) Connect to Apache ignite via JdbcIO sdk

2017-10-19 Thread Rick Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212010#comment-16212010
 ] 

Rick Lin commented on BEAM-3073:


Hi,
Apache Ignite is similar to Redis or Hazelcast (an in-memory key-value database).
I think the JdbcIO sdk is not suitable for Ignite.
So, an in-memory IO sdk for Ignite is needed.
Thanks
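
(Editorial aside, not part of the original comment: Ignite 2.3+ also ships a thin JDBC driver, org.apache.ignite.IgniteJdbcThinDriver, which speaks plain JDBC/SQL on port 10800 and should in principle work with JdbcIO without an Ignite-specific Beam connector. A hedged sketch against the test code quoted below; the KVTABLE table and its columns are hypothetical.)
{code:java}
// Sketch only: writing the KV pairs through Ignite's thin JDBC driver.
// Assumes an SQL table KVTABLE(K INT, V INT) already exists in Ignite.
data.apply(JdbcIO.<KV<Integer, Integer>>write()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
        "org.apache.ignite.IgniteJdbcThinDriver",
        "jdbc:ignite:thin://localhost:10800/"))
    .withStatement("INSERT INTO KVTABLE (K, V) VALUES (?, ?)")
    .withPreparedStatementSetter((element, query) -> {
      query.setInt(1, element.getKey());
      query.setInt(2, element.getValue());
    }));
{code}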

> Connect to Apache ignite via JdbcIO sdk
> ---
>
> Key: BEAM-3073
> URL: https://issues.apache.org/jira/browse/BEAM-3073
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-extensions
>Reporter: Rick Lin
>Assignee: Jean-Baptiste Onofré
>Priority: Minor
>
> Hi all,
> {color:#14892c}I tried to connect to Apache Ignite (in-memory) via the beam
> sdk org.apache.beam.sdk.io.jdbc.JdbcIO.
> Here, I am not sure whether the JdbcIO sdk is only provided for some specific
> databases: MySQL (disk), postgreSQL (disk)?{color}
> My java test code is as follows:
> import java.sql.PreparedStatement;
> import java.sql.SQLException;
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.jdbc.JdbcIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> public class BeamtoJDBC {
>   public static void main(String[] args) {
>     Integer[] value = new Integer[] {1, 2, 3, 4, 5};
>     List<KV<Integer, Integer>> dataList = new ArrayList<>();
>     int n = value.length;
>     int count = 0;
>     for (int i = 0; i < n; i++) {
>       dataList.add(KV.of(count, value[i]));
>       count = count + 1;
>     }
>     Pipeline p = Pipeline.create(
>         PipelineOptionsFactory.fromArgs(args).withValidation().create());
>     PCollection<KV<Integer, Integer>> data =
>         p.apply("create data with time", Create.of(dataList));
>     data.apply(JdbcIO.<KV<Integer, Integer>>write()
>         .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
>             .create("org.apache.ignite.IgniteJdbcDriver",
>                 "jdbc:ignite://localhost:11211/"))
>         .withPreparedStatementSetter(
>             new JdbcIO.PreparedStatementSetter<KV<Integer, Integer>>() {
>               public void setParameters(KV<Integer, Integer> element,
>                   PreparedStatement query) throws SQLException {
>                 query.setInt(1, element.getKey());
>                 query.setInt(2, element.getValue());
>               }
>             }));
>     p.run();
>   }
> }
> {color:#d04437}My error message is:
> " InvocationTargetException: org.apache.beam.sdk.util.UserCodeException:
> java.sql.SQLException: Cannot create PoolableConnectionFactory
> (Failed to establish connection.): Failed to get future result due to waiting
> timed out. "{color}
> {color:#14892c}I would like to know whether the connection between beam and
> ignite is feasible or not?{color}
> Thanks
> Rick



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3073) Connect to Apache ignite via JdbcIO sdk

2017-10-20 Thread Rick Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212248#comment-16212248
 ] 

Rick Lin commented on BEAM-3073:


Hi,
In my opinion:

* The JDBC driver org.apache.ignite.IgniteJdbcDriver is only used to store
data into an on-disk database (MySQL, PostgreSQL, ...) via ignite; it is not
a way for beam to output data into ignite itself.
* Yes, I am looking forward to the SDK for ignite I/O.

Thanks for the update on the feature :).
Rick


> Connect to Apache ignite via JdbcIO sdk
> ---
>
> Key: BEAM-3073
> URL: https://issues.apache.org/jira/browse/BEAM-3073
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-extensions
>Reporter: Rick Lin
>Assignee: Jean-Baptiste Onofré
>Priority: Minor
>
> Hi all,
> {color:#14892c}I tried to connect to Apache Ignite (in-memory) via the beam
> sdk org.apache.beam.sdk.io.jdbc.JdbcIO.
> Here, I am not sure whether the JdbcIO sdk is only provided for some specific
> databases: MySQL (disk), postgreSQL (disk)?{color}
> My java test code is as follows:
> import java.sql.PreparedStatement;
> import java.sql.SQLException;
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.jdbc.JdbcIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> public class BeamtoJDBC {
>   public static void main(String[] args) {
>     Integer[] value = new Integer[] {1, 2, 3, 4, 5};
>     List<KV<Integer, Integer>> dataList = new ArrayList<>();
>     int n = value.length;
>     int count = 0;
>     for (int i = 0; i < n; i++) {
>       dataList.add(KV.of(count, value[i]));
>       count = count + 1;
>     }
>     Pipeline p = Pipeline.create(
>         PipelineOptionsFactory.fromArgs(args).withValidation().create());
>     PCollection<KV<Integer, Integer>> data =
>         p.apply("create data with time", Create.of(dataList));
>     data.apply(JdbcIO.<KV<Integer, Integer>>write()
>         .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
>             .create("org.apache.ignite.IgniteJdbcDriver",
>                 "jdbc:ignite://localhost:11211/"))
>         .withPreparedStatementSetter(
>             new JdbcIO.PreparedStatementSetter<KV<Integer, Integer>>() {
>               public void setParameters(KV<Integer, Integer> element,
>                   PreparedStatement query) throws SQLException {
>                 query.setInt(1, element.getKey());
>                 query.setInt(2, element.getValue());
>               }
>             }));
>     p.run();
>   }
> }
> {color:#d04437}My error message is:
> " InvocationTargetException: org.apache.beam.sdk.util.UserCodeException:
> java.sql.SQLException: Cannot create PoolableConnectionFactory
> (Failed to establish connection.): Failed to get future result due to waiting
> timed out. "{color}
> {color:#14892c}I would like to know whether the connection between beam and
> ignite is feasible or not?{color}
> Thanks
> Rick



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner

2018-07-30 Thread Rick Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561540#comment-16561540
 ] 

Rick Lin commented on BEAM-4632:


By the way, the configuration of my kafka perf-test producer (which feeds the broker) is:

 
{code:java}
/kafka_broker/bin/kafka-producer-perf-test.sh \
--num-records 1000 \
--record-size 100 \
--topic kafkasink \
--throughput 1 \
--producer-props acks=0  \
bootstrap.servers=ubuntu7:9092  \
batch.size=1000{code}
The console output of the perf-test producer is:

...

49992 records sent, {color:#d04437}9998.4 records/sec{color} (0.95 MB/sec), 1.0
ms avg latency, 146.0 max latency.
50040 records sent, {color:#d04437}10008.0 records/sec{color} (0.95 MB/sec),
0.2 ms avg latency, 5.0 max latency.
50019 records sent, {color:#d04437}10001.8 records/sec{color} (0.95 MB/sec),
0.2 ms avg latency, 1.0 max latency.
50011 records sent, {color:#d04437}10002.2 records/sec{color} (0.95 MB/sec),
0.2 ms avg latency, 3.0 max latency.
50020 records sent, {color:#d04437}10002.0 records/sec{color} (0.95 MB/sec),
0.2 ms avg latency, 1.0 max latency.

...
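
(For reference, an editorial aside: with --record-size 100 bytes, roughly
10,000 records/sec works out to 10,000 x 100 bytes = 1,000,000 bytes/sec,
i.e. about 0.95 MB/sec, which matches the throughput reported on the console.)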

Rick

 

 

> KafkaIO seems to fail on streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Alexey Romanenko
>Priority: Major
> Attachments: .withMaxNumRecords(50).JPG, 
> DB_table_kafkabeamdata_count.JPG, error UnboundedDataset.java 81(0) has 
> different number of partitions.JPG, the error GeneratedMessageV3.JPG, the 
> error GeneratedMessageV3.JPG
>
>
> Dear sir,
> The following versions of related tools are set in my running program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my
> github: [https://github.com/LinRick/beamkafkaIO].
> The description of my situation is as follows:
> {color:#14892c}The kafka broker is working, and kafkaIO.read (consumer) is
> used to capture data from the assigned broker ip
> (http://ubuntu7:9092).{color}
> {color:#14892c}The user manual of the kafkaIO SDK (on the web:
> [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that
> the following parameters need to be set, and then the kafkaIO can work
> well.{color}
>  {color:#FF}.withBootstrapServers("kafka broker ip:9092"){color}
> {color:#FF} .withTopic("kafkasink"){color}
> {color:#FF} .withKeyDeserializer(IntegerDeserializer.class){color}
> {color:#FF} .withValueDeserializer(StringDeserializer.class) {color}
> When I run my program with these settings over the direct runner, it performs
> well and runs in streaming mode. *However, when I run the same code with the
> same kafkaIO settings over the spark runner, the program does not enter
> streaming mode and is shut down.*
> Here, as mentioned on the website
> [https://beam.apache.org/documentation/runners/spark/], the program should
> automatically be set to streaming mode.
> Unfortunately, that failed for my program.
> On the other hand, if I set the parameter kafkaIO.read.withMaxNumRecords(1000)
> or kafkaIO.read.withMaxReadTime (Duration second), my program executes
> successfully in batch mode (batch processing).
> The steps for performing StarterPipeline.java in my program are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline
> -Pspark2-runner -Dexec.args="--runner=SparkRunner"
> step2 mvn clean package
> step3 cp -rf target/beamkafkaIO-0.1.jar /root/
> step4 cd /spark-2.2.1-bin-hadoop2.6/bin
> step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4]
> /root/beamkafkaIO-0.1.jar --runner=SparkRunner
> I am not sure whether this issue is a bug in kafkaIO or whether I used wrong
> parameter settings over the spark runner.
> I really can't handle it, so I hope to get help from you.
> If any further information is needed, I will be glad to provide it as soon as
> possible.
> I would highly appreciate your help in dealing with this issue.
> I am looking forward to hearing from you.
>  
> Sincerely yours,
>  
> Rick
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner

2018-07-30 Thread Rick Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561540#comment-16561540
 ] 

Rick Lin edited comment on BEAM-4632 at 7/30/18 6:56 AM:
-

By the way, the configuration of my kafka perf-test producer (which feeds the broker) is:

 
{code:java}
/kafka_broker/bin/kafka-producer-perf-test.sh \
--num-records 1000 \
--record-size 100 \
--topic kafkasink \
--throughput 1 \
--producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000{code}
The console output of the perf-test producer is:

...

49992 records sent, {color:#d04437}9998.4 records/sec{color} (0.95 MB/sec), 1.0
ms avg latency, 146.0 max latency.
50040 records sent, {color:#d04437}10008.0 records/sec{color} (0.95 MB/sec),
0.2 ms avg latency, 5.0 max latency.
50019 records sent, {color:#d04437}10001.8 records/sec{color} (0.95 MB/sec),
0.2 ms avg latency, 1.0 max latency.
50011 records sent, {color:#d04437}10002.2 records/sec{color} (0.95 MB/sec),
0.2 ms avg latency, 3.0 max latency.
50020 records sent, {color:#d04437}10002.0 records/sec{color} (0.95 MB/sec),
0.2 ms avg latency, 1.0 max latency.

...

Rick

 

 


> KafkaIO seems to fail on streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Alexey Romanenko
>Priority: Major
> Attachments: .withMaxNumRecords(50).JPG, 
> DB_table_kafkabeamdata_count.JPG, error UnboundedDataset.java 81(0) has 
> different number of partitions.JPG, the error GeneratedMessageV3.JPG, the 
> error GeneratedMessageV3.JPG
>
>
> Dear sir,
> The following versions of related tools are set in my running program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my
> github: [https://github.com/LinRick/beamkafkaIO].
> The description of my situation is as follows:
> {color:#14892c}The kafka broker is working, and kafkaIO.read (consumer) is
> used to capture data from the assigned broker ip
> (http://ubuntu7:9092).{color}
> {color:#14892c}The user manual of the kafkaIO SDK (on the web:
> [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that
> the following parameters need to be set, and then the kafkaIO can work
> well.{color}
>  {color:#FF}.withBootstrapServers("kafka broker ip:9092"){color}
> {color:#FF} .withTopic("kafkasink"){color}
> {color:#FF} .withKeyDeserializer(IntegerDeserializer.class){color}
> {color:#FF} .withValueDeserializer(StringDeserializer.class) {color}
> When I run my program with these settings over the direct runner, it performs
> well and runs in streaming mode. *However, when I run the same code with the
> same kafkaIO settings over the spark runner, the program does not enter
> streaming mode and is shut down.*
> Here, as mentioned on the website
> [https://beam.apache.org/documentation/runners/spark/], the program should
> automatically be set to streaming mode.
> Unfortunately, that failed for my program.
> On the other hand, if I set the parameter kafkaIO.read.withMaxNumRecords(1000)
> or kafkaIO.read.withMaxReadTime (Duration second), my program executes
> successfully in batch mode (batch processing).
> The steps for performing StarterPipeline.java in my program are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline
> -Pspark2-runner 

[jira] [Commented] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner

2018-07-30 Thread Rick Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561536#comment-16561536
 ] 

Rick Lin commented on BEAM-4632:


Hi [~aromanenko],

For the last question, I only changed the SparkPipelineOptions settings in my
program, as:
{code:java}
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).
 withValidation().as(SparkPipelineOptions.class); 
 options.setRunner(SparkRunner.class); 
 Pipeline p = Pipeline.create(options); 
 options.setMaxRecordsPerBatch(1000L); 

 options.setSparkMaster("spark://ubuntu8:7077");
{code}
and then ran the following command line:
{noformat}
mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline 
-Pspark2-runner -Dexec.args="--runner=SparkRunner"{noformat}
This produced the error:

 
{panel:title=The error is}
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java 
(default-cli) on project beamkafkaIO2: An exception occured while executing the 
Java class. org.apache.spark.SparkException: Checkpoint RDD has a different 
number of partitions from original RDD. Original RDD [ID: 321, num of 
partitions: 1]; Checkpoint RDD [ID: 324, num of partitions: 0]. -> [Help 1]
{panel}
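
(Editorial aside, not part of the original comment: in the snippet above, the options are still being mutated after Pipeline.create(options) has been called. A minimal reordering sketch of the same settings is shown below, so the runner sees the finished configuration; this is an assumption, not a confirmed fix for the checkpoint error.)
{code:java}
// Sketch only: finish configuring SparkPipelineOptions before creating the pipeline.
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("spark://ubuntu8:7077");
options.setMaxRecordsPerBatch(1000L);
Pipeline p = Pipeline.create(options);  // created last
{code}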
Thanks

Rick

 

> KafkaIO seems to fail on streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Alexey Romanenko
>Priority: Major
> Attachments: .withMaxNumRecords(50).JPG, 
> DB_table_kafkabeamdata_count.JPG, error UnboundedDataset.java 81(0) has 
> different number of partitions.JPG, the error GeneratedMessageV3.JPG, the 
> error GeneratedMessageV3.JPG
>
>
> Dear sir,
> The following versions of related tools are set in my running program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my
> github: [https://github.com/LinRick/beamkafkaIO].
> The description of my situation is as follows:
> {color:#14892c}The kafka broker is working, and kafkaIO.read (consumer) is
> used to capture data from the assigned broker ip
> (http://ubuntu7:9092).{color}
> {color:#14892c}The user manual of the kafkaIO SDK (on the web:
> [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that
> the following parameters need to be set, and then the kafkaIO can work
> well.{color}
>  {color:#FF}.withBootstrapServers("kafka broker ip:9092"){color}
> {color:#FF} .withTopic("kafkasink"){color}
> {color:#FF} .withKeyDeserializer(IntegerDeserializer.class){color}
> {color:#FF} .withValueDeserializer(StringDeserializer.class) {color}
> When I run my program with these settings over the direct runner, it performs
> well and runs in streaming mode. *However, when I run the same code with the
> same kafkaIO settings over the spark runner, the program does not enter
> streaming mode and is shut down.*
> Here, as mentioned on the website
> [https://beam.apache.org/documentation/runners/spark/], the program should
> automatically be set to streaming mode.
> Unfortunately, that failed for my program.
> On the other hand, if I set the parameter kafkaIO.read.withMaxNumRecords(1000)
> or kafkaIO.read.withMaxReadTime (Duration second), my program executes
> successfully in batch mode (batch processing).
> The steps for performing StarterPipeline.java in my program are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline
> -Pspark2-runner -Dexec.args="--runner=SparkRunner"
> step2 mvn clean package
> step3 cp -rf target/beamkafkaIO-0.1.jar /root/
> step4 cd /spark-2.2.1-bin-hadoop2.6/bin
> step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4]
> /root/beamkafkaIO-0.1.jar --runner=SparkRunner
> I am not sure whether this issue is a bug in kafkaIO or whether I used wrong
> parameter settings over the spark runner.
> I really can't handle it, so I hope to get help from you.
> If any further information is needed, I will be glad to provide it as soon as
> possible.
> I would highly appreciate your help in dealing with this issue.
> I am looking forward to hearing from you.
>  
> Sincerely yours,
>  
> Rick
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner

2018-07-30 Thread Rick Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561536#comment-16561536
 ] 

Rick Lin edited comment on BEAM-4632 at 7/30/18 7:00 AM:
-

Hi [~aromanenko],

For the last question, I only changed the SparkPipelineOptions settings in my
program, as:
{code:java}
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).
 withValidation().as(SparkPipelineOptions.class); 
 options.setRunner(SparkRunner.class); 
 Pipeline p = Pipeline.create(options); 
 options.setMaxRecordsPerBatch(1000L); 

 options.setSparkMaster("spark://ubuntu8:7077");
{code}
{code:java}
PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    //.withMaxNumRecords(50)
    .withoutMetadata());{code}
{code:java}
...
...
...
p.run().waitUntilFinish();{code}
and then ran the following command line:
{noformat}
mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline 
-Pspark2-runner -Dexec.args="--runner=SparkRunner"{noformat}
This produced the error:

 
{panel:title=The error is}
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java 
(default-cli) on project beamkafkaIO2: An exception occured while executing the 
Java class. org.apache.spark.SparkException: Checkpoint RDD has a different 
number of partitions from original RDD. Original RDD [ID: 321, num of 
partitions: 1]; Checkpoint RDD [ID: 324, num of partitions: 0]. -> [Help 1]
{panel}
Thanks

Rick

 


> KafkaIO seems to fail on streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Alexey Romanenko
>Priority: Major
> Attachments: .withMaxNumRecords(50).JPG, 
> DB_table_kafkabeamdata_count.JPG, error UnboundedDataset.java 81(0) has 
> different number of partitions.JPG, the error GeneratedMessageV3.JPG, the 
> error GeneratedMessageV3.JPG
>
>
> Dear sir,
> The following versions of related tools are set in my running program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my
> github: [https://github.com/LinRick/beamkafkaIO].
> The description of my situation is as follows:
> {color:#14892c}The kafka broker is working, and kafkaIO.read (consumer) is
> used to capture data from the assigned broker ip
> (http://ubuntu7:9092).{color}
> {color:#14892c}The user manual of the kafkaIO SDK (on the web:
> [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that
> the following parameters need to be set, and then the kafkaIO can work
> well.{color}
>  {color:#FF}.withBootstrapServers("kafka broker ip:9092"){color}
> {color:#FF} .withTopic("kafkasink"){color}
> {color:#FF} .withKeyDeserializer(IntegerDeserializer.class){color}
> {color:#FF} .withValueDeserializer(StringDeserializer.class) {color}
> When I run my program with these settings over the direct runner, it performs
> well and runs in streaming mode. *However, when I run the same code with the
> same kafkaIO settings over the spark runner, the program does not enter
> streaming mode and is shut down.*
> Here, as 

[jira] [Comment Edited] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner

2018-07-30 Thread Rick Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561536#comment-16561536
 ] 

Rick Lin edited comment on BEAM-4632 at 7/30/18 8:16 AM:
-

Hi [~aromanenko],

For the last question, I only changed the SparkPipelineOptions settings in my
program, as:
{code:java}
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).
 withValidation().as(SparkPipelineOptions.class); 
 options.setRunner(SparkRunner.class); 
 Pipeline p = Pipeline.create(options); 
 options.setMaxRecordsPerBatch(1000L); 

 options.setSparkMaster("spark://ubuntu8:7077");
{code}
{code:java}
PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    //.withMaxNumRecords(50)
    .withoutMetadata());{code}
{code:java}
...
...
...
p.run().waitUntilFinish();{code}
and then ran the following command line:
{noformat}
mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline 
-Pspark2-runner -Dexec.args="--runner=SparkRunner"{noformat}
This produced the error:

 
{panel:title=The error is}
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java 
(default-cli) on project beamkafkaIO: An exception occured while executing the 
Java class. org.apache.spark.SparkException: Checkpoint RDD has a different 
number of partitions from original RDD. Original RDD [ID: 321, num of 
partitions: 1]; Checkpoint RDD [ID: 324, num of partitions: 0]. -> [Help 1]
{panel}
Thanks

Rick

 



> KafkaIO seems to fail on streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Alexey Romanenko
>Priority: Major
> Attachments: .withMaxNumRecords(50).JPG, 
> DB_table_kafkabeamdata_count.JPG, error UnboundedDataset.java 81(0) has 
> different number of partitions.JPG, the error GeneratedMessageV3.JPG, the 
> error GeneratedMessageV3.JPG
>
>
> Dear sir,
> The following versions of related tools are set in my running program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my
> github: [https://github.com/LinRick/beamkafkaIO].
> The description of my situation is as follows:
> {color:#14892c}The kafka broker is working, and kafkaIO.read (consumer) is
> used to capture data from the assigned broker ip
> (http://ubuntu7:9092).{color}
> {color:#14892c}The user manual of the kafkaIO SDK (on the web:
> [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that
> the following parameters need to be set, and then the kafkaIO can work
> well.{color}
>  {color:#FF}.withBootstrapServers("kafka broker ip:9092"){color}
> {color:#FF} .withTopic("kafkasink"){color}
> {color:#FF} .withKeyDeserializer(IntegerDeserializer.class){color}
> {color:#FF} .withValueDeserializer(StringDeserializer.class) 

[jira] [Created] (BEAM-3770) The problem of kafkaIO sdk for data latency

2018-03-01 Thread Rick Lin (JIRA)
Rick Lin created BEAM-3770:
--

 Summary: The problem of kafkaIO sdk for data latency
 Key: BEAM-3770
 URL: https://issues.apache.org/jira/browse/BEAM-3770
 Project: Beam
  Issue Type: Improvement
  Components: io-java-kafka
Affects Versions: 2.0.0
 Environment: For repeating my situation, my running environment is:
OS: Ubuntu 14.04.3 LTS
JAVA: JDK 1.7
Beam 2.0.0 (with Direct runner)
Kafka 2.10-0.10.1.1
Maven 3.5.0, in which dependencies are listed in pom.xml:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>2.0.0</version>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.1</version>
</dependency>
Reporter: Rick Lin
Assignee: Raghu Angadi
 Fix For: 2.0.0


Dear all,

I am using the kafkaIO sdk in my project (Beam 2.0.0 with Direct runner).

With this sdk, there is a situation involving *data latency*, described in
the following.

The data come from kafka at a fixed rate: 100 records per second.

I create a fixed 1-second window without delay. I found that the data size
per window is 70, 80, 104, or greater than or equal to 104.

After one day of running, data latency happens, and the data size drops to
only 10 in each window.

*In order to clearly explain it, I also provide my code in the following.*

" PipelineOptions readOptions = PipelineOptionsFactory._create_();

*final* Pipeline p = Pipeline._create_(readOptions);

 PCollection>> readData =

  p.apply(KafkaIO._read_()   

 .withBootstrapServers("127.0.0.1:9092")

 .withTopic("kafkasink")

 .withKeyDeserializer(StringDeserializer.*class*)

 .withValueDeserializer(StringDeserializer.*class*)

 .withoutMetadata())

 .apply(ParDo._of_(*new* +DoFn, 
TimestampedValue>>()+ {

    @ProcessElement

    *public* *void* test(ProcessContext c) *throws* ParseException {

    String element = c.element().getValue();

    *try* {

  JsonNode arrNode = *new* ObjectMapper().readTree(element);

  String t = arrNode.path("v").findValue("Timestamp").textValue();

  DateTimeFormatter formatter = 
DateTimeFormatter._ofPattern_("MM/dd/ HH:mm:ss.");

     LocalDateTime dateTime = LocalDateTime._parse_(t, formatter);

     java.time.Instant java_instant = 
dateTime.atZone(ZoneId._systemDefault_()).toInstant();

     Instant timestamp  = *new* Instant(java_instant.toEpochMilli());

  c.output(TimestampedValue._of_(c.element(), timestamp));

    } *catch* (JsonGenerationException e) {

    e.printStackTrace();

    } *catch* (JsonMappingException e) {

    e.printStackTrace();

  } *catch* (IOException e) {

    e.printStackTrace();

  }

    }}));

 PCollection>> readDivideData = 
readData.apply(

  Window.>> 
_into_(FixedWindows._of_(Duration._standardSeconds_(1))

  .withOffset(Duration.*_ZERO_*))

  .triggering(AfterWatermark._pastEndOfWindow_()   

 .withLateFirings(AfterProcessingTime._pastFirstElementInPane_()

   .plusDelayOf(Duration.*_ZERO_*)))

  .withAllowedLateness(Duration.*_ZERO_*)

  .discardingFiredPanes());"
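
(Editorial aside, not part of the original report: in the DoFn above, c.output(TimestampedValue.of(...)) only wraps the parsed time inside the element's value; it does not change the element timestamp that FixedWindows uses, so elements keep their Kafka read time. A sketch of the variant that actually reassigns the event time, under that assumption; parseTimestamp is a hypothetical helper standing in for the ObjectMapper logic above.)
{code:java}
// Sketch only: emit with the parsed event time so windowing sees it.
@ProcessElement
public void processElement(ProcessContext c) throws ParseException {
  KV<String, String> element = c.element();
  Instant timestamp = parseTimestamp(element.getValue()); // hypothetical helper
  c.outputWithTimestamp(element, timestamp);
}
{code}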

*In addition, the running result is as shown in the following.*

"data-size=104
coming-data-time=2018-02-27 02:00:49.117
window-time=2018-02-27 02:00:49.999

data-size=78
coming-data-time=2018-02-27 02:00:50.318
window-time=2018-02-27 02:00:50.999

data-size=104
coming-data-time=2018-02-27 02:00:51.102
window-time=2018-02-27 02:00:51.999

After one day:

data-size=10
coming-data-time=2018-02-28 02:05:48.217
window-time=2018-03-01 10:35:16.999 "

If you have any idea about the problem (data latency), I am looking forward to 
hearing from you.

Thanks

Rick



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)