Thank you!

On Mon, Nov 23, 2020 at 10:53 PM Chamikara Jayalath <[email protected]>
wrote:

> We are working on this. ETA is the end of this quarter. Created
> https://issues.apache.org/jira/browse/BEAM-11332 for tracking.
>
> Thanks,
> Cham
>
> On Mon, Nov 23, 2020 at 5:42 AM Alan Krumholz <[email protected]>
> wrote:
>
>> Hi Cham,
>> I'm guessing this means I won't be able to use Snowflake IO with Python
>> on Dataflow until then?
>> Is there a timeline for that work to be completed?
>>
>> Thank you so much for your help
>>
>>
>>
>> On Fri, Nov 20, 2020 at 9:33 PM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> This is because the Python SDK does not understand the Java-specific
>>> "beam:transform:write_files:v1" transform. Hopefully this is one of those
>>> issues that will get resolved when we update Dataflow to directly consume
>>> portable protos (we are working on this now).
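A minimal, hypothetical sketch of the failure mode described above: the Python SDK keeps a registry mapping transform URNs to constructors (visible as `cls._known_urns` in the traceback), and a URN registered only in the Java SDK is simply absent from it. The dictionary below is illustrative, not Beam's actual internals:

```python
# Illustrative registry; the real SDK populates a similar mapping
# (PTransform._known_urns) for the URNs Python knows how to rebuild.
known_urns = {
    'beam:transform:pardo:v1': 'ParDo constructor',  # a Python-side URN
}

# A URN registered only in the Java SDK is missing from the Python registry,
# so rehydrating the pipeline proto fails with a bare KeyError.
urn = 'beam:transform:write_files:v1'
try:
    constructor = known_urns[urn]
except KeyError as err:
    print(f'KeyError: {err}')  # prints: KeyError: 'beam:transform:write_files:v1'
```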
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Fri, Nov 20, 2020 at 11:14 AM Brian Hulette <[email protected]>
>>> wrote:
>>>
>>>> +Chamikara Jayalath <[email protected]> any idea why this is still
>>>> doing a runner api roundtrip and failing? It's a multi-language pipeline,
>>>> and Alan has it configured to run on Dataflow runner V2.
>>>>
>>>> On Fri, Nov 20, 2020 at 10:36 AM Alan Krumholz <
>>>> [email protected]> wrote:
>>>>
>>>>> Just tried that and still getting this:
>>>>>
>>>>> ---------------------------------------------------------------------------
>>>>> KeyError                                  Traceback (most recent call last)
>>>>> <ipython-input-81-a936d3a5fa70> in <module>
>>>>> ----> 1 bq_to_snowflake(
>>>>>       2     'ml-betterup.coach_search.distances',
>>>>>       3     'analytics.ml.coach_search_distances',
>>>>>       4     'master')
>>>>> <ipython-input-78-096896321fde> in bq_to_snowflake(bq_table, snow_table, git_branch)
>>>>>     138     )
>>>>>     139
>>>>> --> 140     result = pipeline.run()
>>>>>     141     result.wait_until_finish()
>>>>>     142
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
>>>>>     512       # When possible, invoke a round trip through the runner API.
>>>>>     513       if test_runner_api and self._verify_runner_api_compatible():
>>>>> --> 514         return Pipeline.from_runner_api(
>>>>>     515             self.to_runner_api(use_fake_coders=True),
>>>>>     516             self.runner,
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context, allow_proto_holders)
>>>>>     890         requirements=proto.requirements)
>>>>>     891     root_transform_id, = proto.root_transform_ids
>>>>> --> 892     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>>     893     # TODO(robertwb): These are only needed to continue construction. Omit?
>>>>>     894     p.applied_labels = {
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>>>     113     # type: (str) -> PortableObjectT
>>>>>     114     if id not in self._id_to_obj:
>>>>> --> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>>>     116           self._id_to_proto[id], self._pipeline_context)
>>>>>     117     return self._id_to_obj[id]
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>>>    1277     result.parts = []
>>>>>    1278     for transform_id in proto.subtransforms:
>>>>> -> 1279       part = context.transforms.get_by_id(transform_id)
>>>>>    1280       part.parent = result
>>>>>    1281       result.parts.append(part)
>>>>> [the get_by_id / from_runner_api pair above repeats three more times while descending the transform tree]
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>>>     113     # type: (str) -> PortableObjectT
>>>>>     114     if id not in self._id_to_obj:
>>>>> --> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>>>     116           self._id_to_proto[id], self._pipeline_context)
>>>>>     117     return self._id_to_obj[id]
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>>>    1216         is_python_side_input(side_input_tags[0]) if side_input_tags else False)
>>>>>    1217
>>>>> -> 1218     transform = ptransform.PTransform.from_runner_api(proto, context)
>>>>>    1219     if uses_python_sideinput_tags:
>>>>>    1220       # Ordering is important here.
>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py in from_runner_api(cls, proto, context)
>>>>>     688     if proto is None or proto.spec is None or not proto.spec.urn:
>>>>>     689       return None
>>>>> --> 690     parameter_type, constructor = cls._known_urns[proto.spec.urn]
>>>>>     691
>>>>>     692     try:
>>>>> KeyError: 'beam:transform:write_files:v1'
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Nov 20, 2020 at 11:18 AM Brian Hulette <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hm, try passing the args as they would appear in
>>>>>> `sys.argv`: PipelineOptions(['--experiments=use_runner_v2'])
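The argv-style form works because PipelineOptions parses its flag list with argparse, exactly as command-line arguments would be parsed. A small stand-alone sketch of that parsing, using plain argparse rather than Beam itself:

```python
import argparse

# PipelineOptions parses argv-style strings with argparse under the hood;
# this mimics how '--experiments=use_runner_v2' would be consumed.
parser = argparse.ArgumentParser()
parser.add_argument('--experiments', action='append')
args, _unknown = parser.parse_known_args(['--experiments=use_runner_v2'])
print(args.experiments)  # ['use_runner_v2']
```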
>>>>>>
>>>>>> On Thu, Nov 19, 2020 at 12:14 PM Alan Krumholz <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> How can I pass that flag using the SDK?
>>>>>>> Tried this:
>>>>>>>
>>>>>>>> pipeline = beam.Pipeline(options=PipelineOptions(experiments=['use_runner_v2'], ...))
>>>>>>>
>>>>>>>
>>>>>>> but still getting a similar error:
>>>>>>>
>>>>>>> ---------------------------------------------------------------------------
>>>>>>> KeyError                                  Traceback (most recent call last)
>>>>>>> <ipython-input-69-a936d3a5fa70> in <module>
>>>>>>> ----> 1 bq_to_snowflake(
>>>>>>>       2     'ml-betterup.coach_search.distances',
>>>>>>>       3     'analytics.ml.coach_search_distances',
>>>>>>>       4     'master')
>>>>>>> <ipython-input-66-2b6bf3c9334b> in bq_to_snowflake(bq_table, snow_table, git_branch)
>>>>>>>     138     )
>>>>>>>     139
>>>>>>> --> 140     result = pipeline.run()
>>>>>>>     141     result.wait_until_finish()
>>>>>>>     142
>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
>>>>>>>     512       # When possible, invoke a round trip through the runner API.
>>>>>>>     513       if test_runner_api and self._verify_runner_api_compatible():
>>>>>>> --> 514         return Pipeline.from_runner_api(
>>>>>>>     515             self.to_runner_api(use_fake_coders=True),
>>>>>>>     516             self.runner,
>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context, allow_proto_holders)
>>>>>>>     890         requirements=proto.requirements)
>>>>>>>     891     root_transform_id, = proto.root_transform_ids
>>>>>>> --> 892     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>>>>     893     # TODO(robertwb): These are only needed to continue construction. Omit?
>>>>>>>     894     p.applied_labels = {
>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>>>>>     113     # type: (str) -> PortableObjectT
>>>>>>>     114     if id not in self._id_to_obj:
>>>>>>> --> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>>>>>     116           self._id_to_proto[id], self._pipeline_context)
>>>>>>>     117     return self._id_to_obj[id]
>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>>>>>    1277     result.parts = []
>>>>>>>    1278     for transform_id in proto.subtransforms:
>>>>>>> -> 1279       part = context.transforms.get_by_id(transform_id)
>>>>>>>    1280       part.parent = result
>>>>>>>    1281       result.parts.append(part)
>>>>>>> [the get_by_id / from_runner_api pair above repeats three more times while descending the transform tree]
>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>>>>>     113     # type: (str) -> PortableObjectT
>>>>>>>     114     if id not in self._id_to_obj:
>>>>>>> --> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>>>>>     116           self._id_to_proto[id], self._pipeline_context)
>>>>>>>     117     return self._id_to_obj[id]
>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>>>>>    1216         is_python_side_input(side_input_tags[0]) if side_input_tags else False)
>>>>>>>    1217
>>>>>>> -> 1218     transform = ptransform.PTransform.from_runner_api(proto, context)
>>>>>>>    1219     if uses_python_sideinput_tags:
>>>>>>>    1220       # Ordering is important here.
>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py in from_runner_api(cls, proto, context)
>>>>>>>     688     if proto is None or proto.spec is None or not proto.spec.urn:
>>>>>>>     689       return None
>>>>>>> --> 690     parameter_type, constructor = cls._known_urns[proto.spec.urn]
>>>>>>>     691
>>>>>>>     692     try:
>>>>>>> KeyError: 'beam:transform:write_files:v1'
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Nov 19, 2020 at 2:18 PM Brian Hulette <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Ah ok, you'll need to use Dataflow runner v2 [1] to run this
>>>>>>>> pipeline (add the flag '--experiments=use_runner_v2'). See also [2].
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#python_11
>>>>>>>> [2]
>>>>>>>> https://cloud.google.com/blog/products/data-analytics/multi-language-sdks-for-building-cloud-pipelines
>>>>>>>>
>>>>>>>> On Thu, Nov 19, 2020 at 11:10 AM Alan Krumholz <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Dataflow runner
>>>>>>>>>
>>>>>>>>> On Thu, Nov 19, 2020 at 2:00 PM Brian Hulette <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hm what runner are you using? It looks like we're trying to
>>>>>>>>>> encode and decode the pipeline proto, which isn't possible for a
>>>>>>>>>> multi-language pipeline. Are you using a portable runner?
>>>>>>>>>>
>>>>>>>>>> Brian
>>>>>>>>>>
>>>>>>>>>> On Thu, Nov 19, 2020 at 10:42 AM Alan Krumholz <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> got it, thanks!
>>>>>>>>>>> I was using:
>>>>>>>>>>> 'xxxxxx.us-east-1'
>>>>>>>>>>> Using this instead seems to fix that problem:
>>>>>>>>>>> 'xxxxxx.us-east-1.snowflakecomputing.com'
>>>>>>>>>>>
>>>>>>>>>>> I'm now hitting a different error, though (now in Python):
>>>>>>>>>>>
>>>>>>>>>>>> <ipython-input-42-b27732a0a892> in bq_to_snowflake(bq_table, snow_table, git_branch)
>>>>>>>>>>>>     161     )
>>>>>>>>>>>>     162
>>>>>>>>>>>> --> 163     result = pipeline.run()
>>>>>>>>>>>>     164     result.wait_until_finish()
>>>>>>>>>>>>     165
>>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
>>>>>>>>>>>>     512       # When possible, invoke a round trip through the runner API.
>>>>>>>>>>>>     513       if test_runner_api and self._verify_runner_api_compatible():
>>>>>>>>>>>> --> 514         return Pipeline.from_runner_api(
>>>>>>>>>>>>     515             self.to_runner_api(use_fake_coders=True),
>>>>>>>>>>>>     516             self.runner,
>>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context, allow_proto_holders)
>>>>>>>>>>>>     890         requirements=proto.requirements)
>>>>>>>>>>>>     891     root_transform_id, = proto.root_transform_ids
>>>>>>>>>>>> --> 892     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>>>>>>>>>     893     # TODO(robertwb): These are only needed to continue construction. Omit?
>>>>>>>>>>>>     894     p.applied_labels = {
>>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>>>>>>>>>>     113     # type: (str) -> PortableObjectT
>>>>>>>>>>>>     114     if id not in self._id_to_obj:
>>>>>>>>>>>> --> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>>>>>>>>>>     116           self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>>     117     return self._id_to_obj[id]
>>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>>>>>>>>>>    1277     result.parts = []
>>>>>>>>>>>>    1278     for transform_id in proto.subtransforms:
>>>>>>>>>>>> -> 1279       part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>>    1280       part.parent = result
>>>>>>>>>>>>    1281       result.parts.append(part)
>>>>>>>>>>>> [the get_by_id / from_runner_api pair above repeats three more times while descending the transform tree]
>>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
>>>>>>>>>>>>     113     # type: (str) -> PortableObjectT
>>>>>>>>>>>>     114     if id not in self._id_to_obj:
>>>>>>>>>>>> --> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
>>>>>>>>>>>>     116           self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>>     117     return self._id_to_obj[id]
>>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in from_runner_api(proto, context)
>>>>>>>>>>>>    1216         is_python_side_input(side_input_tags[0]) if side_input_tags else False)
>>>>>>>>>>>>    1217
>>>>>>>>>>>> -> 1218     transform = ptransform.PTransform.from_runner_api(proto, context)
>>>>>>>>>>>>    1219     if uses_python_sideinput_tags:
>>>>>>>>>>>>    1220       # Ordering is important here.
>>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py in from_runner_api(cls, proto, context)
>>>>>>>>>>>>     688     if proto is None or proto.spec is None or not proto.spec.urn:
>>>>>>>>>>>>     689       return None
>>>>>>>>>>>> --> 690     parameter_type, constructor = cls._known_urns[proto.spec.urn]
>>>>>>>>>>>>     691
>>>>>>>>>>>>     692     try:
>>>>>>>>>>>> KeyError: 'beam:transform:write_files:v1'
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I'll keep trying to make this work, but I'm sharing it in case
>>>>>>>>>>> you can easily spot the problem.
>>>>>>>>>>>
>>>>>>>>>>> Thanks so much!
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Nov 19, 2020 at 1:30 PM Brian Hulette <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Alan,
>>>>>>>>>>>> Sorry this error message is so verbose. What are you passing
>>>>>>>>>>>> for the server_name argument [1]? It looks like that's what the 
>>>>>>>>>>>> Java
>>>>>>>>>>>> stacktrace is complaining about:
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.IllegalArgumentException: serverName must be in
>>>>>>>>>>>> format <account_name>.snowflakecomputing.com
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/snowflake.py#L302
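One way to guard against this, sketched as a hypothetical helper (not part of Beam) that normalizes an account identifier into the full server name the Java precondition expects:

```python
SNOWFLAKE_DOMAIN = 'snowflakecomputing.com'

def normalize_server_name(account: str) -> str:
    """Append the Snowflake domain unless the caller already included it."""
    if account.endswith(SNOWFLAKE_DOMAIN):
        return account
    return f'{account}.{SNOWFLAKE_DOMAIN}'

print(normalize_server_name('xxxxxx.us-east-1'))
# xxxxxx.us-east-1.snowflakecomputing.com
```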
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Nov 19, 2020 at 10:16 AM Alan Krumholz <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I'm trying to replace my custom/problematic snowflake sink
>>>>>>>>>>>>> with the new:
>>>>>>>>>>>>> https://beam.apache.org/documentation/io/built-in/snowflake/#writing-to-snowflake
>>>>>>>>>>>>>
>>>>>>>>>>>>> However, when I try to run my pipeline (using Python) I get
>>>>>>>>>>>>> this Java error:
>>>>>>>>>>>>>
>>>>>>>>>>>>> RuntimeError: java.lang.RuntimeException: Failed to build
>>>>>>>>>>>>>> transform beam:external:java:snowflake:write:v1 from spec urn:
>>>>>>>>>>>>>> "beam:external:java:snowflake:write:v1"
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> It is hard to understand why it is failing from the partial
>>>>>>>>>>>>> Java error trace I get in the output:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>  at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:130)
>>>>>>>>>>>>>>  at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:357)
>>>>>>>>>>>>>>  at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:433)
>>>>>>>>>>>>>>  at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:488)
>>>>>>>>>>>>>>  at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
>>>>>>>>>>>>>>  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>>>>>>>>>>>>>>  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>>>>>>>>>>>>>>  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>>>>>>>>>>>>>>  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>>>>>>>>>>>>>  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>>>>>>>>>>>>>>  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>>>>>>>>>>>>>>  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
>>>>>>>>>>>>>>  at java.base/java.lang.Thread.run(Thread.java:832)
>>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: serverName must be in format <account_name>.snowflakecomputing.com
>>>>>>>>>>>>>>  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
>>>>>>>>>>>>>>  at org.apache.beam.sdk.io.snowflake.SnowflakeIO$DataSourceConfiguration.withServerName(SnowflakeIO.java:1700)
>>>>>>>>>>>>>>  at org.apache.beam.sdk.io.snowflake.crosslanguage.CrossLanguageConfiguration.getDataSourceConfiguration(CrossLanguageConfiguration.java:166)
>>>>>>>>>>>>>>  at org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:78)
>>>>>>>>>>>>>>  at org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:34)
>>>>>>>>>>>>>>  at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:125)
>>>>>>>>>>>>>>  ... 12 more
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any clue how I can debug/fix this using Python?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
