[jira] [Created] (BAHIR-221) support Redis dimension table in Flink

2019-12-17 Thread zl (Jira)
zl created BAHIR-221:


 Summary: support Redis dimension table in Flink
 Key: BAHIR-221
 URL: https://issues.apache.org/jira/browse/BAHIR-221
 Project: Bahir
  Issue Type: New Feature
  Components: Flink Streaming Connectors
Reporter: zl


As the most popular in-memory database,Redis is widly used, and in many cases 
the data we need is stored in Redis. Temporal table is a good way for Flink to 
access data in redis,but it seems that Redis connector in project bahir-flink 
doesn't provide a temporal table implementation,and I would like to work on it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BAHIR-231) unit test failed in ActiveMQConnectorITCase

2020-05-22 Thread zl (Jira)
zl created BAHIR-231:


 Summary: unit test failed in ActiveMQConnectorITCase
 Key: BAHIR-231
 URL: https://issues.apache.org/jira/browse/BAHIR-231
 Project: Bahir
  Issue Type: Bug
  Components: Flink Streaming Connectors
Affects Versions: Flink-1.0
Reporter: zl


there are some problems with the test methods in 
org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase.

*-* *problem:* method amqTopologyWithQueue and amqTopologyWithTopic will never 
end
*- reason:* the parallelism of flink task should set to 1

*- problem:* method amqTopologyWithCheckpointing failed, here are the exception 
stack:
java.lang.NullPointerException at 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.addId(MessageAcknowledgingSourceBase.java:197)
 at 
org.apache.flink.streaming.connectors.activemq.AMQSource.run(AMQSource.java:229)
 at 
org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase$3.run(ActiveMQConnectorITCase.java:189)
 at java.lang.Thread.run(Thread.java:748)
java.lang.IllegalStateException: The AMQSource has not been properly 
initialized.
 at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) at 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.snapshotState(MessageAcknowledgingSourceBase.java:208)
 at 
org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase.amqTopologyWithCheckpointing(ActiveMQConnectorITCase.java:203)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
 at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at 
org.junit.runner.JUnitCore.run(JUnitCore.java:137) at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
 at 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
 at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
 at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
*- reason:* before calling snapshotState method, we should call snapshotState 
method first



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BAHIR-231) unit test failed in ActiveMQConnectorITCase

2020-05-22 Thread zl (Jira)


 [ 
https://issues.apache.org/jira/browse/BAHIR-231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zl updated BAHIR-231:
-
Description: 
there are some problems with the test methods in 
org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase.
 

*problem:* method amqTopologyWithQueue and amqTopologyWithTopic will never end

*reason:* the parallelism of flink task should set to 1
 

*problem:* method amqTopologyWithCheckpointing failed, here are the exception 
stack:
 java.lang.NullPointerException at 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.addId(MessageAcknowledgingSourceBase.java:197)
 at 
org.apache.flink.streaming.connectors.activemq.AMQSource.run(AMQSource.java:229)
 at 
org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase$3.run(ActiveMQConnectorITCase.java:189)
 at java.lang.Thread.run(Thread.java:748)
 java.lang.IllegalStateException: The AMQSource has not been properly 
initialized.
 at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) at 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.snapshotState(MessageAcknowledgingSourceBase.java:208)
 at 
org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase.amqTopologyWithCheckpointing(ActiveMQConnectorITCase.java:203)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
 at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at 
org.junit.runner.JUnitCore.run(JUnitCore.java:137) at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
 at 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
 at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
 at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
 * 
 -- reason:* before calling snapshotState method, we should call snapshotState 
method first

  was:
there are some problems with the test methods in 
org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase.

*-* *problem:* method amqTopologyWithQueue and amqTopologyWithTopic will never 
end
*- reason:* the parallelism of flink task should set to 1

*- problem:* method amqTopologyWithCheckpointing failed, here are the exception 
stack:
java.lang.NullPointerException at 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.addId(MessageAcknowledgingSourceBase.java:197)
 at 
org.apache.flink.streaming.connectors.activemq.AMQSource.run(AMQSource.java:229)
 at 
