[jira] [Created] (BAHIR-221) support Redis dimension table in Flink
zl created BAHIR-221:

Summary: support Redis dimension table in Flink
Key: BAHIR-221
URL: https://issues.apache.org/jira/browse/BAHIR-221
Project: Bahir
Issue Type: New Feature
Components: Flink Streaming Connectors
Reporter: zl

As one of the most popular in-memory databases, Redis is widely used, and in many cases the data we need is stored in Redis. A temporal table is a good way for Flink to access data in Redis, but the Redis connector in the bahir-flink project does not seem to provide a temporal table implementation. I would like to work on it.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
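To make the dimension-table idea concrete, here is a minimal sketch of the lookup side of such a table: each incoming key is resolved against an external store, with a small LRU cache in front. It is not the bahir-flink API. The class `CachedDimensionLookup` and its methods are hypothetical, and the Redis GET is replaced by a plain `Function<String, String>` so the example runs without a Redis server or Flink on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical lookup helper: resolves dimension values by key and caches them. */
class CachedDimensionLookup {
    // Stands in for a Redis GET; an assumption made for this sketch.
    private final Function<String, String> backend;
    private final Map<String, Optional<String>> cache;
    private int backendCalls = 0;

    CachedDimensionLookup(Function<String, String> backend, final int maxEntries) {
        this.backend = backend;
        // An access-ordered LinkedHashMap gives a simple LRU eviction policy.
        this.cache = new LinkedHashMap<String, Optional<String>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Optional<String>> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns the dimension value for the key, or empty if the store has none. */
    Optional<String> lookup(String key) {
        Optional<String> cached = cache.get(key);
        if (cached != null) {
            return cached; // cache hit (misses are cached too, as Optional.empty())
        }
        backendCalls++;
        Optional<String> loaded = Optional.ofNullable(backend.apply(key));
        cache.put(key, loaded);
        return loaded;
    }

    /** Number of times the backing store was actually queried. */
    int backendCalls() {
        return backendCalls;
    }
}
```

A real implementation would also need an expiry time for cached rows, since a dimension table in Redis can change while the job is running.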
[jira] [Created] (BAHIR-231) unit test failed in ActiveMQConnectorITCase
zl created BAHIR-231:

Summary: unit test failed in ActiveMQConnectorITCase
Key: BAHIR-231
URL: https://issues.apache.org/jira/browse/BAHIR-231
Project: Bahir
Issue Type: Bug
Components: Flink Streaming Connectors
Affects Versions: Flink-1.0
Reporter: zl

There are some problems with the test methods in org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase.

*problem:* the methods amqTopologyWithQueue and amqTopologyWithTopic never finish.
*reason:* the parallelism of the Flink task should be set to 1.

*problem:* the method amqTopologyWithCheckpointing fails with the following exception stack traces:

java.lang.NullPointerException
	at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.addId(MessageAcknowledgingSourceBase.java:197)
	at org.apache.flink.streaming.connectors.activemq.AMQSource.run(AMQSource.java:229)
	at org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase$3.run(ActiveMQConnectorITCase.java:189)
	at java.lang.Thread.run(Thread.java:748)

java.lang.IllegalStateException: The AMQSource has not been properly initialized.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
	at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.snapshotState(MessageAcknowledgingSourceBase.java:208)
	at org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase.amqTopologyWithCheckpointing(ActiveMQConnectorITCase.java:203)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

*reason:* before calling the snapshotState method, we should call the initializeState method first.
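Both exceptions in the report are consistent with a source whose checkpoint state was never set up before the test used it. The following toy model illustrates that lifecycle. It is not Flink's `MessageAcknowledgingSourceBase`: the class `ToyAckSource` and its fields are hypothetical, and the sketch assumes only that state is created in an initialization step and checked with a precondition at snapshot time.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy stand-in for a checkpointed, acknowledging source; not Flink's actual class. */
class ToyAckSource {
    private List<Set<String>> checkpointedState;  // null until initializeState()
    private Set<String> idsForCurrentCheckpoint;  // null until initializeState()

    /** Must run once before addId() or snapshotState(). */
    void initializeState() {
        if (checkpointedState != null) {
            throw new IllegalStateException("The source has already been initialized.");
        }
        checkpointedState = new ArrayList<>();
        idsForCurrentCheckpoint = new HashSet<>();
    }

    /** Records a message id; throws NullPointerException if initialization was skipped. */
    boolean addId(String id) {
        return idsForCurrentCheckpoint.add(id);
    }

    /** Stores the ids seen since the last snapshot and returns the snapshot count. */
    int snapshotState() {
        if (checkpointedState == null) {
            throw new IllegalStateException("The source has not been properly initialized.");
        }
        checkpointedState.add(new HashSet<>(idsForCurrentCheckpoint));
        idsForCurrentCheckpoint.clear();
        return checkpointedState.size();
    }
}
```

In this model a test that calls `snapshotState()` on a fresh instance fails with the same kind of `IllegalStateException`, and calling `initializeState()` first makes both `addId` and `snapshotState` succeed.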
[jira] [Updated] (BAHIR-231) unit test failed in ActiveMQConnectorITCase
[ https://issues.apache.org/jira/browse/BAHIR-231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zl updated BAHIR-231:

Description:

There are some problems with the test methods in org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase.

*problem:* the methods amqTopologyWithQueue and amqTopologyWithTopic never finish.
*reason:* the parallelism of the Flink task should be set to 1.

*problem:* the method amqTopologyWithCheckpointing fails with the following exception stack traces:

java.lang.NullPointerException
	at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.addId(MessageAcknowledgingSourceBase.java:197)
	at org.apache.flink.streaming.connectors.activemq.AMQSource.run(AMQSource.java:229)
	at org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase$3.run(ActiveMQConnectorITCase.java:189)
	at java.lang.Thread.run(Thread.java:748)

java.lang.IllegalStateException: The AMQSource has not been properly initialized.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
	at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.snapshotState(MessageAcknowledgingSourceBase.java:208)
	at org.apache.flink.streaming.connectors.activemq.ActiveMQConnectorITCase.amqTopologyWithCheckpointing(ActiveMQConnectorITCase.java:203)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

*reason:* before calling the snapshotState method, we should call the initializeState method first.
[jira] [Created] (BAHIR-232) Support Flink Table API & SQL for flink-connector-activemq
zl created BAHIR-232:

Summary: Support Flink Table API & SQL for flink-connector-activemq
Key: BAHIR-232
URL: https://issues.apache.org/jira/browse/BAHIR-232
Project: Bahir
Issue Type: Improvement
Components: Flink Streaming Connectors
Affects Versions: Flink-1.0
Reporter: zl

flink-connector-activemq does not support the Flink Table API & SQL. Based on the existing code, it is not very difficult to support this feature; we just need to provide four classes: AMQTableSource, AMQTableSink, AMQTableSourceFactory and AMQTableSinkFactory. Then we can connect to ActiveMQ in the following way:

{code:java}
String TABLE_CREATE_SQL = "CREATE TABLE books (" +
    "  id int, " +
    "  title varchar, " +
    "  author varchar, " +
    "  price double, " +
    "  qty int " +
    ") with (" +
    "  'connector.type' = 'activemq', " +
    "  'connector.broker-url' = 'vm://localhost?broker.persistent=false', " +
    "  'connector.destination-type' = 'QUEUE', " +
    "  'connector.destination-name' = 'source_queue' " +
    ")";

String INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" +
    "(1001, 'Java public for dummies', 'Tan Ah Teck', 11.11, 11),\n" +
    "(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),\n" +
    "(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),\n" +
    "(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n" +
    "(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n" +
    "(1006, 'A Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n" +
    "(1007, 'A Teaspoon of Java 1.5', 'Kevin Jones', 77.77, 77),\n" +
    "(1008, 'A Teaspoon of Java 1.6', 'Kevin Jones', 88.88, 88),\n" +
    "(1009, 'A Teaspoon of Java 1.7', 'Kevin Jones', 99.99, 99),\n" +
    "(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 33.33, 100)";

String QUERY_TABLE_SQL = "SELECT * FROM books";

// tEnv is a Flink TableEnvironment created elsewhere

// create activemq source table
tEnv.sqlUpdate(TABLE_CREATE_SQL);

// produce event to activemq
tEnv.sqlUpdate(INITIALIZE_TABLE_SQL);

// consume from activemq
Table table = tEnv.sqlQuery(QUERY_TABLE_SQL);
{code}
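A table factory for this connector would have to turn the `'connector.*'` properties from the CREATE TABLE statement above into a validated configuration before building a source or sink. The sketch below shows one way that step could look. It is only an illustration: `AmqTableOptions`, its fields, and the `TOPIC` enum value are assumptions, not classes or options from the proposed patch, and it takes a plain `Map` rather than any Flink descriptor type.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical holder for the connector options used in the CREATE TABLE example. */
class AmqTableOptions {
    /** QUEUE comes from the example; TOPIC is an assumption for this sketch. */
    enum DestinationType { QUEUE, TOPIC }

    static final List<String> REQUIRED_KEYS = Arrays.asList(
        "connector.type",
        "connector.broker-url",
        "connector.destination-type",
        "connector.destination-name");

    final String brokerUrl;
    final DestinationType destinationType;
    final String destinationName;

    private AmqTableOptions(String brokerUrl, DestinationType destinationType, String destinationName) {
        this.brokerUrl = brokerUrl;
        this.destinationType = destinationType;
        this.destinationName = destinationName;
    }

    /** Validates the property map and builds the options, or throws IllegalArgumentException. */
    static AmqTableOptions fromProperties(Map<String, String> props) {
        for (String key : REQUIRED_KEYS) {
            if (!props.containsKey(key)) {
                throw new IllegalArgumentException("Missing required property: " + key);
            }
        }
        if (!"activemq".equals(props.get("connector.type"))) {
            throw new IllegalArgumentException("Unsupported connector.type: " + props.get("connector.type"));
        }
        // Enum.valueOf throws IllegalArgumentException for unknown destination types.
        DestinationType type = DestinationType.valueOf(
            props.get("connector.destination-type").toUpperCase(Locale.ROOT));
        return new AmqTableOptions(
            props.get("connector.broker-url"), type, props.get("connector.destination-name"));
    }
}
```

Failing fast on a missing or misspelled key keeps configuration errors at table-creation time instead of surfacing later as a connection failure inside the running job.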
[jira] [Updated] (BAHIR-232) Support Flink Table API & SQL for flink-connector-activemq
[ https://issues.apache.org/jira/browse/BAHIR-232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zl updated BAHIR-232:

Description:

flink-connector-activemq does not support the Flink Table API & SQL. Based on the existing code, it is not very difficult to support this feature; we just need to provide four classes: AMQTableSource, AMQTableSink, AMQTableSourceFactory and AMQTableSinkFactory. Then we can connect to ActiveMQ in the following way:

{code:java}
String TABLE_CREATE_SQL = "CREATE TABLE books (" +
    "  id int, " +
    "  title varchar, " +
    "  author varchar, " +
    "  price double, " +
    "  qty int " +
    ") with (" +
    "  'connector.type' = 'activemq', " +
    "  'connector.broker-url' = 'vm://localhost?broker.persistent=false', " +
    "  'connector.destination-type' = 'QUEUE', " +
    "  'connector.destination-name' = 'source_queue' " +
    ")";

String INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" +
    "(1001, 'Java public for dummies', 'Tan Ah Teck', 11.11, 11),\n" +
    "(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),\n" +
    "(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),\n" +
    "(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n" +
    "(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n" +
    "(1006, 'A Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n" +
    "(1007, 'A Teaspoon of Java 1.5', 'Kevin Jones', 77.77, 77),\n" +
    "(1008, 'A Teaspoon of Java 1.6', 'Kevin Jones', 88.88, 88),\n" +
    "(1009, 'A Teaspoon of Java 1.7', 'Kevin Jones', 99.99, 99),\n" +
    "(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 33.33, 100)";

String QUERY_TABLE_SQL = "SELECT * FROM books";

// tEnv is a Flink TableEnvironment created elsewhere

// create activemq source table
tEnv.sqlUpdate(TABLE_CREATE_SQL);

// produce event to activemq
tEnv.sqlUpdate(INITIALIZE_TABLE_SQL);

// consume from activemq
Table table = tEnv.sqlQuery(QUERY_TABLE_SQL);
{code}

> Support Flink Table API & SQL for flink-connector-activemq
>
> Key: BAHIR-232
> URL: https://issues.apache.org/jira/browse/BAHIR-232
> Project: Bahir
> Issue Type: Improvement
> Components: Flink Streaming Connectors
> Affects Versions: Flink-1.0
> Reporter: zl
> Priority: Major