[jira] [Comment Edited] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord

2020-01-20 Thread Jira


[ 
https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019670#comment-17019670
 ] 

Berkay Öztürk edited comment on BEAM-9046 at 1/20/20 6:18 PM:
--

[~mxm] I think I'm getting there. I have built the Docker containers and 
downgraded to apache_beam 2.13.0. Running the pipeline now fails with:

 
{code:java}
Traceback (most recent call last):
  File "test.py", line 27, in <module>
    p.run()
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 419, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 186, in run_pipeline
    portable_options))
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 650, in to_runner_api
    root_transform_id = context.transforms.get_id(self._root_transform())
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 84, in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 871, in to_runner_api
    for part in self.parts],
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 871, in <listcomp>
    for part in self.parts],
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 84, in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 871, in to_runner_api
    for part in self.parts],
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 871, in <listcomp>
    for part in self.parts],
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 84, in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 856, in to_runner_api
    return self.transform.to_runner_api_transform(context, self.full_label)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 180, in to_runner_api_transform
    id, context.coders._id_to_proto[id], proto))
RuntimeError: Re-used coder id: ref_Coder_TupleCoder_1
spec {
  spec {
    urn: "beam:coder:kv:v1"
  }
  environment_id: "ref_Environment_default_environment_1"
}
component_coder_ids: "ref_Coder_BytesCoder_2"
component_coder_ids: "ref_Coder_BytesCoder_2"
spec {
  spec {
    urn: "beam:coder:kv:v1"
  }
}
component_coder_ids: "ref_Coder_StrUtf8Coder_2"
component_coder_ids: "ref_Coder_StrUtf8Coder_2"
{code}
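For anyone reading the error above: the message prints one coder id followed by two coder protos that differ only in their component coders (BytesCoder vs. StrUtf8Coder). A minimal sketch of the kind of check that would produce such a message is below. It is illustrative only and is not Beam's implementation: plain dicts stand in for the protos, and {{register_coder}} is a hypothetical helper.

```python
# Illustrative only: plain dicts stand in for the coder protos printed in the
# error message. register_coder is a hypothetical helper, not an apache_beam API.

def register_coder(registry, coder_id, proto):
    """Store proto under coder_id; refuse a different proto for an existing id."""
    existing = registry.get(coder_id)
    if existing is not None and existing != proto:
        raise RuntimeError(
            "Re-used coder id: %s\n%s\n%s" % (coder_id, existing, proto))
    registry[coder_id] = proto


# The two KV coder definitions shown in the error, reduced to the fields that differ.
first_proto = {
    "urn": "beam:coder:kv:v1",
    "component_coder_ids": ["ref_Coder_BytesCoder_2", "ref_Coder_BytesCoder_2"],
}
second_proto = {
    "urn": "beam:coder:kv:v1",
    "component_coder_ids": ["ref_Coder_StrUtf8Coder_2", "ref_Coder_StrUtf8Coder_2"],
}

registry = {}
register_coder(registry, "ref_Coder_TupleCoder_1", first_proto)

try:
    # Same id, different contents: this is the situation the error reports.
    register_coder(registry, "ref_Coder_TupleCoder_1", second_proto)
    collision = None
except RuntimeError as err:
    collision = str(err)
```

Registering the same proto twice under one id passes in this sketch; only a differing definition under an existing id raises.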
Code (taken from your Beam Summit NA 2019 presentation):

 
{code:java}
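# Imports were not part of the original snippet; the kafka module path follows
# the apache_beam/io/external/kafka.py path seen in a traceback in this thread.
import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.typehints import KV

pipeline_options = PipelineOptions([
    '--job_name=Cross-Language-Demo',
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--parallelism=1',
])

p = beam.Pipeline(options=pipeline_options)

def to_upper(kv):
    # Upper-case both the key and the value of a record.
    k, v = kv
    return str(k).upper(), str(v).upper()

(p
 | ReadFromKafka(
     consumer_config={'bootstrap.servers': 'localhost:9092',
                      'auto.offset.reset': 'latest'},
     topics=['test'])
 | beam.Map(lambda kv: to_upper(kv)).with_output_types(KV[str, str])
 | WriteToKafka(
     producer_config={'bootstrap.servers': 'localhost:9092'},
     topic='test-out')
)

p.run()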
{code}
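One side note on {{to_upper}}, as a sketch under assumptions rather than a diagnosis of the error: if the records reach the function as {{bytes}} on Python 3 (an assumption, not confirmed in this thread), {{str()}} returns the repr of the bytes object instead of decoding it. The helper {{to_upper_decoded}} and the UTF-8 encoding are hypothetical choices for illustration.

```python
def to_upper_str(kv):
    """Same logic as to_upper in the pipeline: str() each element, then upper()."""
    k, v = kv
    return str(k).upper(), str(v).upper()


def to_upper_decoded(kv, encoding="utf-8"):
    """Hypothetical variant: decode bytes explicitly before upper-casing."""
    k, v = kv
    return k.decode(encoding).upper(), v.decode(encoding).upper()


# Assumed input shape: a (key, value) pair of raw bytes.
record = (b"key", b"value")

from_str = to_upper_str(record)          # str(b"key") is "b'key'" on Python 3
from_decode = to_upper_decoded(record)

print(from_str)     # ("B'KEY'", "B'VALUE'")
print(from_decode)  # ('KEY', 'VALUE')
```

The decoding variant does not handle a missing key or value ({{None}}); that case would need its own check.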
 

 



[jira] [Comment Edited] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord

2020-01-18 Thread Jira


[ 
https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017933#comment-17017933
 ] 

Berkay Öztürk edited comment on BEAM-9046 at 1/18/20 7:28 PM:
--

[~mxm]

This is what I have done:
 * {code:java}git clone https://github.com/mxm/beam.git beam-mxm{code}
 * {code:java}cd beam-mxm{code}
 * {code:java}git reset --hard b31cf99c75{code}
 * {code:java}./gradlew build -p runners/flink/1.8/job-server{code}
 * {code:java}java -jar runners/flink/1.8/job-server/build/libs/beam-runners-flink-1.8-job-server-2.13.0-SNAPSHOT.jar{code}
 * Run the Python pipeline with PortableRunner.

This raises the error below:
{code:java}
Traceback (most recent call last):
  File "main.py", line 49, in <module>
    run()
  File "main.py", line 32, in run
    topics=['test']
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/transforms/ptransform.py", line 905, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/transforms/ptransform.py", line 514, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pipeline.py", line 481, in apply
    return self.apply(transform, pvalueish)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pipeline.py", line 517, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/runner.py", line 175, in apply
    return m(transform, input, options)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/runner.py", line 181, in apply_PTransform
    return transform.expand(input)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/io/external/kafka.py", line 119, in expand
    self.expansion_service))
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pvalue.py", line 110, in apply
    return self.pipeline.apply(*arglist, **kwargs)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pipeline.py", line 517, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/runner.py", line 175, in apply
    return m(transform, input, options)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/runner.py", line 181, in apply_PTransform
    return transform.expand(input)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/transforms/external.py", line 142, in expand
    for tag, pcoll_id in self._expanded_transform.outputs.items()
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/transforms/external.py", line 142, in 
    for tag, pcoll_id in self._expanded_transform.outputs.items()
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pvalue.py", line 178, in from_runner_api