org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase$3.run(ActiveMQConnectorITCase.java:189)
 at java.lang.Thread.run(Thread.java:748)
java.lang.IllegalStateException: The AMQSource has not been properly 
initialized.
 at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) at 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.snapshotState(MessageAcknowledgingSourceBase.java:208)
 at 
org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase.amqTopologyWithCheckpointing(ActiveMQConnectorITCase.java:203)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.ref

[jira] [Updated] (BAHIR-231) unit test failed in ActiveMQConnectorITCase

2020-05-22 Thread zl (Jira)


 [ 
https://issues.apache.org/jira/browse/BAHIR-231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zl updated BAHIR-231:
-
Description: 
there are some problems with the test methods in 
org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase.
 

*problem:* method amqTopologyWithQueue and amqTopologyWithTopic will never end

*reason:* the parallelism of flink task should set to 1
 

*problem:* method amqTopologyWithCheckpointing failed, here are the exception 
stack:
 java.lang.NullPointerException at 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.addId(MessageAcknowledgingSourceBase.java:197)
 at 
org.apache.flink.streaming.connectors.activemq.AMQSource.run(AMQSource.java:229)
 at 
org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase$3.run(ActiveMQConnectorITCase.java:189)
 at java.lang.Thread.run(Thread.java:748)
 java.lang.IllegalStateException: The AMQSource has not been properly 
initialized.
 at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) at 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.snapshotState(MessageAcknowledgingSourceBase.java:208)
 at 
org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase.amqTopologyWithCheckpointing(ActiveMQConnectorITCase.java:203)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
 at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at 
org.junit.runner.JUnitCore.run(JUnitCore.java:137) at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
 at 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
 at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
 at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

*reason:* before calling snapshotState method, we should call snapshotState 
method first

  was:
there are some problems with the test methods in 
org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase.
 

*problem:* method amqTopologyWithQueue and amqTopologyWithTopic will never end

*reason:* the parallelism of flink task should set to 1
 

*problem:* method amqTopologyWithCheckpointing failed, here are the exception 
stack:
 java.lang.NullPointerException at 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.addId(MessageAcknowledgingSourceBase.java:197)
 at 
org.apache.flink.streaming.connectors.activemq.AMQSource.run(AMQSource.java:229)
 at 
org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase$3.run(ActiveMQConnectorITCase.java:189)
 at java.lang.Thread.run(Thread.java:748)
 java.lang.IllegalStateException: The AMQSource has not been properly 
initialized.
 at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) at 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.snapshotState(MessageAcknowledgingSourceBase.java:208)
 at 
org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase.amqTopologyWithCheckpointing(ActiveMQConnectorITCase.java:203)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Met

[jira] [Created] (BAHIR-232) Support Flink Table API & SQL for flink-connector-activemq

2020-05-22 Thread zl (Jira)
zl created BAHIR-232:


 Summary: Support Flink Table API & SQL for flink-connector-activemq
 Key: BAHIR-232
 URL: https://issues.apache.org/jira/browse/BAHIR-232
 Project: Bahir
  Issue Type: Improvement
  Components: Flink Streaming Connectors
Affects Versions: Flink-1.0
Reporter: zl


