Hi all,
  I want to write a program, a thread read the real-time message from
/var/log/messages and write them to kafaka, and it works. Then I want to use
sql of flink to query the messages, and the following are my code:

-----------------------------------------------------------------------------------------------------------

        // set up the execution environment
        final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(2);

        StreamTableEnvironment tableEnv =
TableEnvironment.getTableEnvironment(env);


        DataStream<String> text = env.addSource(new
FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(), properties));
        DataStream<Tuple4&lt;Long, String, String, String>> messages =
                text.flatMap(new Tokenizer());
        tableEnv.registerDataStream("Syslogs", messages, "time, user,
process, msg");

        Table result = tableEnv.sql(
                "SELECT STREAM msg FROM Syslogs WHERE msg LIKE '%system%'"
        );


        TableSink sink = new CsvTableSink("/home/jiecxy/Desktop/test.csv",
"|");
        result.writeToSink(sink);

        // execute program
        env.execute();  
-----------------------------------------------------------------------------------------------------------
Note: the class Tokenizer is to transfer the log to four parts. Like this:
   Sep  6 09:28:01 master systemd: Stopping user-988.slice.
to
  Tuple4<time, master, systemd,  Stopping user-988.slice.>


But when I ran it use Flink:
  bin/flink run readlog.jar

I got the exception...  What should I do?


Starting execution of program

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error.
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524)
        at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
        at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
        at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.lang.RuntimeException: java.sql.SQLException: No suitable
driver found for jdbc:calcite:
        at org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:151)
        at org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:106)
        at org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:127)
        at
org.apache.flink.api.table.FlinkRelBuilder$.create(FlinkRelBuilder.scala:56)
        at
org.apache.flink.api.table.TableEnvironment.<init>(TableEnvironment.scala:73)
        at
org.apache.flink.api.table.StreamTableEnvironment.<init>(StreamTableEnvironment.scala:58)
        at
org.apache.flink.api.java.table.StreamTableEnvironment.<init>(StreamTableEnvironment.scala:45)
        at
org.apache.flink.api.table.TableEnvironment$.getTableEnvironment(TableEnvironment.scala:376)
        at
org.apache.flink.api.table.TableEnvironment.getTableEnvironment(TableEnvironment.scala)
        at 
org.myorg.quickstart.ReadingFromKafka2.main(ReadingFromKafka2.java:48)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
        ... 6 more
Caused by: java.sql.SQLException: No suitable driver found for jdbc:calcite:
        at java.sql.DriverManager.getConnection(DriverManager.java:689)
        at java.sql.DriverManager.getConnection(DriverManager.java:208)
        at org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:144)
        ... 20 more




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stream-sql-query-in-Flink-tp8928.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to