[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord

2020-03-15 Thread Nicolae Rosia (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059596#comment-17059596
 ] 

Nicolae Rosia commented on BEAM-9046:
-

Hello, I have the same issue with each of these Beam versions:
 * 2.19.0
 * 2.20.0
 * master

All of them were tested with Flink 1.9 and Kafka.

Is there a known working example that I could start from?

> Kafka connector for Python throws ClassCastException when reading KafkaRecord
> -
>
> Key: BEAM-9046
> URL: https://issues.apache.org/jira/browse/BEAM-9046
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-kafka
>Affects Versions: 2.16.0
>Reporter: Berkay Öztürk
>Priority: Major
>  Labels: KafkaIO, Python
> Fix For: Not applicable
>
>
>  I'm trying to read the data streaming from Apache Kafka using the Python SDK 
> for Apache Beam with the Flink runner. After running Kafka 2.4.0 and Flink 
> 1.8.3, I follow these steps:
>  * Compile and run Beam 2.16 with Flink 1.8 runner.
> {code:java}
> git clone --single-branch --branch release-2.16.0 https://github.com/apache/beam.git beam-2.16.0
> cd beam-2.16.0
> nohup ./gradlew :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081 &
> {code}
>  * Run the Python pipeline.
> {code:python}
> from apache_beam import Pipeline
> from apache_beam.io.external.kafka import ReadFromKafka
> from apache_beam.options.pipeline_options import PipelineOptions
>
> if __name__ == '__main__':
>     with Pipeline(options=PipelineOptions([
>         '--runner=FlinkRunner',
>         '--flink_version=1.8',
>         '--flink_master_url=localhost:8081',
>         '--environment_type=LOOPBACK',
>         '--streaming'
>     ])) as pipeline:
>         (
>             pipeline
>             | 'read' >> ReadFromKafka({'bootstrap.servers': 'localhost:9092'}, ['test'])  # [BEAM-3788] ???
>         )
>         result = pipeline.run()
>         result.wait_until_finish()
> {code}
>  * Publish some data to Kafka.
> {code:java}
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> >{"hello":"world!"}
> {code}
> The Python script throws this error:
> {code:java}
> [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-USER-somejob.
> org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: xxx)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
>     at org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:360)
>     at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:310)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:173)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:104)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:80)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:78)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>     ... 13 more
> Caused by: java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
>     at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
>     at org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
> at 
> 

[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord

2020-01-20 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019752#comment-17019752
 ] 

Maximilian Michels commented on BEAM-9046:
--

Are you running this against a Beam 2.13.0 Expansion / Job server? At this 
point your setup should be identical to mine :)


[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord

2020-01-20 Thread Jira


[ 
https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019670#comment-17019670
 ] 

Berkay Öztürk commented on BEAM-9046:
-

[~mxm] I think I'm getting there. I have built the Docker containers and 
downgraded to apache_beam 2.13.0, but now I get this error:

 
{code:java}
Traceback (most recent call last):
  File "test.py", line 27, in <module>
    p.run()
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 419, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 186, in run_pipeline
    portable_options))
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 650, in to_runner_api
    root_transform_id = context.transforms.get_id(self._root_transform())
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 84, in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 871, in to_runner_api
    for part in self.parts],
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 871, in <listcomp>
    for part in self.parts],
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 84, in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 871, in to_runner_api
    for part in self.parts],
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 871, in <listcomp>
    for part in self.parts],
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 84, in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 856, in to_runner_api
    return self.transform.to_runner_api_transform(context, self.full_label)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 180, in to_runner_api_transform
    id, context.coders._id_to_proto[id], proto))
RuntimeError: Re-used coder id: ref_Coder_TupleCoder_1
spec {
  spec {
    urn: "beam:coder:kv:v1"
  }
  environment_id: "ref_Environment_default_environment_1"
}
component_coder_ids: "ref_Coder_BytesCoder_2"
component_coder_ids: "ref_Coder_BytesCoder_2"
spec {
  spec {
    urn: "beam:coder:kv:v1"
  }
}
component_coder_ids: "ref_Coder_StrUtf8Coder_2"
component_coder_ids: "ref_Coder_StrUtf8Coder_2"
{code}
Code (Taken from your Beam Summit NA 2019 presentation):

 
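{code:python}
# Imports were not shown in the original comment; these are the assumed ones.
import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.typehints import KV

pipeline_options = PipelineOptions([
    '--job_name=Cross-Language-Demo',
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--parallelism=1',
    '--kafka_bootstrap=localhost:9092',
])

p = beam.Pipeline(options=pipeline_options)

def to_upper(kv):
    # Upper-case both the key and the value of a Kafka record.
    k, v = kv
    return str(k).upper(), str(v).upper()

(p
 | ReadFromKafka(
     consumer_config={'bootstrap.servers': 'localhost:9092',
                      'auto.offset.reset': 'latest'},
     topics=['test'])
 | beam.Map(lambda kv: to_upper(kv)).with_output_types(KV[str, str])
 | WriteToKafka(
     producer_config={'bootstrap.servers': 'localhost:9092'},
     topic='test-out')
)

p.run()
{code}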
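The "Re-used coder id" error above reads as if the same coder id arrived with two different definitions (one built from BytesCoder components, the other from StrUtf8Coder components). The following is only a toy sketch of that kind of consistency check, not Beam's actual implementation; `CoderRegistry`, `put`, and the dictionary layout are hypothetical names chosen for illustration.

```python
class CoderRegistry:
    """Toy stand-in for a table mapping coder ids to definitions (hypothetical)."""

    def __init__(self):
        self._id_to_proto = {}

    def put(self, coder_id, proto):
        # Registering the same id twice is fine only if the definition is identical.
        existing = self._id_to_proto.get(coder_id)
        if existing is not None and existing != proto:
            raise RuntimeError(
                "Re-used coder id: %s\n%s\n%s" % (coder_id, existing, proto))
        self._id_to_proto[coder_id] = proto


# Two definitions that share an id but differ in their component coders,
# mirroring the two protos printed in the traceback.
existing_definition = {
    "urn": "beam:coder:kv:v1",
    "component_coder_ids": ["ref_Coder_BytesCoder_2", "ref_Coder_BytesCoder_2"],
}
incoming_definition = {
    "urn": "beam:coder:kv:v1",
    "component_coder_ids": ["ref_Coder_StrUtf8Coder_2", "ref_Coder_StrUtf8Coder_2"],
}

registry = CoderRegistry()
registry.put("ref_Coder_TupleCoder_1", existing_definition)
registry.put("ref_Coder_TupleCoder_1", dict(existing_definition))  # identical: accepted

try:
    registry.put("ref_Coder_TupleCoder_1", incoming_definition)
    conflict = None
except RuntimeError as err:
    conflict = str(err)
```

If this reading is right, the clash would come from two sides generating the id `ref_Coder_TupleCoder_1` independently for different coders, which fits the advice in this thread to keep the SDK and job server versions aligned.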
 

 


[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord

2020-01-20 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019581#comment-17019581
 ] 

Maximilian Michels commented on BEAM-9046:
--

Are you also using the 2.13.0 version of the Python SDK? If not, you'll likely 
run into issues with non-matching versions. Please try with a virtual 
environment and something like {{pip install apache_beam==2.13.0}}.

That said, the patch should also work with the newest version, although I 
haven't verified that, so it is better to test this with the 2.13.0 branch.
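As a rough way to catch a version mismatch before submitting a pipeline, the version embedded in the job-server jar name can be compared with the installed SDK version. This is a minimal sketch under assumptions: the helper names are hypothetical, and the jar naming pattern is inferred from the file name quoted elsewhere in this thread.

```python
import re


def beam_version_from_jar(jar_name):
    """Return the 'X.Y.Z' version found after 'job-server-' in a jar name, or None."""
    match = re.search(r"job-server-(\d+\.\d+\.\d+)", jar_name)
    return match.group(1) if match else None


def versions_match(sdk_version, jar_name):
    """True if the SDK version equals the version encoded in the jar name."""
    return beam_version_from_jar(jar_name) == sdk_version


jar = "beam-runners-flink-1.8-job-server-2.13.0-SNAPSHOT.jar"
# In a real environment the SDK version could be read from apache_beam.__version__.
ok_for_2_13 = versions_match("2.13.0", jar)
ok_for_2_16 = versions_match("2.16.0", jar)
```

The check ignores suffixes such as `-SNAPSHOT`, so it only confirms that the release numbers line up, not that the builds are identical.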


[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord

2020-01-17 Thread Jira


[ 
https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017933#comment-17017933
 ] 

Berkay Öztürk commented on BEAM-9046:
-

[~mxm]

This is what I have done:
 # {code:java}git clone https://github.com/mxm/beam.git beam-mxm{code}
 # {code:java}cd beam-mxm{code}
 # {code:java}git reset --hard b31cf99c75{code}
 # {code:java}./gradlew build -p runners/flink/1.8/job-server{code}
 # {code:java}java -jar runners/flink/1.8/job-server/build/libs/beam-runners-flink-1.8-job-server-2.13.0-SNAPSHOT.jar{code}
 # Run Python pipeline with PortableRunner.

 
{code:java}
Traceback (most recent call last):
  File "main.py", line 49, in <module>
    run()
  File "main.py", line 32, in run
    topics=['test']
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/transforms/ptransform.py", line 905, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/transforms/ptransform.py", line 514, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pipeline.py", line 481, in apply
    return self.apply(transform, pvalueish)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pipeline.py", line 517, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/runner.py", line 175, in apply
    return m(transform, input, options)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/runner.py", line 181, in apply_PTransform
    return transform.expand(input)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/io/external/kafka.py", line 119, in expand
    self.expansion_service))
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pvalue.py", line 110, in apply
    return self.pipeline.apply(*arglist, **kwargs)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pipeline.py", line 517, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/runner.py", line 175, in apply
    return m(transform, input, options)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/runner.py", line 181, in apply_PTransform
    return transform.expand(input)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/transforms/external.py", line 142, in expand
    for tag, pcoll_id in self._expanded_transform.outputs.items()
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/transforms/external.py", line 142, in 
    for tag, pcoll_id in self._expanded_transform.outputs.items()
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pvalue.py", line 178, in from_runner_api
    element_type=context.element_type_from_coder_id(proto.coder_id),
  File 

[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord

2020-01-17 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017855#comment-17017855
 ] 

Maximilian Michels commented on BEAM-9046:
--

A couple of comments here regarding the error:

- You should be using the PortableRunner here if you submit against a 
JobServer. Otherwise, the FlinkRunner will start its own JobServer.
- Make sure that you rebuild the job-server to include the KafkaIO changes. You 
can see that they are not included because of the ClassNotFoundException.
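The first point can be sketched as a small helper that builds the option list for submitting to an already running job server. The helper name `portable_runner_args` is hypothetical; the flags themselves and the `localhost:8099` endpoint are the ones that appear in this thread. The returned list would be passed to `PipelineOptions(...)` in a real pipeline.

```python
def portable_runner_args(job_endpoint="localhost:8099", streaming=True):
    """Build pipeline arguments for submitting to an existing job server.

    With --runner=PortableRunner the pipeline is sent to the job server at
    job_endpoint instead of the runner starting a job server of its own.
    """
    args = [
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
        "--environment_type=LOOPBACK",
    ]
    if streaming:
        args.append("--streaming")
    return args


default_args = portable_runner_args()
batch_args = portable_runner_args(streaming=False)
```

Keeping the endpoint as a parameter makes it explicit which job server the pipeline targets, which matters when a rebuilt job server runs next to an older one.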


[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord

2020-01-16 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17016711#comment-17016711
 ] 

Maximilian Michels commented on BEAM-9046:
--

Thanks for opening this issue [~BerkayOzturk]. Unfortunately, the 
cross-language support is still in its infancy. Please see BEAM-7870 for a 
plan on how to fix it. That issue also contains a workaround in the meantime.


[jira] [Commented] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord

2020-01-07 Thread Ankur Goenka (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17010192#comment-17010192
 ] 

Ankur Goenka commented on BEAM-9046:


cc: [~mxm] [~chamikara]

> Kafka connector for Python throws ClassCastException when reading KafkaRecord
> -
>
> Key: BEAM-9046
> URL: https://issues.apache.org/jira/browse/BEAM-9046
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-kafka
>Affects Versions: 2.16.0
>Reporter: Berkay Öztürk
>Priority: Major
>  Labels: KafkaIO, Python
>
>  I'm trying to read the data streaming from Apache Kafka using the Python SDK 
> for Apache Beam with the Flink runner. After running Kafka 2.4.0 and Flink 
> 1.8.3, I follow these steps:
>  * Compile and run Beam 2.16 with Flink 1.8 runner.
> {code:java}
> git clone --single-branch --branch release-2.16.0 https://github.com/apache/beam.git beam-2.16.0
> cd beam-2.16.0
> nohup ./gradlew :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081 &
> {code}
>  * Run the Python pipeline.
> {code:python}
> from apache_beam import Pipeline
> from apache_beam.io.external.kafka import ReadFromKafka
> from apache_beam.options.pipeline_options import PipelineOptions
>
> if __name__ == '__main__':
>     with Pipeline(options=PipelineOptions([
>         '--runner=FlinkRunner',
>         '--flink_version=1.8',
>         '--flink_master_url=localhost:8081',
>         '--environment_type=LOOPBACK',
>         '--streaming'
>     ])) as pipeline:
>         (
>             pipeline
>             | 'read' >> ReadFromKafka({'bootstrap.servers': 'localhost:9092'}, ['test'])  # [BEAM-3788] ???
>         )
>         result = pipeline.run()
>         result.wait_until_finish()
> {code}
>  * Publish some data to Kafka.
> {code:java}
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> >{"hello":"world!"}
> {code}
> The Python script throws this error:
> {code:java}
> [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-USER-somejob.
> org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: xxx)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
>     at org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:360)
>     at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:310)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:173)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:104)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:80)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:78)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>     ... 13 more
> Caused by: java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
>     at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
>     at org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
>     at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:105)
>     at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:81)