flink-connector-activemq does not support Flink Table API & SQL, based on the 
the existing code, it is not very difficult to support this feature, we just 
need to provide 4 classes: AMQTableSource、AMQTableSink、AMQTableSourceFactory 
and AMQTableSourceFactory. Then we can connect activemq by the following way:
{code:java}
// code placeholder
{code}
String TABLE_CREATE_SQL = "CREATE TABLE books (" + " id int, " + " title 
varchar, " + " author varchar, " + " price double, " + " qty int " + ") with (" 
+ " 'connector.type' = 'activemq', " + " 'connector.broker-url' = 
'vm://localhost?broker.persistent=false', " + " 'connector.destination-type' = 
'QUEUE', " + " 'connector.destination-name' = 'source_queue' " + ")";String 
INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" + "(1001, 'Java public for 
dummies', 'Tan Ah Teck', 11.11, 11),\n" + "(1002, 'More Java for dummies', 'Tan 
Ah Teck', 22.22, 22),\n" + "(1003, 'More Java for more dummies', 'Mohammad 
Ali', 33.33, 33),\n" + "(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n" + 
"(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n" + "(1006, 'A 
Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n" + "(1007, 'A Teaspoon of 
Java 1.5', 'Kevin Jones', 77.77, 77),\n" + "(1008, 'A Teaspoon of Java 1.6', 
'Kevin Jones', 88.88, 88),\n" + "(1009, 'A Teaspoon of Java 1.7', 'Kevin 
Jones', 99.99, 99),\n" + "(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 
33.33, 100)";String QUERY_TABLE_SQL = "SELECT * FROM books";// create activemq 
source table tEnv.sqlUpdate(TABLE_CREATE_SQL);// produce event to activemq 
tEnv.sqlUpdate(INITIALIZE_TABLE_SQL);// consume from activemq Table table = 
tEnv.sqlQuery(QUERY_TABLE_SQL);



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BAHIR-232) Support Flink Table API & SQL for flink-connector-activemq

