[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord
[ https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059596#comment-17059596 ]

Nicolae Rosia commented on BEAM-9046:
-------------------------------------

Hello, I have the same issue with any of these versions:
* 2.19.0
* 2.20.0
* master

all together with Flink 1.9 and Kafka. Is there a known working example from which I could start?

> Kafka connector for Python throws ClassCastException when reading KafkaRecord
> -----------------------------------------------------------------------------
>
>                 Key: BEAM-9046
>                 URL: https://issues.apache.org/jira/browse/BEAM-9046
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-kafka
>    Affects Versions: 2.16.0
>            Reporter: Berkay Öztürk
>            Priority: Major
>              Labels: KafkaIO, Python
>             Fix For: Not applicable
>
> I'm trying to read the data streaming from Apache Kafka using the Python SDK
> for Apache Beam with the Flink runner. After running Kafka 2.4.0 and Flink
> 1.8.3, I follow these steps:
> * Compile and run Beam 2.16 with the Flink 1.8 runner.
> {code:java}
> git clone --single-branch --branch release-2.16.0 https://github.com/apache/beam.git beam-2.16.0
> cd beam-2.16.0
> nohup ./gradlew :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081 &
> {code}
> * Run the Python pipeline.
> {code:python}
> from apache_beam import Pipeline
> from apache_beam.io.external.kafka import ReadFromKafka
> from apache_beam.options.pipeline_options import PipelineOptions
>
> if __name__ == '__main__':
>     with Pipeline(options=PipelineOptions([
>             '--runner=FlinkRunner',
>             '--flink_version=1.8',
>             '--flink_master_url=localhost:8081',
>             '--environment_type=LOOPBACK',
>             '--streaming'
>     ])) as pipeline:
>         (
>             pipeline
>             | 'read' >> ReadFromKafka({'bootstrap.servers': 'localhost:9092'}, ['test'])  # [BEAM-3788] ???
>         )
>         result = pipeline.run()
>         result.wait_until_finish()
> {code}
> * Publish some data to Kafka.
> {code:java}
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> >{"hello":"world!"}
> {code}
> The Python script throws this error:
> {code:java}
> [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-USER-somejob.
> org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: xxx)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
>     at org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:360)
>     at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:310)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:173)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:104)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:80)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:78)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>     ... 13 more
> Caused by: java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
>     at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
>     at org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
>     at ...
> {code}
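The root cause in the trace above is that the runner hands a structured {{KafkaRecord}} object to a coder chain ({{LengthPrefixCoder}} wrapping {{ByteArrayCoder}}) that only accepts raw bytes, hence "KafkaRecord cannot be cast to [B". A stdlib-only Python sketch of that mismatch, using hypothetical stand-in classes rather than Beam's actual implementation:

```python
import struct


class KafkaRecord:
    """Stand-in for org.apache.beam.sdk.io.kafka.KafkaRecord (hypothetical)."""
    def __init__(self, key, value):
        self.key = key
        self.value = value


def length_prefix_encode(payload):
    """Mimics LengthPrefixCoder over ByteArrayCoder: accepts only raw bytes."""
    if not isinstance(payload, bytes):
        # Python-side analogue of "KafkaRecord cannot be cast to [B"
        raise TypeError(f"{type(payload).__name__} cannot be cast to bytes")
    return struct.pack(">I", len(payload)) + payload


# Raw bytes round-trip fine...
encoded = length_prefix_encode(b'{"hello":"world!"}')
assert encoded[4:] == b'{"hello":"world!"}'

# ...but a structured record blows up, which is the failure mode in the trace.
try:
    length_prefix_encode(KafkaRecord(b"k", b"v"))
except TypeError as e:
    print(e)
```

The fix on the Beam side is to expand the external transform with a coder the Python SDK understands (bytes or KV of bytes) instead of the Java-only {{KafkaRecord}} coder.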
[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord
[ https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019752#comment-17019752 ]

Maximilian Michels commented on BEAM-9046:
------------------------------------------

Are you running this against a Beam 2.13.0 Expansion / Job server? At this point your setup should be identical to mine :)
[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord
[ https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019670#comment-17019670 ]

Berkay Öztürk commented on BEAM-9046:
-------------------------------------

[~mxm] I think I'm getting there. I have built the Docker containers and downgraded to apache_beam 2.13.0:

{code:python}
Traceback (most recent call last):
  File "test.py", line 27, in <module>
    p.run()
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 419, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 186, in run_pipeline
    portable_options))
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 650, in to_runner_api
    root_transform_id = context.transforms.get_id(self._root_transform())
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 84, in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 871, in to_runner_api
    for part in self.parts],
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 871, in <listcomp>
    for part in self.parts],
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 84, in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 871, in to_runner_api
    for part in self.parts],
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 871, in <listcomp>
    for part in self.parts],
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 84, in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 856, in to_runner_api
    return self.transform.to_runner_api_transform(context, self.full_label)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 180, in to_runner_api_transform
    id, context.coders._id_to_proto[id], proto))
RuntimeError: Re-used coder id: ref_Coder_TupleCoder_1
spec {
  spec {
    urn: "beam:coder:kv:v1"
  }
  environment_id: "ref_Environment_default_environment_1"
}
component_coder_ids: "ref_Coder_BytesCoder_2"
component_coder_ids: "ref_Coder_BytesCoder_2"
spec {
  spec {
    urn: "beam:coder:kv:v1"
  }
}
component_coder_ids: "ref_Coder_StrUtf8Coder_2"
component_coder_ids: "ref_Coder_StrUtf8Coder_2"
{code}

Code (taken from your Beam Summit NA 2019 presentation):

{code:python}
pipeline_options = PipelineOptions(['--job_name=Cross-Language-Demo',
                                    '--runner=PortableRunner',
                                    '--job_endpoint=localhost:8099',
                                    '--parallelism=1',
                                    '--kafka_bootstrap=localhost:9092'])
p = beam.Pipeline(options=pipeline_options)

def to_upper(kv):
    k, v = kv
    return str(k).upper(), str(v).upper()

(p
 | ReadFromKafka(consumer_config={'bootstrap.servers': 'localhost:9092',
                                  'auto.offset.reset': 'latest'},
                 topics=['test'])
 | beam.Map(lambda kv: to_upper(kv)).with_output_types(KV[str, str])
 | WriteToKafka(producer_config={'bootstrap.servers': 'localhost:9092'},
                topic='test-out')
)
p.run()
{code}
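The {{RuntimeError: Re-used coder id}} above is, at its core, a duplicate-key registration failure: when the expanded Java transform is merged back into the Python pipeline, two different coder protos end up claiming the same id. A stdlib sketch of that check, with hypothetical names (not Beam's actual code):

```python
class CoderRegistry:
    """Toy registry mirroring the duplicate-id check that raises above."""
    def __init__(self):
        self._id_to_proto = {}

    def register(self, coder_id, proto):
        existing = self._id_to_proto.get(coder_id)
        if existing is not None and existing != proto:
            # Analogue of "RuntimeError: Re-used coder id: ref_Coder_TupleCoder_1"
            raise RuntimeError(f"Re-used coder id: {coder_id}")
        self._id_to_proto[coder_id] = proto


registry = CoderRegistry()
registry.register("ref_Coder_TupleCoder_1", "kv<bytes, bytes>")
# Re-registering the same id with the same spec is fine...
registry.register("ref_Coder_TupleCoder_1", "kv<bytes, bytes>")
# ...but a conflicting spec under the same id is rejected.
try:
    registry.register("ref_Coder_TupleCoder_1", "kv<str, str>")
except RuntimeError as e:
    print(e)
```

In the traceback, the conflicting specs are exactly the {{kv<bytes, bytes>}} and {{kv<str, str>}} coders dumped after the error message.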
[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord
[ https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019581#comment-17019581 ]

Maximilian Michels commented on BEAM-9046:
------------------------------------------

Are you also using the 2.13.0 version of the Python SDK? If not, you'll likely run into issues with non-matching versions. Please try with a virtual environment and something like {{pip install apache_beam==2.13.0}}. That said, the patch should also work with the newest version, although I haven't verified that, so better to test this with the 2.13.0 branch.
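The version-matching advice above boils down to a simple invariant: the Python SDK and the expansion/job server must come from the same Beam release. A small, purely illustrative helper for that check (hypothetical, stdlib only):

```python
def versions_match(sdk_version: str, job_server_version: str) -> bool:
    """Compare Beam release versions, ignoring -SNAPSHOT style suffixes."""
    def normalize(v: str):
        base = v.split("-")[0]               # "2.13.0-SNAPSHOT" -> "2.13.0"
        return tuple(int(p) for p in base.split(".")[:3])
    return normalize(sdk_version) == normalize(job_server_version)


# A 2.13.0 SDK against the 2.13.0-SNAPSHOT job server jar is a match;
# a 2.16.0 SDK against it is not.
assert versions_match("2.13.0", "2.13.0-SNAPSHOT")
assert not versions_match("2.16.0", "2.13.0-SNAPSHOT")
```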
[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord
[ https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017933#comment-17017933 ]

Berkay Öztürk commented on BEAM-9046:
-------------------------------------

[~mxm] This is what I have done:
# {code:java}
git clone https://github.com/mxm/beam.git beam-mxm
{code}
# {code:java}
cd beam-mxm
{code}
# {code:java}
git reset --hard b31cf99c75
{code}
# {code:java}
./gradlew build -p runners/flink/1.8/job-server
{code}
# {code:java}
java -jar runners/flink/1.8/job-server/build/libs/beam-runners-flink-1.8-job-server-2.13.0-SNAPSHOT.jar
{code}
# Run the Python pipeline with the PortableRunner.
{code:python}
Traceback (most recent call last):
  File "main.py", line 49, in <module>
    run()
  File "main.py", line 32, in run
    topics=['test']
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/transforms/ptransform.py", line 905, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/transforms/ptransform.py", line 514, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pipeline.py", line 481, in apply
    return self.apply(transform, pvalueish)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pipeline.py", line 517, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/runner.py", line 175, in apply
    return m(transform, input, options)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/runner.py", line 181, in apply_PTransform
    return transform.expand(input)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/io/external/kafka.py", line 119, in expand
    self.expansion_service))
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pvalue.py", line 110, in apply
    return self.pipeline.apply(*arglist, **kwargs)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pipeline.py", line 517, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/runner.py", line 175, in apply
    return m(transform, input, options)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/runner.py", line 181, in apply_PTransform
    return transform.expand(input)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/transforms/external.py", line 142, in expand
    for tag, pcoll_id in self._expanded_transform.outputs.items()
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/transforms/external.py", line 142, in <dictcomp>
    for tag, pcoll_id in self._expanded_transform.outputs.items()
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pvalue.py", line 178, in from_runner_api
    element_type=context.element_type_from_coder_id(proto.coder_id),
  File ...
{code}
[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord
[ https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017855#comment-17017855 ]

Maximilian Michels commented on BEAM-9046:
------------------------------------------

A couple of comments here regarding the error:
- You should be using the PortableRunner here if you submit against a JobServer. Otherwise, the FlinkRunner will start its own JobServer.
- Make sure that you rebuild the job-server to include the KafkaIO changes. You can see that they are not included because of the ClassNotFoundException.
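Following the first point above: when a JobServer is already running, the pipeline should be submitted with the PortableRunner and an explicit {{job_endpoint}}, instead of the FlinkRunner (which spins up its own JobServer). A sketch of the option flags, assuming the JobServer listens on localhost:8099 as elsewhere in this thread:

```python
# Flags for submitting against an already-running JobServer
# (endpoint localhost:8099 is an assumption matching this thread's setup).
portable_options = [
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--environment_type=LOOPBACK',
    '--streaming',
]

# With apache_beam installed, these would be passed straight through:
#   pipeline = beam.Pipeline(options=PipelineOptions(portable_options))
assert '--runner=PortableRunner' in portable_options
```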
[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord
[ https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17016711#comment-17016711 ]

Maximilian Michels commented on BEAM-9046:
------------------------------------------

Thanks for opening this issue [~BerkayOzturk]. Unfortunately, the cross-language support is still in its infancy. Please see BEAM-7870 for a plan on how to fix it. The issue also contains a workaround for the meantime.
[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord
[ https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17010192#comment-17010192 ]

Ankur Goenka commented on BEAM-9046:
------------------------------------

cc: [~mxm] [~chamikara]