Problem with updating beam SDK

2020-02-07 Thread vivek chaurasiya
Hi team,

We had Beam SDK 2.5 running on the AWS EMR Spark distribution 5.17.

Essentially our Beam code just reads a bunch of files from GCS and pushes
them to Elasticsearch in AWS using Beam's ElasticsearchIO class (
https://beam.apache.org/releases/javadoc/2.0.0/index.html?org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.html).
So there is just a map step; no reduce/groupBy/etc. in the Beam code.

Basically my code is doing:
PCollection<String> coll = ...; // read from GCS
coll.apply(ElasticsearchIO.write())
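For reference, a minimal sketch of such a GCS-to-Elasticsearch pipeline (this is
not the actual job; the bucket path, Elasticsearch endpoint, index name, and
batch size are placeholder values):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class GcsToElasticsearchSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read the exported JSON documents from GCS (path is a placeholder).
    PCollection<String> docs =
        pipeline.apply(TextIO.read().from("gs://my-bucket/export/*.json"));

    // Write each document to Elasticsearch; a map-style write, no shuffle.
    docs.apply(
        ElasticsearchIO.write()
            .withConnectionConfiguration(
                ElasticsearchIO.ConnectionConfiguration.create(
                    new String[] {"https://my-es-endpoint:9200"}, "my-index", "_doc"))
            .withMaxBatchSize(1000L));

    pipeline.run().waitUntilFinish();
  }
}

ElasticsearchIO groups documents into bulk requests, so withMaxBatchSize is the
main knob for write throughput here.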

We submit the Spark job using 'spark-submit':

spark-submit --deploy-mode cluster \
  --conf spark.executor.extraJavaOptions=-DCLOUD_PLATFORM=AWS \
  --conf spark.driver.extraJavaOptions=-DCLOUD_PLATFORM=AWS \
  --conf spark.yarn.am.waitTime=300s \
  --conf spark.executor.extraClassPath=__app__.jar \
  --driver-memory 8G --num-executors 5 --executor-memory 20G --executor-cores 8 \
  --jars s3://snap-search-spark/cloud-dataflow-1.0.jar \
  --class com.snapchat.beam.common.pipeline.EMRSparkStartPipeline \
  s3://snap-search-spark/cloud-dataflow-1.0.jar \
  --job=fgi-export --isSolr=false --dateTime=2020-01-31T00:00:00 --isDev=true \
  --incrementalExport=false

The dump to ES was finishing in at most 1 hour.

This week we upgraded the Beam SDK to 2.18, still running on the AWS EMR Spark
distribution 5.17. We observe that the export process has become really slow,
taking around 9 hours. The total GCS file size is ~50 GB (500 files of 100 MB each).

I am new to the Spark UI and AWS EMR, but I still tried to see why this
slowness is happening. A few observations:

1) Some executors died after receiving SIGTERM. I then tried the suggestions here:
https://dev.sobeslavsky.net/apache-spark-sigterm-mystery-with-dynamic-allocation/
No luck.

2) I will try upgrading to AWS EMR Spark distribution 5.29, but will have to
test it.

Has anyone seen similar issues in the past? Any suggestions would be highly
appreciated.

Thanks
Vivek


Re: Running Beam on Flink

2020-02-07 Thread Ankur Goenka
That seems to be a problem.

When I try the command, I get

$ telnet localhost 8099
Trying ::1...
Connected to localhost.
Escape character is '^]'.
�^CConnection closed by foreign host.

On Fri, Feb 7, 2020 at 5:34 PM Xander Song  wrote:

> Thanks for the response. After entering telnet localhost 8099, I receive
>
> Trying ::1...
>
> telnet: connect to address ::1: Connection refused
>
> Trying 127.0.0.1...
>
> telnet: connect to address 127.0.0.1: Connection refused
>
> telnet: Unable to connect to remote host
>
>
> On Fri, Feb 7, 2020 at 11:41 AM Ankur Goenka  wrote:
>
>> Seems that pipeline submission from sdk is not able to reach the job
>> server which was started in docker.
>>
>> Can you try running "telnet localhost 8099" to make sure that pipeline
>> submission can reach the job server.
>>
>> On Thu, Feb 6, 2020 at 8:16 PM Xander Song  wrote:
>>
>>> I am having difficulty following the Python guide for running Beam on
>>> Flink . I created
>>> a virtual environment with Apache Beam installed, then I started up the
>>> JobService Docker container with
>>>
>>> docker run --net=host apachebeam/flink1.9_job_server:latest
>>>
>>>
>>> I receive the following message confirming that the container is running.
>>>
>>>
>>> [main] INFO
>>> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
>>> ArtifactStagingService started on localhost:8098
>>>
>>> [main] INFO
>>> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java
>>> ExpansionService started on localhost:8097
>>>
>>> [main] INFO
>>> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
>>> JobService started on localhost:8099
>>>
>>>
>>> In another terminal, I execute a Beam script called
>>> test_beam_local_flink.py based on the example.
>>>
>>>
>>> from __future__ import print_function
>>> import apache_beam
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>
>>> data = [1,2,3]
>>>
>>> options = PipelineOptions([
>>>     "--runner=PortableRunner",
>>>     "--job_endpoint=localhost:8099",
>>>     "--environment_type=LOOPBACK"])
>>> with apache_beam.Pipeline(options=options) as p:
>>>     video_collection = (
>>>         p | apache_beam.Create(data)
>>>           | apache_beam.Map(lambda x: x + 1)
>>>           | apache_beam.Map(lambda x: print(x))
>>>     )
>>> print('Done')
>>>
>>> After a wait, I get the following traceback.
>>>
>>> /Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/__init__.py:84:
>>>  UserWarning: You are using Apache Beam with Python 2. New releases of 
>>> Apache Beam will soon support Python 3 only.
>>>
>>>   'You are using Apache Beam with Python 2. '
>>>
>>> Traceback (most recent call last):
>>>
>>>   File "test_beam_local_flink.py", line 18, in 
>>>
>>> | apache_beam.Map(lambda x: print(x))
>>>
>>>   File 
>>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
>>>  line 481, in __exit__
>>>
>>> self.run().wait_until_finish()
>>>
>>>   File 
>>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
>>>  line 461, in run
>>>
>>> self._options).run(False)
>>>
>>>   File 
>>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
>>>  line 474, in run
>>>
>>> return self.runner.run_pipeline(self, self._options)
>>>
>>>   File 
>>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>>>  line 220, in run_pipeline
>>>
>>> job_service = self.create_job_service(options)
>>>
>>>   File 
>>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>>>  line 136, in create_job_service
>>>
>>> return server.start()
>>>
>>>   File 
>>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/job_server.py",
>>>  line 59, in start
>>>
>>> grpc.channel_ready_future(channel).result(timeout=self._timeout)
>>>
>>>   File 
>>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/grpc/_utilities.py",
>>>  line 140, in result
>>>
>>> self._block(timeout)
>>>
>>>   File 
>>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/grpc/_utilities.py",
>>>  line 86, in _block
>>>
>>> raise grpc.FutureTimeoutError()
>>>
>>> grpc.FutureTimeoutError
>>>
>>>
>>>
>>> Any help is greatly appreciated.
>>>
>>>


Re: Running Beam on Flink

2020-02-07 Thread Xander Song
Thanks for the response. After entering telnet localhost 8099, I receive

Trying ::1...

telnet: connect to address ::1: Connection refused

Trying 127.0.0.1...

telnet: connect to address 127.0.0.1: Connection refused

telnet: Unable to connect to remote host


On Fri, Feb 7, 2020 at 11:41 AM Ankur Goenka  wrote:

> Seems that pipeline submission from sdk is not able to reach the job
> server which was started in docker.
>
> Can you try running "telnet localhost 8099" to make sure that pipeline
> submission can reach the job server.
>
> On Thu, Feb 6, 2020 at 8:16 PM Xander Song  wrote:
>
>> I am having difficulty following the Python guide for running Beam on
>> Flink . I created
>> a virtual environment with Apache Beam installed, then I started up the
>> JobService Docker container with
>>
>> docker run --net=host apachebeam/flink1.9_job_server:latest
>>
>>
>> I receive the following message confirming that the container is running.
>>
>>
>> [main] INFO
>> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
>> ArtifactStagingService started on localhost:8098
>>
>> [main] INFO
>> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java
>> ExpansionService started on localhost:8097
>>
>> [main] INFO
>> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
>> JobService started on localhost:8099
>>
>>
>> In another terminal, I execute a Beam script called
>> test_beam_local_flink.py based on the example.
>>
>>
>> from __future__ import print_function
>> import apache_beam
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>> data = [1,2,3]
>>
>> options = PipelineOptions([
>>     "--runner=PortableRunner",
>>     "--job_endpoint=localhost:8099",
>>     "--environment_type=LOOPBACK"])
>> with apache_beam.Pipeline(options=options) as p:
>>     video_collection = (
>>         p | apache_beam.Create(data)
>>           | apache_beam.Map(lambda x: x + 1)
>>           | apache_beam.Map(lambda x: print(x))
>>     )
>> print('Done')
>>
>> After a wait, I get the following traceback.
>>
>> /Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/__init__.py:84:
>>  UserWarning: You are using Apache Beam with Python 2. New releases of 
>> Apache Beam will soon support Python 3 only.
>>
>>   'You are using Apache Beam with Python 2. '
>>
>> Traceback (most recent call last):
>>
>>   File "test_beam_local_flink.py", line 18, in 
>>
>> | apache_beam.Map(lambda x: print(x))
>>
>>   File 
>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
>>  line 481, in __exit__
>>
>> self.run().wait_until_finish()
>>
>>   File 
>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
>>  line 461, in run
>>
>> self._options).run(False)
>>
>>   File 
>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
>>  line 474, in run
>>
>> return self.runner.run_pipeline(self, self._options)
>>
>>   File 
>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>>  line 220, in run_pipeline
>>
>> job_service = self.create_job_service(options)
>>
>>   File 
>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>>  line 136, in create_job_service
>>
>> return server.start()
>>
>>   File 
>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/job_server.py",
>>  line 59, in start
>>
>> grpc.channel_ready_future(channel).result(timeout=self._timeout)
>>
>>   File 
>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/grpc/_utilities.py",
>>  line 140, in result
>>
>> self._block(timeout)
>>
>>   File 
>> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/grpc/_utilities.py",
>>  line 86, in _block
>>
>> raise grpc.FutureTimeoutError()
>>
>> grpc.FutureTimeoutError
>>
>>
>>
>> Any help is greatly appreciated.
>>
>>


Re: Running Beam on Flink

2020-02-07 Thread Ankur Goenka
It seems that pipeline submission from the SDK is not able to reach the job
server that was started in Docker.

Can you try running "telnet localhost 8099" to make sure that pipeline
submission can reach the job server?

On Thu, Feb 6, 2020 at 8:16 PM Xander Song  wrote:

> I am having difficulty following the Python guide for running Beam on
> Flink . I created a
> virtual environment with Apache Beam installed, then I started up the
> JobService Docker container with
>
> docker run --net=host apachebeam/flink1.9_job_server:latest
>
>
> I receive the following message confirming that the container is running.
>
>
> [main] INFO
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
> ArtifactStagingService started on localhost:8098
>
> [main] INFO
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java
> ExpansionService started on localhost:8097
>
> [main] INFO
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
> JobService started on localhost:8099
>
>
> In another terminal, I execute a Beam script called
> test_beam_local_flink.py based on the example.
>
>
> from __future__ import print_function
> import apache_beam
> from apache_beam.options.pipeline_options import PipelineOptions
>
> data = [1,2,3]
>
> options = PipelineOptions([
>     "--runner=PortableRunner",
>     "--job_endpoint=localhost:8099",
>     "--environment_type=LOOPBACK"])
> with apache_beam.Pipeline(options=options) as p:
>     video_collection = (
>         p | apache_beam.Create(data)
>           | apache_beam.Map(lambda x: x + 1)
>           | apache_beam.Map(lambda x: print(x))
>     )
> print('Done')
>
> After a wait, I get the following traceback.
>
> /Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/__init__.py:84:
>  UserWarning: You are using Apache Beam with Python 2. New releases of Apache 
> Beam will soon support Python 3 only.
>
>   'You are using Apache Beam with Python 2. '
>
> Traceback (most recent call last):
>
>   File "test_beam_local_flink.py", line 18, in 
>
> | apache_beam.Map(lambda x: print(x))
>
>   File 
> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
>  line 481, in __exit__
>
> self.run().wait_until_finish()
>
>   File 
> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
>  line 461, in run
>
> self._options).run(False)
>
>   File 
> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
>  line 474, in run
>
> return self.runner.run_pipeline(self, self._options)
>
>   File 
> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>  line 220, in run_pipeline
>
> job_service = self.create_job_service(options)
>
>   File 
> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>  line 136, in create_job_service
>
> return server.start()
>
>   File 
> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/job_server.py",
>  line 59, in start
>
> grpc.channel_ready_future(channel).result(timeout=self._timeout)
>
>   File 
> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/grpc/_utilities.py",
>  line 140, in result
>
> self._block(timeout)
>
>   File 
> "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/grpc/_utilities.py",
>  line 86, in _block
>
> raise grpc.FutureTimeoutError()
>
> grpc.FutureTimeoutError
>
>
>
> Any help is greatly appreciated.
>
>


Re: dataflow job was working fine last night and it isn't now

2020-02-07 Thread Alan Krumholz
perfect! thank you!

On Fri, Feb 7, 2020 at 10:54 AM Valentyn Tymofieiev 
wrote:

> Thanks for your feedback. We expect that this issue will be fixed in
> cloudpickle==1.3.0. Per [1], this release may be available next week.
>
> After that you can install the fixed version of cloudpickle until the AI
> notebook image picks up the new version.
>
> [1] https://github.com/cloudpipe/cloudpickle/pull/337
>
> On Tue, Feb 4, 2020 at 12:44 PM Alan Krumholz 
> wrote:
>
>> Seems like the image we use in KFP to orchestrate the job has cloudpickle==0.8.1,
>> and that one doesn't seem to cause issues.
>> I think I'm unblocked for now, but I'm sure I won't be the last one to try
>> to do this using GCP managed notebooks :(
>>
>> Thanks for all the help!
>>
>>
>> On Tue, Feb 4, 2020 at 12:24 PM Alan Krumholz 
>> wrote:
>>
>>> I'm using a managed notebook instance from GCP.
>>> It seems those already come with cloudpickle==1.2.2 as soon as you
>>> provision it. apache-beam[gcp] will then install dill==0.3.1.1. I'm
>>> going to try to uninstall cloudpickle before installing apache-beam and see
>>> if this fixes the problem.
>>>
>>> Thank you
>>>
>>> On Tue, Feb 4, 2020 at 11:54 AM Valentyn Tymofieiev 
>>> wrote:
>>>
 The fact that you have cloudpickle==1.2.2 further confirms that you
 may be hitting the same error as
 https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype
  .

 Could you try to start over with a clean virtual environment?

 On Tue, Feb 4, 2020 at 11:46 AM Alan Krumholz <
 alan.krumh...@betterup.co> wrote:

> Hi Valentyn,
>
> Here is my pip freeze on my machine (note that the error is in
> dataflow, the job runs fine in my machine)
>
> ansiwrap==0.8.4
> apache-beam==2.19.0
> arrow==0.15.5
> asn1crypto==1.3.0
> astroid==2.3.3
> astropy==3.2.3
> attrs==19.3.0
> avro-python3==1.9.1
> azure-common==1.1.24
> azure-storage-blob==2.1.0
> azure-storage-common==2.1.0
> backcall==0.1.0
> bcolz==1.2.1
> binaryornot==0.4.4
> bleach==3.1.0
> boto3==1.11.9
> botocore==1.14.9
> cachetools==3.1.1
> certifi==2019.11.28
> cffi==1.13.2
> chardet==3.0.4
> Click==7.0
> cloudpickle==1.2.2
> colorama==0.4.3
> configparser==4.0.2
> confuse==1.0.0
> cookiecutter==1.7.0
> crcmod==1.7
> cryptography==2.8
> cycler==0.10.0
> daal==2019.0
> datalab==1.1.5
> decorator==4.4.1
> defusedxml==0.6.0
> dill==0.3.1.1
> distro==1.0.1
> docker==4.1.0
> docopt==0.6.2
> docutils==0.15.2
> entrypoints==0.3
> enum34==1.1.6
> fairing==0.5.3
> fastavro==0.21.24
> fasteners==0.15
> fsspec==0.6.2
> future==0.18.2
> gcsfs==0.6.0
> gitdb2==2.0.6
> GitPython==3.0.5
> google-api-core==1.16.0
> google-api-python-client==1.7.11
> google-apitools==0.5.28
> google-auth==1.11.0
> google-auth-httplib2==0.0.3
> google-auth-oauthlib==0.4.1
> google-cloud-bigquery==1.17.1
> google-cloud-bigtable==1.0.0
> google-cloud-core==1.2.0
> google-cloud-dataproc==0.6.1
> google-cloud-datastore==1.7.4
> google-cloud-language==1.3.0
> google-cloud-logging==1.14.0
> google-cloud-monitoring==0.31.1
> google-cloud-pubsub==1.0.2
> google-cloud-secret-manager==0.1.1
> google-cloud-spanner==1.13.0
> google-cloud-storage==1.25.0
> google-cloud-translate==2.0.0
> google-compute-engine==20191210.0
> google-resumable-media==0.4.1
> googleapis-common-protos==1.51.0
> grpc-google-iam-v1==0.12.3
> grpcio==1.26.0
> h5py==2.10.0
> hdfs==2.5.8
> html5lib==1.0.1
> htmlmin==0.1.12
> httplib2==0.12.0
> icc-rt==2020.0.133
> idna==2.8
> ijson==2.6.1
> imageio==2.6.1
> importlib-metadata==1.4.0
> intel-numpy==1.15.1
> intel-openmp==2020.0.133
> intel-scikit-learn==0.19.2
> intel-scipy==1.1.0
> ipykernel==5.1.4
> ipython==7.9.0
> ipython-genutils==0.2.0
> ipython-sql==0.3.9
> ipywidgets==7.5.1
> isort==4.3.21
> jedi==0.16.0
> Jinja2==2.11.0
> jinja2-time==0.2.0
> jmespath==0.9.4
> joblib==0.14.1
> json5==0.8.5
> jsonschema==3.2.0
> jupyter==1.0.0
> jupyter-aihub-deploy-extension==0.1
> jupyter-client==5.3.4
> jupyter-console==6.1.0
> jupyter-contrib-core==0.3.3
> jupyter-contrib-nbextensions==0.5.1
> jupyter-core==4.6.1
> jupyter-highlight-selected-word==0.2.0
> jupyter-http-over-ws==0.0.7
> jupyter-latex-envs==1.4.6
> jupyter-nbextensions-configurator==0.4.1
> jupyterlab==1.2.6
> jupyterlab-git==0.9.0
> jupyterlab-server==1.0.6
> keyring==10.1
> keyrings.alt==1.3
> kiwisolver==1.1.0
> kubernetes==10.0.1
> lazy-object-proxy==1.4.3
> llvmlite==0.31.0
> lxml==4.4.2
> Markdown==3.1.1
> 

Re: dataflow job was working fine last night and it isn't now

2020-02-07 Thread Valentyn Tymofieiev
Thanks for your feedback. We expect that this issue will be fixed in
cloudpickle==1.3.0. Per [1], this release may be available next week.

After that you can install the fixed version of cloudpickle until the AI
notebook image picks up the new version.

[1] https://github.com/cloudpipe/cloudpickle/pull/337

On Tue, Feb 4, 2020 at 12:44 PM Alan Krumholz 
wrote:

> Seems like the image we use in KFP to orchestrate the job has cloudpickle==0.8.1,
> and that one doesn't seem to cause issues.
> I think I'm unblocked for now, but I'm sure I won't be the last one to try to
> do this using GCP managed notebooks :(
>
> Thanks for all the help!
>
>
> On Tue, Feb 4, 2020 at 12:24 PM Alan Krumholz 
> wrote:
>
>> I'm using a managed notebook instance from GCP.
>> It seems those already come with cloudpickle==1.2.2 as soon as you
>> provision it. apache-beam[gcp] will then install dill==0.3.1.1. I'm going
>> to try to uninstall cloudpickle before installing apache-beam and see if
>> this fixes the problem.
>>
>> Thank you
>>
>> On Tue, Feb 4, 2020 at 11:54 AM Valentyn Tymofieiev 
>> wrote:
>>
>>> The fact that you have cloudpickle==1.2.2 further confirms that you may
>>> be hitting the same error as
>>> https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype
>>>  .
>>>
>>> Could you try to start over with a clean virtual environment?
>>>
>>> On Tue, Feb 4, 2020 at 11:46 AM Alan Krumholz 
>>> wrote:
>>>
 Hi Valentyn,

 Here is my pip freeze on my machine (note that the error is in
 dataflow, the job runs fine in my machine)

 ansiwrap==0.8.4
 apache-beam==2.19.0
 arrow==0.15.5
 asn1crypto==1.3.0
 astroid==2.3.3
 astropy==3.2.3
 attrs==19.3.0
 avro-python3==1.9.1
 azure-common==1.1.24
 azure-storage-blob==2.1.0
 azure-storage-common==2.1.0
 backcall==0.1.0
 bcolz==1.2.1
 binaryornot==0.4.4
 bleach==3.1.0
 boto3==1.11.9
 botocore==1.14.9
 cachetools==3.1.1
 certifi==2019.11.28
 cffi==1.13.2
 chardet==3.0.4
 Click==7.0
 cloudpickle==1.2.2
 colorama==0.4.3
 configparser==4.0.2
 confuse==1.0.0
 cookiecutter==1.7.0
 crcmod==1.7
 cryptography==2.8
 cycler==0.10.0
 daal==2019.0
 datalab==1.1.5
 decorator==4.4.1
 defusedxml==0.6.0
 dill==0.3.1.1
 distro==1.0.1
 docker==4.1.0
 docopt==0.6.2
 docutils==0.15.2
 entrypoints==0.3
 enum34==1.1.6
 fairing==0.5.3
 fastavro==0.21.24
 fasteners==0.15
 fsspec==0.6.2
 future==0.18.2
 gcsfs==0.6.0
 gitdb2==2.0.6
 GitPython==3.0.5
 google-api-core==1.16.0
 google-api-python-client==1.7.11
 google-apitools==0.5.28
 google-auth==1.11.0
 google-auth-httplib2==0.0.3
 google-auth-oauthlib==0.4.1
 google-cloud-bigquery==1.17.1
 google-cloud-bigtable==1.0.0
 google-cloud-core==1.2.0
 google-cloud-dataproc==0.6.1
 google-cloud-datastore==1.7.4
 google-cloud-language==1.3.0
 google-cloud-logging==1.14.0
 google-cloud-monitoring==0.31.1
 google-cloud-pubsub==1.0.2
 google-cloud-secret-manager==0.1.1
 google-cloud-spanner==1.13.0
 google-cloud-storage==1.25.0
 google-cloud-translate==2.0.0
 google-compute-engine==20191210.0
 google-resumable-media==0.4.1
 googleapis-common-protos==1.51.0
 grpc-google-iam-v1==0.12.3
 grpcio==1.26.0
 h5py==2.10.0
 hdfs==2.5.8
 html5lib==1.0.1
 htmlmin==0.1.12
 httplib2==0.12.0
 icc-rt==2020.0.133
 idna==2.8
 ijson==2.6.1
 imageio==2.6.1
 importlib-metadata==1.4.0
 intel-numpy==1.15.1
 intel-openmp==2020.0.133
 intel-scikit-learn==0.19.2
 intel-scipy==1.1.0
 ipykernel==5.1.4
 ipython==7.9.0
 ipython-genutils==0.2.0
 ipython-sql==0.3.9
 ipywidgets==7.5.1
 isort==4.3.21
 jedi==0.16.0
 Jinja2==2.11.0
 jinja2-time==0.2.0
 jmespath==0.9.4
 joblib==0.14.1
 json5==0.8.5
 jsonschema==3.2.0
 jupyter==1.0.0
 jupyter-aihub-deploy-extension==0.1
 jupyter-client==5.3.4
 jupyter-console==6.1.0
 jupyter-contrib-core==0.3.3
 jupyter-contrib-nbextensions==0.5.1
 jupyter-core==4.6.1
 jupyter-highlight-selected-word==0.2.0
 jupyter-http-over-ws==0.0.7
 jupyter-latex-envs==1.4.6
 jupyter-nbextensions-configurator==0.4.1
 jupyterlab==1.2.6
 jupyterlab-git==0.9.0
 jupyterlab-server==1.0.6
 keyring==10.1
 keyrings.alt==1.3
 kiwisolver==1.1.0
 kubernetes==10.0.1
 lazy-object-proxy==1.4.3
 llvmlite==0.31.0
 lxml==4.4.2
 Markdown==3.1.1
 MarkupSafe==1.1.1
 matplotlib==3.0.3
 mccabe==0.6.1
 missingno==0.4.2
 mistune==0.8.4
 mkl==2019.0
 mkl-fft==1.0.6
 mkl-random==1.0.1.1
 mock==2.0.0
 monotonic==1.5
 more-itertools==8.1.0
 nbconvert==5.6.1
 nbdime==1.1.0
 nbformat==5.0.4
 

Re: Beam-SQL: How to create a UDF that returns a Map ?

2020-02-07 Thread Andrew Pilloud
Thanks for reporting and finding the root cause! Last I heard Calcite was
going to start a release shortly. We plan to update once the next version
is out.

Andrew

On Fri, Feb 7, 2020 at 4:38 AM Niels Basjes  wrote:

> Hi,
>
> I've done some serious debugging and traced the problem to what seems to
> be the root cause.
> The class
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.
> JavaToSqlTypeConversionRules
> does not have a mapping from java.util.Map to SqlTypeName.MAP.
>
> As a consequence the JavaType(MAP) is converted into SqlTypeName.OTHER which 
> breaks everything downstream.
>
> The nice thing is that this root cause seems to be fixed in 
> https://issues.apache.org/jira/browse/CALCITE-3429
>
> https://github.com/apache/calcite/commit/ff44204dc2899e0c34e94f70c2e0c301170daca3
>
> Which has not yet been released ...
>
> I created https://issues.apache.org/jira/browse/BEAM-9267 to track this.
>
> Niels Basjes
>
>
>
> On Fri, Feb 7, 2020 at 11:26 AM Niels Basjes  wrote:
>
>> Hi,
>>
>> My context: Java 8 , Beam 2.19.0
>>
>> *TLDR*: How do I create a Beam-SQL UDF that returns a Map<String, String>?
>>
>> I have a library ( https://yauaa.basjes.nl ) that people would like to
>> use in combination with Beam-SQL.
>> The essence of this library is that a String goes in an a Key-Value set
>> (Map) comes out.
>>
>> I've already done this for Flink-SQL and there it was relatively easy:
>> Just implement the appropriate function and specify that the return type is
>> a Map.
>> See
>> https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink-table/src/main/java/nl/basjes/parse/useragent/flink/table/AnalyzeUseragentFunction.java#L88
>>
>> Now for Beam-SQL I've been able to implement a function that returns a
>> String but if I try to return a Map I get a nasty error.
>>
>> If I use this smallest function possible in my SQL (Calcite)
>>
>> public class FooMap implements SerializableFunction<String, Map<String, String>> {
>>     @Override
>>     public Map<String, String> apply(String input) {
>>         final HashMap<String, String> hashMap = new HashMap<>();
>>         hashMap.put("Some", "Thing");
>>         return hashMap;
>>     }
>> }
>>
>> I get this error
>>
>>
>> java.lang.NullPointerException: Null type
>>
>> at
>> org.apache.beam.sdk.schemas.AutoValue_Schema_Field$Builder.setType(AutoValue_Schema_Field.java:84)
>> at org.apache.beam.sdk.schemas.Schema$Field.of(Schema.java:893)
>> at
>> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:234)
>> at
>> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:230)
>> at
>> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>> at java.util.Iterator.forEachRemaining(Iterator.java:116)
>> at
>> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>> at
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>> at
>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>> at
>> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:189)
>> at
>> org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:129)
>> at
>> org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:110)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
>> at
>> org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69)
>> at
>> org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39)
>> at
>> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
>> at
>> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
>> at
>> org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
>> at
>> nl.basjes.parse.useragent.beam.TestFunctionReturnsMap.testUserAgentAnalysisSQL(TestFunctionReturnsMap.java:81)
>>
>>
>> If I have a function that returns a String it all works as expected
>>
>>
>> public class BarString implements SerializableFunction<String, String> {
>>     @Override
>>     public String apply(String input) {
>>         return new StringBuilder(input).reverse().toString();
>>     }
>> }
>>
>> I already had a look at the Beam sourcecode and I have not yet been able to 
>> figure out how I can explicitly tell the framework the Schema that my 
>> function returns (like I did in the Flink implementation of the same).
>>
>> I found the built in functions that use a @UDF annotation that specifies the 

Re: Running a Beam Pipeline on GCP Dataproc Flink Cluster

2020-02-07 Thread Paweł Kordek
Hi

I had a similar use case recently, and adding a metadata key solved the issue:
https://github.com/GoogleCloudDataproc/initialization-actions/pull/334. You keep
the original initialization action and add, for example (using gcloud),
'--metadata flink-snapshot-url=http://mirrors.up.pt/pub/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz'.
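For example (a sketch only; the cluster name, region, and init-action location
below are illustrative placeholders, not values from this thread), the cluster
creation could look like:

gcloud dataproc clusters create my-flink-cluster \
    --region europe-west1 \
    --initialization-actions gs://my-bucket/flink/flink.sh \
    --metadata flink-snapshot-url=http://mirrors.up.pt/pub/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz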

Cheers
Pawel

From: Ismaël Mejía 
Sent: Friday, February 7, 2020 2:24 PM
To: Xander Song ; user@beam.apache.org 

Cc: u...@flink.apache.org 
Subject: Re: Running a Beam Pipeline on GCP Dataproc Flink Cluster

+user@beam.apache.org


On Fri, Feb 7, 2020 at 12:54 AM Xander Song <iamuuriw...@gmail.com> wrote:
I am attempting to run a Beam pipeline on a GCP Dataproc Flink cluster. I have
followed the instructions at this repo to create a Flink cluster on Dataproc
using an initialization action. However, the resulting cluster uses version
1.5.6 of Flink, and my project requires a more recent version (version 1.7,
1.8, or 1.9) for compatibility with Beam.

Inside of the flink.sh script in the linked repo, there is a line for
installing Flink from a snapshot URL instead of apt. Is this the correct
mechanism for installing a different version of Flink using the initialization
script? If so, how is it meant to be used?

Thank you in advance.


Re: Running a Beam Pipeline on GCP Dataproc Flink Cluster

2020-02-07 Thread Ismaël Mejía
+user@beam.apache.org 


On Fri, Feb 7, 2020 at 12:54 AM Xander Song  wrote:

> I am attempting to run a Beam pipeline on a GCP Dataproc Flink cluster. I
> have followed the instructions at this repo to create a Flink cluster on
> Dataproc using an initialization action. However, the resulting cluster uses
> version 1.5.6 of Flink, and my project requires a more recent version
> (version 1.7, 1.8, or 1.9) for compatibility with Beam.
>
> Inside of the flink.sh script in the linked repo, there is a line for
> installing Flink from a snapshot URL instead of apt. Is this the correct
> mechanism for installing a different version of Flink using the
> initialization script? If so, how is it meant to be used?
>
> Thank you in advance.
>


Re: Beam-SQL: How to create a UDF that returns a Map ?

2020-02-07 Thread Niels Basjes
Hi,

I've done some serious debugging and traced the problem to what seems to be
the root cause.
The class
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.
JavaToSqlTypeConversionRules
does not have a mapping from java.util.Map to SqlTypeName.MAP.

As a consequence the JavaType(MAP) is converted into SqlTypeName.OTHER
which breaks everything downstream.

The nice thing is that this root cause seems to be fixed in
https://issues.apache.org/jira/browse/CALCITE-3429

https://github.com/apache/calcite/commit/ff44204dc2899e0c34e94f70c2e0c301170daca3

Which has not yet been released ...

I created https://issues.apache.org/jira/browse/BEAM-9267 to track this.

Niels Basjes



On Fri, Feb 7, 2020 at 11:26 AM Niels Basjes  wrote:

> Hi,
>
> My context: Java 8 , Beam 2.19.0
>
> *TLDR*: How do I create a Beam-SQL UDF that returns a Map<String, String>?
>
> I have a library ( https://yauaa.basjes.nl ) that people would like to
> use in combination with Beam-SQL.
> The essence of this library is that a String goes in an a Key-Value set
> (Map) comes out.
>
> I've already done this for Flink-SQL and there it was relatively easy:
> Just implement the appropriate function and specify that the return type is
> a Map.
> See
> https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink-table/src/main/java/nl/basjes/parse/useragent/flink/table/AnalyzeUseragentFunction.java#L88
>
> Now for Beam-SQL I've been able to implement a function that returns a
> String but if I try to return a Map I get a nasty error.
>
> If I use this smallest function possible in my SQL (Calcite)
>
> public class FooMap implements SerializableFunction<String, Map<String, String>> {
>     @Override
>     public Map<String, String> apply(String input) {
>         final HashMap<String, String> hashMap = new HashMap<>();
>         hashMap.put("Some", "Thing");
>         return hashMap;
>     }
> }
>
> I get this error
>
>
> java.lang.NullPointerException: Null type
>
> at
> org.apache.beam.sdk.schemas.AutoValue_Schema_Field$Builder.setType(AutoValue_Schema_Field.java:84)
> at org.apache.beam.sdk.schemas.Schema$Field.of(Schema.java:893)
> at
> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:234)
> at
> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:230)
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.Iterator.forEachRemaining(Iterator.java:116)
> at
> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> at
> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:189)
> at
> org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:129)
> at
> org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:110)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
> at
> org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69)
> at
> org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39)
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
> at
> org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
> at
> nl.basjes.parse.useragent.beam.TestFunctionReturnsMap.testUserAgentAnalysisSQL(TestFunctionReturnsMap.java:81)
>
>
> If I have a function that returns a String it all works as expected
>
>
> public class BarString implements SerializableFunction<String, String> {
>     @Override
>     public String apply(String input) {
>         return new StringBuilder(input).reverse().toString();
>     }
> }
>
> I already had a look at the Beam sourcecode and I have not yet been able to 
> figure out how I can explicitly tell the framework the Schema that my 
> function returns (like I did in the Flink implementation of the same).
>
> I found the built in functions that use a @UDF annotation that specifies the 
> returnType ... but as far as I can tell this returnType is never actually 
> used. So I cannot trace how it is done there.
>
> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java
>
>
> So my question is really simple: How do I do this correctly?
>
>
> NOTE: The 

Beam-SQL: How to create a UDF that returns a Map ?

2020-02-07 Thread Niels Basjes
Hi,

My context: Java 8, Beam 2.19.0

*TLDR*: How do I create a Beam-SQL UDF that returns a Map<String, String>?

I have a library ( https://yauaa.basjes.nl ) that people would like to use
in combination with Beam-SQL.
The essence of this library is that a String goes in and a Key-Value set
(Map<String, String>) comes out.

I've already done this for Flink-SQL and there it was relatively easy: Just
implement the appropriate function and specify that the return type is a
Map.
See
https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink-table/src/main/java/nl/basjes/parse/useragent/flink/table/AnalyzeUseragentFunction.java#L88

Now for Beam-SQL I've been able to implement a function that returns a
String but if I try to return a Map I get a nasty error.

If I use this smallest function possible in my SQL (Calcite)

public class FooMap implements SerializableFunction<String, Map<String, String>> {
    @Override
    public Map<String, String> apply(String input) {
        final HashMap<String, String> hashMap = new HashMap<>();
        hashMap.put("Some", "Thing");
        return hashMap;
    }
}

I get this error


java.lang.NullPointerException: Null type

at
org.apache.beam.sdk.schemas.AutoValue_Schema_Field$Builder.setType(AutoValue_Schema_Field.java:84)
at org.apache.beam.sdk.schemas.Schema$Field.of(Schema.java:893)
at
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:234)
at
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:230)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at
java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:189)
at
org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:129)
at
org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:110)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
at
org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69)
at
org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39)
at
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
at
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
at
org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
at
nl.basjes.parse.useragent.beam.TestFunctionReturnsMap.testUserAgentAnalysisSQL(TestFunctionReturnsMap.java:81)


If I have a function that returns a String it all works as expected


public class BarString implements SerializableFunction<String, String> {
    @Override
    public String apply(String input) {
        return new StringBuilder(input).reverse().toString();
    }
}

I already had a look at the Beam source code, and I have not yet been
able to figure out how I can explicitly tell the framework the Schema
that my function returns (like I did in the Flink implementation of
the same).

I found the built-in functions that use a @UDF annotation that
specifies the returnType ... but as far as I can tell this returnType
is never actually used. So I cannot trace how it is done there.

https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java


So my question is really simple: How do I do this correctly?
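
For reference, a minimal sketch of how such a UDF would typically be registered
with SqlTransform (illustrative only, not the code from this thread; the
function name parse_it and the query are made up). On Beam 2.19.0 this
registration path is where the NullPointerException above is raised:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class MapUdfSketch {

  // The UDF under discussion: a String goes in, a Map<String, String> comes out.
  public static class FooMap implements SerializableFunction<String, Map<String, String>> {
    @Override
    public Map<String, String> apply(String input) {
      final HashMap<String, String> hashMap = new HashMap<>();
      hashMap.put("Some", "Thing");
      return hashMap;
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // A one-column input PCollection<Row> with a schema, as Beam SQL requires.
    Schema inputSchema = Schema.builder().addStringField("bar").build();
    PCollection<Row> input =
        pipeline
            .apply(Create.of(Arrays.asList(
                Row.withSchema(inputSchema).addValue("One").build(),
                Row.withSchema(inputSchema).addValue("Two").build())))
            .setRowSchema(inputSchema);

    // Register the UDF and call it from SQL. On Beam 2.19.0 the Map return type
    // ends up as SqlTypeName.OTHER in the vendored Calcite, which is what
    // triggers the "NullPointerException: Null type" shown above.
    input.apply(
        SqlTransform.query("SELECT parse_it(bar) AS parsed FROM PCOLLECTION")
            .registerUdf("parse_it", new FooMap()));

    pipeline.run().waitUntilFinish();
  }
}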


NOTE: The code I run the tests with:

@Category(ValidatesRunner.class)
public class TestFunctionReturnsMap implements Serializable {

    private static final Logger LOG =
        LoggerFactory.getLogger(TestFunctionReturnsMap.class);

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    @Category(NeedsRunner.class)
    public void testUserAgentAnalysisSQL() {

        // Create input PCollection
        Schema inputSchema = Schema
            .builder()
            .addStringField("bar")
            .build();

        PCollection<Row> input = pipeline
            .apply(Create.of(Arrays.asList("One", "Two", "Three")))
            .setCoder(StringUtf8Coder.of())
            .apply(ParDo.of(new DoFn<String, Row>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    c.output(Row