Thanks for your help, it works.

Best,
Yik San Chan
On Tue, Mar 16, 2021 at 10:03 AM Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi,
>
> The problem is that the legacy DataSet planner you are using does not
> support the FileSystem connector you declared. You can use the blink
> planner to achieve your needs:
>
> >>>
> from pyflink.table import EnvironmentSettings, BatchTableEnvironment
> from pyflink.table.expressions import lit
>
> t_env = BatchTableEnvironment.create(
>     environment_settings=EnvironmentSettings.new_instance()
>         .in_batch_mode().use_blink_planner().build())
> t_env._j_tenv.getPlanner().getExecEnv().setParallelism(1)
>
> my_source_ddl = """
> create table mySource (
>     word VARCHAR
> ) with (
>     'connector' = 'filesystem',
>     'format' = 'csv',
>     'path' = '/tmp/input'
> )
> """
>
> my_sink_ddl = """
> create table mySink (
>     word VARCHAR,
>     `count` BIGINT
> ) with (
>     'connector' = 'filesystem',
>     'format' = 'csv',
>     'path' = '/tmp/output'
> )
> """
>
> t_env.execute_sql(my_source_ddl)
> t_env.execute_sql(my_sink_ddl)
>
> tab = t_env.from_path('mySource')
> tab.group_by(tab.word) \
>     .select(tab.word, lit(1).count) \
>     .execute_insert('mySink').wait()
> >>>
>
> Best,
> Xingbo
>
> Yik San Chan <evan.chanyik...@gmail.com> wrote on Mon, Mar 15, 2021 at 1:26 PM:
>
>> (The question is cross-posted on StackOverflow:
>> https://stackoverflow.com/questions/66632765/got-pyflink-util-exceptions-tableexception-findandcreatetablesource-failed-w)
>>
>> I am running the PyFlink program below (copied from
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html):
>>
>> ```python
>> from pyflink.dataset import ExecutionEnvironment
>> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
>> from pyflink.table.descriptors import Schema, OldCsv, FileSystem
>> from pyflink.table.expressions import lit
>>
>> exec_env = ExecutionEnvironment.get_execution_environment()
>> exec_env.set_parallelism(1)
>> t_config = TableConfig()
>> t_env = BatchTableEnvironment.create(exec_env, t_config)
>>
>> t_env.connect(FileSystem().path('/tmp/input')) \
>>     .with_format(OldCsv()
>>                  .field('word', DataTypes.STRING())) \
>>     .with_schema(Schema()
>>                  .field('word', DataTypes.STRING())) \
>>     .create_temporary_table('mySource')
>>
>> t_env.connect(FileSystem().path('/tmp/output')) \
>>     .with_format(OldCsv()
>>                  .field_delimiter('\t')
>>                  .field('word', DataTypes.STRING())
>>                  .field('count', DataTypes.BIGINT())) \
>>     .with_schema(Schema()
>>                  .field('word', DataTypes.STRING())
>>                  .field('count', DataTypes.BIGINT())) \
>>     .create_temporary_table('mySink')
>>
>> tab = t_env.from_path('mySource')
>> tab.group_by(tab.word) \
>>     .select(tab.word, lit(1).count) \
>>     .execute_insert('mySink').wait()
>> ```
>>
>> To verify it works, I did the following in order:
>>
>> 1. Run `echo -e "flink\npyflink\nflink" > /tmp/input`
>> 2. Run `python WordCount.py`
>> 3. Run `cat /tmp/output` and find the expected output
>>
>> Then I changed my PyFlink program a bit to prefer SQL over the Table API,
>> but I found it doesn't work.
>>
>> ```python
>> from pyflink.dataset import ExecutionEnvironment
>> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
>> from pyflink.table.descriptors import Schema, OldCsv, FileSystem
>> from pyflink.table.expressions import lit
>>
>> exec_env = ExecutionEnvironment.get_execution_environment()
>> exec_env.set_parallelism(1)
>> t_config = TableConfig()
>> t_env = BatchTableEnvironment.create(exec_env, t_config)
>>
>> my_source_ddl = """
>> create table mySource (
>>     word VARCHAR
>> ) with (
>>     'connector' = 'filesystem',
>>     'format' = 'csv',
>>     'path' = '/tmp/input'
>> )
>> """
>>
>> my_sink_ddl = """
>> create table mySink (
>>     word VARCHAR,
>>     `count` BIGINT
>> ) with (
>>     'connector' = 'filesystem',
>>     'format' = 'csv',
>>     'path' = '/tmp/output'
>> )
>> """
>>
>> t_env.sql_update(my_source_ddl)
>> t_env.sql_update(my_sink_ddl)
>>
>> tab = t_env.from_path('mySource')
>> tab.group_by(tab.word) \
>>     .select(tab.word, lit(1).count) \
>>     .execute_insert('mySink').wait()
>> ```
>>
>> Here's the error:
>>
>> ```
>> Traceback (most recent call last):
>>   File "WordCount.py", line 38, in <module>
>>     .execute_insert('mySink').wait()
>>   File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table.py", line 864, in execute_insert
>>     return TableResult(self._j_table.executeInsert(table_path, overwrite))
>>   File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
>>     answer, self.gateway_client, self.target_id, self.name)
>>   File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 162, in deco
>>     raise java_exception
>> pyflink.util.exceptions.TableException: findAndCreateTableSink failed.
>> 	at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSink(TableFactoryUtil.java:87)
>> 	at org.apache.flink.table.api.internal.TableEnvImpl.getTableSink(TableEnvImpl.scala:1097)
>> 	at org.apache.flink.table.api.internal.TableEnvImpl.org$apache$flink$table$api$internal$TableEnvImpl$$writeToSinkAndTranslate(TableEnvImpl.scala:929)
>> 	at org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$1.apply(TableEnvImpl.scala:556)
>> 	at org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$1.apply(TableEnvImpl.scala:554)
>> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> 	at org.apache.flink.table.api.internal.TableEnvImpl.executeInternal(TableEnvImpl.scala:554)
>> 	at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
>> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 	at java.lang.reflect.Method.invoke(Method.java:498)
>> 	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>> 	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>> 	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>> 	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>> 	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>> 	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>> 	at java.lang.Thread.run(Thread.java:748)
>> ```
>>
>> I wonder what's wrong with my new program? Thanks!
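Aside from the planner question, the job in this thread is a plain word count, so the expected `/tmp/output` contents can be checked without Flink at all. A minimal pure-Python sketch of the same computation (names here are illustrative, not part of any Flink API):

```python
from collections import Counter

def word_count(lines):
    # One word per line, as produced by
    # `echo -e "flink\npyflink\nflink" > /tmp/input`; blank lines ignored.
    return Counter(line.strip() for line in lines if line.strip())

counts = word_count(["flink", "pyflink", "flink"])

# Tab-separated rows, the same shape the job's CSV sink writes.
for word, n in sorted(counts.items()):
    print("%s\t%d" % (word, n))
```

Running this prints `flink	2` and `pyflink	1`, which is what `cat /tmp/output` should show after a successful run of either variant of the job.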