Hi to all,
I'm trying to read and print out the content of my parquet directory with
Flink 1.11 (using the bridge API). However Flink complains that there is no
topology to execute..what am I doing wrong? The exception is:

java.lang.IllegalStateException: No operators defined in streaming
topology. Cannot generate StreamGraph.
at
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at it.okkam.datalinks.batch.flink.ProfileTest.main(ProfileTest.java:52)

This is the code: ------------------------

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
        EnvironmentSettings.newInstance().inStreamingMode().build());
tableEnv.executeSql(-----see below  [1] ----);
Table inputTable = tableEnv.sqlQuery("SELECT * FROM source");
tableEnv.toAppendStream(inputTable,
   new RowTypeInfo(inputTable.getSchema().getFieldTypes());).print()
final JobExecutionResult jobRes = tableEnv.execute("test-job");

[1] ----------
CREATE TABLE `source` (
`col1` BIGINT,
`col2` STRING
) WITH (
'connector' = 'filesystem',
'format' = 'parquet',
'update-mode' = 'append',
'path' = '/tmp/parquet-test',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='success-file',
'format.parquet.compression'='snappy',
'format.parquet.enable.dictionary'='true',
'format.parquet.block.size'='0',
'sink.shuffle-by-partition.enable' = 'true'
)
-----------

Thanks in advance,
Flavio

Reply via email to