Hello All, I have a Pyflink script that needs to read from Pulsar and process the data.
To build a prototype, I did the following:

1. Since I need a PyFlink way to connect to Pulsar, I checked out the code from the master branch, as advised in a different thread (the PyFlink Pulsar connector seems to be slated for the 1.15 release).
2. I built Flink from source.
3. I am using the following location in the source tree as FLINK_HOME: flink-dist/target/flink-1.15-SNAPSHOT-bin/flink-1.15-SNAPSHOT
4. The PyFlink Python wheels have been installed in the correct conda environment.
5. I copied flink-sql-connector-pulsar-1.15-SNAPSHOT.jar into the $FLINK_HOME/lib folder.
6. I started the standalone cluster by running bin/start-cluster.sh.
7. I submit my test script with bin/flink run --python …
8. If I launch the word_count example from the Flink documentation, everything runs fine and the job completes successfully.
9. However, if the script uses the Pulsar connector, the logs show that the Flink client is unable to submit the job to the JobManager.
10. The client ultimately dies with a Channel Idle exception (visible when the logs are at DEBUG level).

I am trying this on macOS. To repeat: the classic word_count script works without any issues, and the client-side job submission fails only when the Pulsar source connector is in the script.

I am attaching the client logs, the standalone session JobManager logs, and the script for reference. Could you please advise what I can do to resolve the issue? (I will raise a JIRA issue if someone thinks it is a bug.)

Regards,
Ananth
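Before digging into the Channel Idle exception, it may help to rule out two setup problems that steps 5 and 6 could hide: a missing or duplicated Pulsar connector jar in $FLINK_HOME/lib, and a JobManager that is not reachable over REST. The sketch below is a hypothetical pre-flight check, not part of Flink. It assumes the jar keeps the flink-sql-connector-pulsar-*.jar naming used above and that the standalone cluster serves its REST API on the default http://localhost:8081, where GET /overview returns a cluster summary. The function names are made up for illustration.

```python
import glob
import os
import urllib.request


def find_pulsar_connector_jars(flink_home):
    """Return the Pulsar SQL connector jars found in $FLINK_HOME/lib, sorted.

    Hypothetical helper: assumes the flink-sql-connector-pulsar-*.jar naming.
    """
    pattern = os.path.join(flink_home, "lib", "flink-sql-connector-pulsar-*.jar")
    return sorted(glob.glob(pattern))


def jobmanager_reachable(rest_url="http://localhost:8081", timeout=5.0):
    """Return True if the JobManager REST endpoint answers GET /overview with HTTP 200.

    Assumes the default standalone REST address; adjust rest_url if rest.port was changed.
    """
    try:
        with urllib.request.urlopen(rest_url.rstrip("/") + "/overview", timeout=timeout) as resp:
            return resp.status == 200
    except OSError:
        # URLError and HTTPError are subclasses of OSError, so this covers
        # connection refused, timeouts, DNS failures and non-2xx responses.
        return False


if __name__ == "__main__":
    home = os.environ.get("FLINK_HOME", ".")
    jars = find_pulsar_connector_jars(home)
    if len(jars) != 1:
        print(f"Expected exactly one Pulsar connector jar in {home}/lib, found: {jars}")
    print("JobManager REST reachable:", jobmanager_reachable())
```

If both checks pass, the problem is more likely in the job graph or serialization path than in the basic setup, which would point back to the client and JobManager logs.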
flink-ananth-client-Ananths-MacBook-Pro.local.log.gz
flink-ananth-standalonesession-0-Ananths-MacBook-Pro.local.log.gz
test_pulsar.py.gz