Thank you! On Mon, Nov 23, 2020 at 10:53 PM Chamikara Jayalath <[email protected]> wrote:
> We are working on this. ETA is the end of this quarter. Created > https://issues.apache.org/jira/browse/BEAM-11332 for tracking. > > Thanks, > Cham > > On Mon, Nov 23, 2020 at 5:42 AM Alan Krumholz <[email protected]> > wrote: > >> Hi Cham, >> I'm guessing this means I won't be able to use Snowflake IO with python >> on dataflow until then? >> Is there a timeline for that work to be completed? >> >> Thank you so much for your help >> >> >> >> On Fri, Nov 20, 2020 at 9:33 PM Chamikara Jayalath <[email protected]> >> wrote: >> >>> This is because Python does not understand that Java specific >>> "beam:transform:write_files:v1" transform. Hopefully this is one of those >>> issues that will get resolved when we update Dataflow to directly consume >>> portable protos (we are working on this now). >>> >>> Thanks, >>> Cham >>> >>> On Fri, Nov 20, 2020 at 11:14 AM Brian Hulette <[email protected]> >>> wrote: >>> >>>> +Chamikara Jayalath <[email protected]> any idea why this is still >>>> doing a runner api roundtrip and failing? It's a multi-language pipeline, >>>> and Alan has it configured to run on Dataflow runner V2. >>>> >>>> On Fri, Nov 20, 2020 at 10:36 AM Alan Krumholz < >>>> [email protected]> wrote: >>>> >>>>> Just tried that and still getting this: >>>>> >>>>> ---------------------------------------------------------------------------KeyError >>>>> Traceback (most recent call >>>>> last)<ipython-input-81-a936d3a5fa70> in <module>----> 1 bq_to_snowflake( >>>>> 2 'ml-betterup.coach_search.distances', 3 >>>>> 'analytics.ml.coach_search_distances', 4 'master') >>>>> <ipython-input-78-096896321fde> in bq_to_snowflake(bq_table, snow_table, >>>>> git_branch) 138 ) 139 --> 140 result = pipeline.run() >>>>> 141 result.wait_until_finish() 142 >>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>>>> run(self, test_runner_api) 512 # When possible, invoke a round >>>>> trip through the runner API. 
513 if test_runner_api and >>>>> self._verify_runner_api_compatible():--> 514 return >>>>> Pipeline.from_runner_api( 515 >>>>> self.to_runner_api(use_fake_coders=True), 516 self.runner, >>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>>>> from_runner_api(proto, runner, options, return_context, >>>>> allow_proto_holders) 890 requirements=proto.requirements) >>>>> 891 root_transform_id, = proto.root_transform_ids--> 892 >>>>> p.transforms_stack = [context.transforms.get_by_id(root_transform_id)] >>>>> 893 # TODO(robertwb): These are only needed to continue construction. >>>>> Omit? 894 p.applied_labels = { >>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>> in get_by_id(self, id) 113 # type: (str) -> PortableObjectT >>>>> 114 if id not in self._id_to_obj:--> 115 self._id_to_obj[id] = >>>>> self._obj_type.from_runner_api( 116 self._id_to_proto[id], >>>>> self._pipeline_context) 117 return self._id_to_obj[id] >>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>>>> from_runner_api(proto, context) 1277 result.parts = [] 1278 >>>>> for transform_id in proto.subtransforms:-> 1279 part = >>>>> context.transforms.get_by_id(transform_id) 1280 part.parent = >>>>> result 1281 result.parts.append(part) >>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>> in get_by_id(self, id) 113 # type: (str) -> PortableObjectT >>>>> 114 if id not in self._id_to_obj:--> 115 self._id_to_obj[id] = >>>>> self._obj_type.from_runner_api( 116 self._id_to_proto[id], >>>>> self._pipeline_context) 117 return self._id_to_obj[id] >>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>>>> from_runner_api(proto, context) 1277 result.parts = [] 1278 >>>>> for transform_id in proto.subtransforms:-> 1279 part = >>>>> context.transforms.get_by_id(transform_id) 1280 part.parent = >>>>> result 1281 result.parts.append(part) >>>>> 
~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>> in get_by_id(self, id) 113 # type: (str) -> PortableObjectT >>>>> 114 if id not in self._id_to_obj:--> 115 self._id_to_obj[id] = >>>>> self._obj_type.from_runner_api( 116 self._id_to_proto[id], >>>>> self._pipeline_context) 117 return self._id_to_obj[id] >>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>>>> from_runner_api(proto, context) 1277 result.parts = [] 1278 >>>>> for transform_id in proto.subtransforms:-> 1279 part = >>>>> context.transforms.get_by_id(transform_id) 1280 part.parent = >>>>> result 1281 result.parts.append(part) >>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>> in get_by_id(self, id) 113 # type: (str) -> PortableObjectT >>>>> 114 if id not in self._id_to_obj:--> 115 self._id_to_obj[id] = >>>>> self._obj_type.from_runner_api( 116 self._id_to_proto[id], >>>>> self._pipeline_context) 117 return self._id_to_obj[id] >>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>>>> from_runner_api(proto, context) 1277 result.parts = [] 1278 >>>>> for transform_id in proto.subtransforms:-> 1279 part = >>>>> context.transforms.get_by_id(transform_id) 1280 part.parent = >>>>> result 1281 result.parts.append(part) >>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>> in get_by_id(self, id) 113 # type: (str) -> PortableObjectT >>>>> 114 if id not in self._id_to_obj:--> 115 self._id_to_obj[id] = >>>>> self._obj_type.from_runner_api( 116 self._id_to_proto[id], >>>>> self._pipeline_context) 117 return self._id_to_obj[id] >>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>>>> from_runner_api(proto, context) 1216 >>>>> is_python_side_input(side_input_tags[0]) if side_input_tags else False) >>>>> 1217 -> 1218 transform = ptransform.PTransform.from_runner_api(proto, >>>>> context) 1219 if 
uses_python_sideinput_tags: 1220 # >>>>> Ordering is important here. >>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py >>>>> in from_runner_api(cls, proto, context) 688 if proto is None or >>>>> proto.spec is None or not proto.spec.urn: 689 return None--> 690 >>>>> parameter_type, constructor = cls._known_urns[proto.spec.urn] 691 >>>>> 692 try: >>>>> KeyError: 'beam:transform:write_files:v1' >>>>> >>>>> >>>>> >>>>> On Fri, Nov 20, 2020 at 11:18 AM Brian Hulette <[email protected]> >>>>> wrote: >>>>> >>>>>> Hm try passing in the args as they would appear in >>>>>> `sys.argv`, PipelineOptions(['--experiments=use_runner_v2']) >>>>>> >>>>>> On Thu, Nov 19, 2020 at 12:14 PM Alan Krumholz < >>>>>> [email protected]> wrote: >>>>>> >>>>>>> How can I pass that flag using the SDK? >>>>>>> Tried this: >>>>>>> >>>>>>> pipeline = beam.Pipeline(options=PipelineOptions(experiments= >>>>>>>> ['use_runner_v2'], ...) >>>>>>> >>>>>>> >>>>>>> but still getting a similar error: >>>>>>> >>>>>>> ---------------------------------------------------------------------------KeyError >>>>>>> Traceback (most recent call >>>>>>> last)<ipython-input-69-a936d3a5fa70> in <module>----> 1 >>>>>>> bq_to_snowflake( 2 'ml-betterup.coach_search.distances', >>>>>>> 3 'analytics.ml.coach_search_distances', 4 'master') >>>>>>> <ipython-input-66-2b6bf3c9334b> in bq_to_snowflake(bq_table, >>>>>>> snow_table, git_branch) 138 ) 139 --> 140 result = >>>>>>> pipeline.run() 141 result.wait_until_finish() 142 >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>>>>>> run(self, test_runner_api) 512 # When possible, invoke a round >>>>>>> trip through the runner API. 
513 if test_runner_api and >>>>>>> self._verify_runner_api_compatible():--> 514 return >>>>>>> Pipeline.from_runner_api( 515 >>>>>>> self.to_runner_api(use_fake_coders=True), 516 >>>>>>> self.runner, >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>>>>>> from_runner_api(proto, runner, options, return_context, >>>>>>> allow_proto_holders) 890 requirements=proto.requirements) >>>>>>> 891 root_transform_id, = proto.root_transform_ids--> 892 >>>>>>> p.transforms_stack = [context.transforms.get_by_id(root_transform_id)] >>>>>>> 893 # TODO(robertwb): These are only needed to continue >>>>>>> construction. Omit? 894 p.applied_labels = { >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>>>> in get_by_id(self, id) 113 # type: (str) -> PortableObjectT >>>>>>> 114 if id not in self._id_to_obj:--> 115 self._id_to_obj[id] >>>>>>> = self._obj_type.from_runner_api( 116 >>>>>>> self._id_to_proto[id], self._pipeline_context) 117 return >>>>>>> self._id_to_obj[id] >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>>>>>> from_runner_api(proto, context) 1277 result.parts = [] 1278 >>>>>>> for transform_id in proto.subtransforms:-> 1279 part = >>>>>>> context.transforms.get_by_id(transform_id) 1280 part.parent = >>>>>>> result 1281 result.parts.append(part) >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>>>> in get_by_id(self, id) 113 # type: (str) -> PortableObjectT >>>>>>> 114 if id not in self._id_to_obj:--> 115 self._id_to_obj[id] >>>>>>> = self._obj_type.from_runner_api( 116 >>>>>>> self._id_to_proto[id], self._pipeline_context) 117 return >>>>>>> self._id_to_obj[id] >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>>>>>> from_runner_api(proto, context) 1277 result.parts = [] 1278 >>>>>>> for transform_id in proto.subtransforms:-> 1279 part = >>>>>>> context.transforms.get_by_id(transform_id) 1280 
part.parent = >>>>>>> result 1281 result.parts.append(part) >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>>>> in get_by_id(self, id) 113 # type: (str) -> PortableObjectT >>>>>>> 114 if id not in self._id_to_obj:--> 115 self._id_to_obj[id] >>>>>>> = self._obj_type.from_runner_api( 116 >>>>>>> self._id_to_proto[id], self._pipeline_context) 117 return >>>>>>> self._id_to_obj[id] >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>>>>>> from_runner_api(proto, context) 1277 result.parts = [] 1278 >>>>>>> for transform_id in proto.subtransforms:-> 1279 part = >>>>>>> context.transforms.get_by_id(transform_id) 1280 part.parent = >>>>>>> result 1281 result.parts.append(part) >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>>>> in get_by_id(self, id) 113 # type: (str) -> PortableObjectT >>>>>>> 114 if id not in self._id_to_obj:--> 115 self._id_to_obj[id] >>>>>>> = self._obj_type.from_runner_api( 116 >>>>>>> self._id_to_proto[id], self._pipeline_context) 117 return >>>>>>> self._id_to_obj[id] >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>>>>>> from_runner_api(proto, context) 1277 result.parts = [] 1278 >>>>>>> for transform_id in proto.subtransforms:-> 1279 part = >>>>>>> context.transforms.get_by_id(transform_id) 1280 part.parent = >>>>>>> result 1281 result.parts.append(part) >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>>>> in get_by_id(self, id) 113 # type: (str) -> PortableObjectT >>>>>>> 114 if id not in self._id_to_obj:--> 115 self._id_to_obj[id] >>>>>>> = self._obj_type.from_runner_api( 116 >>>>>>> self._id_to_proto[id], self._pipeline_context) 117 return >>>>>>> self._id_to_obj[id] >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in >>>>>>> from_runner_api(proto, context) 1216 >>>>>>> is_python_side_input(side_input_tags[0]) if 
side_input_tags else False) >>>>>>> 1217 -> 1218 transform = >>>>>>> ptransform.PTransform.from_runner_api(proto, context) 1219 if >>>>>>> uses_python_sideinput_tags: 1220 # Ordering is important here. >>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py >>>>>>> in from_runner_api(cls, proto, context) 688 if proto is None or >>>>>>> proto.spec is None or not proto.spec.urn: 689 return None--> >>>>>>> 690 parameter_type, constructor = cls._known_urns[proto.spec.urn] >>>>>>> 691 692 try: >>>>>>> KeyError: 'beam:transform:write_files:v1' >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Thu, Nov 19, 2020 at 2:18 PM Brian Hulette <[email protected]> >>>>>>> wrote: >>>>>>> >>>>>>>> Ah ok, you'll need to use Dataflow runner v2 [1] to run this >>>>>>>> pipeline (add the flag '--experiments=use_runner_v2'). See also [2]. >>>>>>>> >>>>>>>> [1] >>>>>>>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#python_11 >>>>>>>> [2] >>>>>>>> https://cloud.google.com/blog/products/data-analytics/multi-language-sdks-for-building-cloud-pipelines >>>>>>>> >>>>>>>> On Thu, Nov 19, 2020 at 11:10 AM Alan Krumholz < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>>> DataFlow runner >>>>>>>>> >>>>>>>>> On Thu, Nov 19, 2020 at 2:00 PM Brian Hulette <[email protected]> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hm what runner are you using? It looks like we're trying to >>>>>>>>>> encode and decode the pipeline proto, which isn't possible for a >>>>>>>>>> multi-language pipeline. Are you using a portable runner? >>>>>>>>>> >>>>>>>>>> Brian >>>>>>>>>> >>>>>>>>>> On Thu, Nov 19, 2020 at 10:42 AM Alan Krumholz < >>>>>>>>>> [email protected]> wrote: >>>>>>>>>> >>>>>>>>>>> got it, thanks! 
>>>>>>>>>>> I was using: >>>>>>>>>>> 'xxxxxx.us-east-1' >>>>>>>>>>> Seems using this instead fixes that problem: >>>>>>>>>>> 'xxxxxx.us-east-1.snowflakecomputing.com' >>>>>>>>>>> >>>>>>>>>>> I'm now hitting a different error though (now in python): >>>>>>>>>>> >>>>>>>>>>> <ipython-input-42-b27732a0a892> in bq_to_snowflake(bq_table, >>>>>>>>>>>> snow_table, git_branch) >>>>>>>>>>>> 161 ) >>>>>>>>>>>> 162 >>>>>>>>>>>> --> 163 result = pipeline.run() >>>>>>>>>>>> 164 result.wait_until_finish() >>>>>>>>>>>> 165 >>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py >>>>>>>>>>>> in run(self, test_runner_api) >>>>>>>>>>>> 512 # When possible, invoke a round trip through the runner >>>>>>>>>>>> API. >>>>>>>>>>>> 513 if test_runner_api and self._verify_runner_api_compatible() >>>>>>>>>>>> : >>>>>>>>>>>> --> 514 return Pipeline.from_runner_api( 515 self.to_runner_api >>>>>>>>>>>> (use_fake_coders=True), >>>>>>>>>>>> 516 self.runner, >>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py >>>>>>>>>>>> in from_runner_api(proto, runner, options, return_context, >>>>>>>>>>>> allow_proto_holders) >>>>>>>>>>>> 890 requirements=proto.requirements) >>>>>>>>>>>> 891 root_transform_id, = proto.root_transform_ids >>>>>>>>>>>> --> 892 p.transforms_stack = [context.transforms.get_by_id( >>>>>>>>>>>> root_transform_id)] >>>>>>>>>>>> 893 # TODO(robertwb): These are only needed to continue >>>>>>>>>>>> construction. Omit? 
>>>>>>>>>>>> 894 p.applied_labels = { >>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>>>>>>>>> in get_by_id(self, id) >>>>>>>>>>>> 113 # type: (str) -> PortableObjectT >>>>>>>>>>>> 114 if id not in self._id_to_obj: >>>>>>>>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( >>>>>>>>>>>> 116 self._id_to_proto[id], self._pipeline_context) >>>>>>>>>>>> 117 return self._id_to_obj[id] >>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py >>>>>>>>>>>> in from_runner_api(proto, context) >>>>>>>>>>>> 1277 result.parts = [] >>>>>>>>>>>> 1278 for transform_id in proto.subtransforms: >>>>>>>>>>>> -> 1279 part = context.transforms.get_by_id(transform_id) >>>>>>>>>>>> 1280 part.parent = result >>>>>>>>>>>> 1281 result.parts.append(part) >>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>>>>>>>>> in get_by_id(self, id) >>>>>>>>>>>> 113 # type: (str) -> PortableObjectT >>>>>>>>>>>> 114 if id not in self._id_to_obj: >>>>>>>>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( >>>>>>>>>>>> 116 self._id_to_proto[id], self._pipeline_context) >>>>>>>>>>>> 117 return self._id_to_obj[id] >>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py >>>>>>>>>>>> in from_runner_api(proto, context) >>>>>>>>>>>> 1277 result.parts = [] >>>>>>>>>>>> 1278 for transform_id in proto.subtransforms: >>>>>>>>>>>> -> 1279 part = context.transforms.get_by_id(transform_id) >>>>>>>>>>>> 1280 part.parent = result >>>>>>>>>>>> 1281 result.parts.append(part) >>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>>>>>>>>> in get_by_id(self, id) >>>>>>>>>>>> 113 # type: (str) -> PortableObjectT >>>>>>>>>>>> 114 if id not in self._id_to_obj: >>>>>>>>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( >>>>>>>>>>>> 116 self._id_to_proto[id], self._pipeline_context) 
>>>>>>>>>>>> 117 return self._id_to_obj[id] >>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py >>>>>>>>>>>> in from_runner_api(proto, context) >>>>>>>>>>>> 1277 result.parts = [] >>>>>>>>>>>> 1278 for transform_id in proto.subtransforms: >>>>>>>>>>>> -> 1279 part = context.transforms.get_by_id(transform_id) >>>>>>>>>>>> 1280 part.parent = result >>>>>>>>>>>> 1281 result.parts.append(part) >>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>>>>>>>>> in get_by_id(self, id) >>>>>>>>>>>> 113 # type: (str) -> PortableObjectT >>>>>>>>>>>> 114 if id not in self._id_to_obj: >>>>>>>>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( >>>>>>>>>>>> 116 self._id_to_proto[id], self._pipeline_context) >>>>>>>>>>>> 117 return self._id_to_obj[id] >>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py >>>>>>>>>>>> in from_runner_api(proto, context) >>>>>>>>>>>> 1277 result.parts = [] >>>>>>>>>>>> 1278 for transform_id in proto.subtransforms: >>>>>>>>>>>> -> 1279 part = context.transforms.get_by_id(transform_id) >>>>>>>>>>>> 1280 part.parent = result >>>>>>>>>>>> 1281 result.parts.append(part) >>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py >>>>>>>>>>>> in get_by_id(self, id) >>>>>>>>>>>> 113 # type: (str) -> PortableObjectT >>>>>>>>>>>> 114 if id not in self._id_to_obj: >>>>>>>>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( >>>>>>>>>>>> 116 self._id_to_proto[id], self._pipeline_context) >>>>>>>>>>>> 117 return self._id_to_obj[id] >>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py >>>>>>>>>>>> in from_runner_api(proto, context) >>>>>>>>>>>> 1216 is_python_side_input(side_input_tags[0]) if >>>>>>>>>>>> side_input_tags else False) >>>>>>>>>>>> 1217 >>>>>>>>>>>> -> 1218 transform = ptransform.PTransform.from_runner_api(proto >>>>>>>>>>>> , context) >>>>>>>>>>>> 1219 if 
uses_python_sideinput_tags: >>>>>>>>>>>> 1220 # Ordering is important here. >>>>>>>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py >>>>>>>>>>>> in from_runner_api(cls, proto, context) >>>>>>>>>>>> 688 if proto is None or proto.spec is None or not proto.spec. >>>>>>>>>>>> urn: >>>>>>>>>>>> 689 return None >>>>>>>>>>>> --> 690 parameter_type, constructor = cls._known_urns[proto. >>>>>>>>>>>> spec.urn] >>>>>>>>>>>> 691 >>>>>>>>>>>> 692 try: KeyError: 'beam:transform:write_files:v1' >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> I'll keep trying to make this work but sharing it in case you >>>>>>>>>>> can easily see what the problem is >>>>>>>>>>> >>>>>>>>>>> Thanks so much! >>>>>>>>>>> >>>>>>>>>>> On Thu, Nov 19, 2020 at 1:30 PM Brian Hulette < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Alan, >>>>>>>>>>>> Sorry this error message is so verbose. What are you passing >>>>>>>>>>>> for the server_name argument [1]? It looks like that's what the >>>>>>>>>>>> Java >>>>>>>>>>>> stacktrace is complaining about: >>>>>>>>>>>> >>>>>>>>>>>> java.lang.IllegalArgumentException: serverName must be in >>>>>>>>>>>> format <account_name>.snowflakecomputing.com >>>>>>>>>>>> >>>>>>>>>>>> [1] >>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/snowflake.py#L302 >>>>>>>>>>>> >>>>>>>>>>>> On Thu, Nov 19, 2020 at 10:16 AM Alan Krumholz < >>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> I'm trying to replace my custom/problematic snowflake sink >>>>>>>>>>>>> with the new: >>>>>>>>>>>>> https://beam.apache.org/documentation/io/built-in/snowflake/#writing-to-snowflake >>>>>>>>>>>>> >>>>>>>>>>>>> However when I try to run my pipeline (using python) I get >>>>>>>>>>>>> this Java error: >>>>>>>>>>>>> >>>>>>>>>>>>> RuntimeError: java.lang.RuntimeException: Failed to build >>>>>>>>>>>>>> transform beam:external:java:snowflake:write:v1 from spec urn: >>>>>>>>>>>>>> 
"beam:external:java:snowflake:write:v1" >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> It is hard to understand why it is failing from reading the >>>>>>>>>>>>> partial java error trace I get in the output: >>>>>>>>>>>>> >>>>>>>>>>>>>> at >>>>>>>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:130) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:357) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:433) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:488) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) >>>>>>>>>>>>>> at 
java.base/java.lang.Thread.run(Thread.java:832) >>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: serverName must >>>>>>>>>>>>>> be in format <account_name>.snowflakecomputing.com >>>>>>>>>>>>>> at >>>>>>>>>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> org.apache.beam.sdk.io.snowflake.SnowflakeIO$DataSourceConfiguration.withServerName(SnowflakeIO.java:1700) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> org.apache.beam.sdk.io.snowflake.crosslanguage.CrossLanguageConfiguration.getDataSourceConfiguration(CrossLanguageConfiguration.java:166) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:78) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:34) >>>>>>>>>>>>>> at >>>>>>>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:125) >>>>>>>>>>>>>> ... 12 more >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> any clue how I can debug/fix this using python? >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>>