2020-05-22 Thread zl (Jira)


 [ 
https://issues.apache.org/jira/browse/BAHIR-232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zl updated BAHIR-232:
-
Description: 
flink-connector-activemq does not support Flink Table API & SQL, based on the 
the existing code, it is not very difficult to support this feature, we just 
need to provide 4 classes: AMQTableSource、AMQTableSink、AMQTableSourceFactory 
and AMQTableSourceFactory. Then we can connect activemq by the following way:
{code:java}
String TABLE_CREATE_SQL = "CREATE TABLE books (" + " id int, " + " title 
varchar, " + " author varchar, " + " price double, " + " qty int " + ") with (" 
+ " 'connector.type' = 'activemq', " + " 'connector.broker-url' = 
'vm://localhost?broker.persistent=false', " + " 'connector.destination-type' = 
'QUEUE', " + " 'connector.destination-name' = 'source_queue' " + ")";String 
INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" + "(1001, 'Java public for 
dummies', 'Tan Ah Teck', 11.11, 11),\n" + "(1002, 'More Java for dummies', 'Tan 
Ah Teck', 22.22, 22),\n" + "(1003, 'More Java for more dummies', 'Mohammad 
Ali', 33.33, 33),\n" + "(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n" + 
"(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n" + "(1006, 'A 
Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n" + "(1007, 'A Teaspoon of 
Java 1.5', 'Kevin Jones', 77.77, 77),\n" + "(1008, 'A Teaspoon of Java 1.6', 
'Kevin Jones', 88.88, 88),\n" + "(1009, 'A Teaspoon of Java 1.7', 'Kevin 
Jones', 99.99, 99),\n" + "(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 
33.33, 100)";String QUERY_TABLE_SQL = "SELECT * FROM books";// create activemq 
source table tEnv.sqlUpdate(TABLE_CREATE_SQL);// produce event to activemq 
tEnv.sqlUpdate(INITIALIZE_TABLE_SQL);// consume from activemq Table table = 
tEnv.sqlQuery(QUERY_TABLE_SQL);
{code}
 

  was:
flink-connector-activemq does not support Flink Table API & SQL, based on the 
the existing code, it is not very difficult to support this feature, we just 
need to provide 4 classes: AMQTableSource、AMQTableSink、AMQTableSourceFactory 
and AMQTableSourceFactory. Then we can connect activemq by the following way:
{code:java}
// code placeholder
{code}
String TABLE_CREATE_SQL = "CREATE TABLE books (" + " id int, " + " title 
varchar, " + " author varchar, " + " price double, " + " qty int " + ") with (" 
+ " 'connector.type' = 'activemq', " + " 'connector.broker-url' = 
'vm://localhost?broker.persistent=false', " + " 'connector.destination-type' = 
'QUEUE', " + " 'connector.destination-name' = 'source_queue' " + ")";String 
INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" + "(1001, 'Java public for 
dummies', 'Tan Ah Teck', 11.11, 11),\n" + "(1002, 'More Java for dummies', 'Tan 
Ah Teck', 22.22, 22),\n" + "(1003, 'More Java for more dummies', 'Mohammad 
Ali', 33.33, 33),\n" + "(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n" + 
"(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n" + "(1006, 'A 
Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n" + "(1007, 'A Teaspoon of 
Java 1.5', 'Kevin Jones', 77.77, 77),\n" + "(1008, 'A Teaspoon of Java 1.6', 
'Kevin Jones', 88.88, 88),\n" + "(1009, 'A Teaspoon of Java 1.7', 'Kevin 
Jones', 99.99, 99),\n" + "(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 
33.33, 100)";String QUERY_TABLE_SQL = "SELECT * FROM books";// create activemq 
source table tEnv.sqlUpdate(TABLE_CREATE_SQL);// produce event to activemq 
tEnv.sqlUpdate(INITIALIZE_TABLE_SQL);// consume from activemq Table table = 
tEnv.sqlQuery(QUERY_TABLE_SQL);


> Support Flink Table API & SQL for flink-connector-activemq
> --
>
> Key: BAHIR-232
> URL: https://issues.apache.org/jira/browse/BAHIR-232
> Project: Bahir
>  Issue Type: Improvement
>  Components: Flink Streaming Connectors
>Affects Versions: Flink-1.0
>Reporter: zl
>Priority: Major
>
> flink-connector-activemq does not support Flink Table API & SQL, based on the 
> the existing code, it is not very difficult to support this feature, we just 
> need to provide 4 classes: AMQTableSource、AMQTableSink、AMQTableSourceFactory 
> and AMQTableSourceFactory. Then we can connect activemq by the following way:
> {code:java}
> String TABLE_CREATE_SQL = "CREATE TABLE books (" + " id int, " + " title 
> varchar, " + " author varchar, " + " price double, " + " qty int " + ") with 
> (" + " 'connector.type' = 'activemq', " + " 'connector.broker-url' = 
> 'vm://localhost?broker.persistent=false', " + " 'connector.destination-type' 
> = 'QUEUE', " + " 'connector.destination-name' = 'source_queue' " + ")";String 
> INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" + "(1001, 'Java public 
> for dummies', 'Tan Ah Teck', 11.11, 11),\n" + "(1002, 'More Java for 
> dummies', 'Tan Ah Teck', 22.22, 22),\n" + "(1003, 'More Java for more 
> dummies', 'Mohammad Ali', 33.33, 33),\n" + "(1004, 'A Cup of

[jira] [Updated] (BAHIR-232) Support Flink Table API & SQL for flink-connector-activemq

2020-05-22 Thread zl (Jira)


 [ 
https://issues.apache.org/jira/browse/BAHIR-232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zl updated BAHIR-232:
-
Description: 
flink-connector-activemq does not support Flink Table API & SQL, based on the 
the existing code, it is not very difficult to support this feature, we just 
need to provide 4 classes: AMQTableSource、AMQTableSink、AMQTableSourceFactory 
and AMQTableSourceFactory. Then we can connect activemq by the following way:
{code:java}
String TABLE_CREATE_SQL = "CREATE TABLE books (" +
" id int, " +
" title varchar, " +
" author varchar, " +
" price double, " +
" qty int " +
") with (" +
" 'connector.type' = 'activemq', " +
" 'connector.broker-url' = 'vm://localhost?broker.persistent=false', " +
" 'connector.destination-type' = 'QUEUE', " +
" 'connector.destination-name' = 'source_queue' " +
")";

String INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" +
"(1001, 'Java public for dummies', 'Tan Ah Teck', 11.11, 11),\n" +
"(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),\n" +
"(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),\n" +
"(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n" +
"(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n" +
"(1006, 'A Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n" +
"(1007, 'A Teaspoon of Java 1.5', 'Kevin Jones', 77.77, 77),\n" +
"(1008, 'A Teaspoon of Java 1.6', 'Kevin Jones', 88.88, 88),\n" +
"(1009, 'A Teaspoon of Java 1.7', 'Kevin Jones', 99.99, 99),\n" +
"(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 33.33, 100)";

String QUERY_TABLE_SQL = "SELECT * FROM books";

// create activemq source table
tEnv.sqlUpdate(TABLE_CREATE_SQL);

// produce event to activemq
tEnv.sqlUpdate(INITIALIZE_TABLE_SQL);

// consume from activemq
Table table = tEnv.sqlQuery(QUERY_TABLE_SQL);
{code}
 

  was:
flink-connector-activemq does not support Flink Table API & SQL, based on the 
the existing code, it is not very difficult to support this feature, we just 
need to provide 4 classes: AMQTableSource、AMQTableSink、AMQTableSourceFactory 
and AMQTableSourceFactory. Then we can connect activemq by the following way:
{code:java}
String TABLE_CREATE_SQL = "CREATE TABLE books (" + " id int, " + " title 
varchar, " + " author varchar, " + " price double, " + " qty int " + ") with (" 
+ " 'connector.type' = 'activemq', " + " 'connector.broker-url' = 
'vm://localhost?broker.persistent=false', " + " 'connector.destination-type' = 
'QUEUE', " + " 'connector.destination-name' = 'source_queue' " + ")";String 
INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" + "(1001, 'Java public for 
dummies', 'Tan Ah Teck', 11.11, 11),\n" + "(1002, 'More Java for dummies', 'Tan 
Ah Teck', 22.22, 22),\n" + "(1003, 'More Java for more dummies', 'Mohammad 
Ali', 33.33, 33),\n" + "(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n" + 
"(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n" + "(1006, 'A 
Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n" + "(1007, 'A Teaspoon of 
Java 1.5', 'Kevin Jones', 77.77, 77),\n" + "(1008, 'A Teaspoon of Java 1.6', 
'Kevin Jones', 88.88, 88),\n" + "(1009, 'A Teaspoon of Java 1.7', 'Kevin 
Jones', 99.99, 99),\n" + "(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 
33.33, 100)";String QUERY_TABLE_SQL = "SELECT * FROM books";// create activemq 
source table tEnv.sqlUpdate(TABLE_CREATE_SQL);// produce event to activemq 
tEnv.sqlUpdate(INITIALIZE_TABLE_SQL);// consume from activemq Table table = 
tEnv.sqlQuery(QUERY_TABLE_SQL);
{code}
 


> Support Flink Table API & SQL for flink-connector-activemq
> --
>
> Key: BAHIR-232
> URL: https://issues.apache.org/jira/browse/BAHIR-232
> Project: Bahir
>  Issue Type: Improvement
>  Components: Flink Streaming Connectors
>Affects Versions: Flink-1.0
>Reporter: zl
>Priority: Major
>
> flink-connector-activemq does not support Flink Table API & SQL, based on the 
> the existing code, it is not very difficult to support this feature, we just 
> need to provide 4 classes: AMQTableSource、AMQTableSink、AMQTableSourceFactory 
> and AMQTableSourceFactory. Then we can connect activemq by the following way:
> {code:java}
> String TABLE_CREATE_SQL = "CREATE TABLE books (" +
> " id int, " +
> " title varchar, " +
> " author varchar, " +
> " price double, " +
> " qty int " +
> ") with (" +
> " 'connector.type' = 'activemq', " +
> " 'connector.broker-url' = 'vm://localhost?broker.persistent=false', 
> " +
> " 'connector.destination-type' = 'QUEUE', " +
> " 'connector.destination-name' = 'source_queue' " +
> ")";
> String INITIALIZE_TABLE_SQL = "INSERT INTO boo

[jira] [Updated] (BAHIR-232) Support Flink Table API & SQL for flink-connector-activemq

2020-05-22 Thread zl (Jira)


 [ 
https://issues.apache.org/jira/browse/BAHIR-232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zl updated BAHIR-232:
-
Description: 
flink-connector-activemq does not support Flink Table API & SQL, based on the 
the existing code, it is not very difficult to support this feature, we just 
need to provide 4 classes: AMQTableSource、AMQTableSink、AMQTableSourceFactory 
and AMQTableSinkFactory. Then we can connect activemq by the following way:
{code:java}
String TABLE_CREATE_SQL = "CREATE TABLE books (" +
" id int, " +
" title varchar, " +
" author varchar, " +
" price double, " +
" qty int " +
") with (" +
" 'connector.type' = 'activemq', " +
" 'connector.broker-url' = 'vm://localhost?broker.persistent=false', " +
" 'connector.destination-type' = 'QUEUE', " +
" 'connector.destination-name' = 'source_queue' " +
")";

String INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" +
"(1001, 'Java public for dummies', 'Tan Ah Teck', 11.11, 11),\n" +
"(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),\n" +
"(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),\n" +
"(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n" +
"(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n" +
"(1006, 'A Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n" +
"(1007, 'A Teaspoon of Java 1.5', 'Kevin Jones', 77.77, 77),\n" +
"(1008, 'A Teaspoon of Java 1.6', 'Kevin Jones', 88.88, 88),\n" +
"(1009, 'A Teaspoon of Java 1.7', 'Kevin Jones', 99.99, 99),\n" +
"(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 33.33, 100)";

String QUERY_TABLE_SQL = "SELECT * FROM books";

// create activemq source table
tEnv.sqlUpdate(TABLE_CREATE_SQL);

// produce event to activemq
tEnv.sqlUpdate(INITIALIZE_TABLE_SQL);

// consume from activemq
Table table = tEnv.sqlQuery(QUERY_TABLE_SQL);
{code}
 

  was:
flink-connector-activemq does not support Flink Table API & SQL, based on the 
the existing code, it is not very difficult to support this feature, we just 
need to provide 4 classes: AMQTableSource、AMQTableSink、AMQTableSourceFactory 
and AMQTableSourceFactory. Then we can connect activemq by the following way:
{code:java}
String TABLE_CREATE_SQL = "CREATE TABLE books (" +
" id int, " +
" title varchar, " +
" author varchar, " +
" price double, " +
" qty int " +
") with (" +
" 'connector.type' = 'activemq', " +
" 'connector.broker-url' = 'vm://localhost?broker.persistent=false', " +
" 'connector.destination-type' = 'QUEUE', " +
" 'connector.destination-name' = 'source_queue' " +
")";

String INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" +
"(1001, 'Java public for dummies', 'Tan Ah Teck', 11.11, 11),\n" +
"(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),\n" +
"(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),\n" +
"(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n" +
"(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n" +
"(1006, 'A Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n" +
"(1007, 'A Teaspoon of Java 1.5', 'Kevin Jones', 77.77, 77),\n" +
"(1008, 'A Teaspoon of Java 1.6', 'Kevin Jones', 88.88, 88),\n" +
"(1009, 'A Teaspoon of Java 1.7', 'Kevin Jones', 99.99, 99),\n" +
"(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 33.33, 100)";

String QUERY_TABLE_SQL = "SELECT * FROM books";

// create activemq source table
tEnv.sqlUpdate(TABLE_CREATE_SQL);

// produce event to activemq
tEnv.sqlUpdate(INITIALIZE_TABLE_SQL);

// consume from activemq
Table table = tEnv.sqlQuery(QUERY_TABLE_SQL);
{code}
 


> Support Flink Table API & SQL for flink-connector-activemq
> --
>
> Key: BAHIR-232
> URL: https://issues.apache.org/jira/browse/BAHIR-232
> Project: Bahir
>  Issue Type: Improvement
>  Components: Flink Streaming Connectors
>Affects Versions: Flink-1.0
>Reporter: zl
>Priority: Major
>
> flink-connector-activemq does not support Flink Table API & SQL, based on the 
> the existing code, it is not very difficult to support this feature, we just 
> need to provide 4 classes: AMQTableSource、AMQTableSink、AMQTableSourceFactory 
> and AMQTableSinkFactory. Then we can connect activemq by the following way:
> {code:java}
> String TABLE_CREATE_SQL = "CREATE TABLE books (" +
> " id int, " +
> " title varchar, " +
> " author varchar, " +
> " price double, " +
> " qty int " +
> ") with (" +
> " 'connector.type' = 'activemq', " +
> " 'connector.broker-url' = 'vm://localhost?broker.persistent=false', 
> " +
> " 'connector.d