[ 
https://issues.apache.org/jira/browse/BEAM-7446?focusedWorklogId=250288&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-250288
 ]

ASF GitHub Bot logged work on BEAM-7446:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/May/19 16:58
            Start Date: 29/May/19 16:58
    Worklog Time Spent: 10m 
      Work Description: aaltay commented on pull request #8708: [BEAM-7446] 
Close grpc channel after pulling messages from a Cloud Pub/Sub topic
URL: https://github.com/apache/beam/pull/8708
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 250288)
    Time Spent: 20m  (was: 10m)

> Reading from a Cloud Pub/Sub topic inside Python's DirectRunner results in a 
> "too many open files" error
> --------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-7446
>                 URL: https://issues.apache.org/jira/browse/BEAM-7446
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.12.0
>            Reporter: Alexey Strokach
>            Priority: Minor
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Reading from a Cloud Pub/Sub topic inside Python's DirectRunner results in a 
> "too many open files" error within 2 minutes for a busy topic.
> I am not exactly sure, but it appears that "pubsub.SubscriberClient()" 
> creates a grpc channel which must be closed explicitly after we are done 
> pulling messages from it. This may be due to a change introduced in [grpc 
> v1.12.0|https://github.com/grpc/grpc/releases/tag/v1.12.0] ("_a new 
> grpc.Channel.close method is introduced and correct use of gRPC Python now 
> requires that channels be closed after use_"). If the underlying grpc 
> channel is not closed and many "pubsub.SubscriberClient" instances are 
> created, the system may run out of available file handles, resulting in 
> "too many open files" errors. A similar issue is reported here: 
> [https://github.com/googleapis/google-cloud-python/issues/5523].
> The issue can be reproduced by running the following file:
> {code:python}
> # directrunner_streaming_tmof.py
> from __future__ import print_function
> import multiprocessing
> import os
> import subprocess
> import time
> import apache_beam as beam
> from google.cloud import pubsub_v1
> def count_open_files():
>     """Count the number of files opened by current process."""
>     pid = multiprocessing.current_process().pid
>     lsof_out = subprocess.check_output(["lsof", "-p", str(pid)]).decode()
>     num_open_files = len(lsof_out.strip().split("\n")) - 1
>     return num_open_files
> def start_streaming_pipeline(project_id, subscription_path):
>     """Create a simple streaming pipeline."""
>     runner = beam.runners.direct.DirectRunner()
>     pipeline_options = beam.pipeline.PipelineOptions(
>         project=project_id, streaming=True
>     )
>     taxirides_pc = (
>         #
>         beam.Pipeline(runner=runner, options=pipeline_options)
>         | "Read" >> beam.io.ReadFromPubSub(subscription=subscription_path)
>     )
>     results = taxirides_pc.pipeline.run()
>     return results
> def monitor():
>     """Periodically print the number of open files."""
>     start_time = time.time()
>     for _ in range(20):
>         num_open_files = count_open_files()
>         time_elapsed = time.time() - start_time
>         print(
>             "Time elapsed: {:<3s}s, Number of open files: {}".format(
>                 str(round(time_elapsed, 0)), num_open_files
>             )
>         )
>         if num_open_files > 1000:
>             break
>         time.sleep(5)
> if __name__ == "__main__":
>     project_id = "{project_id}"
>     topic_path = "projects/pubsub-public-data/topics/taxirides-realtime"
>     client = pubsub_v1.SubscriberClient()
>     subscription_path = client.subscription_path(
>         project_id, "taxirides-realtime-sub"
>     )
>     subscription = client.create_subscription(subscription_path, topic_path)
>     print("Subscription created: {}".format(subscription_path))
>     try:
>         results = start_streaming_pipeline(project_id, subscription_path)
>         monitor()
>     finally:
>         client.delete_subscription(subscription_path)
>         print("Subscription deleted: {}".format(subscription_path))
> {code}
> Currently, the output from running this script looks something like:
> {noformat}
> Subscription created: projects/project_id/subscriptions/taxirides-realtime-sub
> Time elapsed: 0.0s, Number of open files: 160
> Time elapsed: 5.0s, Number of open files: 179
> Time elapsed: 11.0s, Number of open files: 247
> Time elapsed: 16.0s, Number of open files: 339
> Time elapsed: 21.0s, Number of open files: 436
> Time elapsed: 26.0s, Number of open files: 523
> Time elapsed: 31.0s, Number of open files: 615
> Time elapsed: 36.0s, Number of open files: 713
> Time elapsed: 41.0s, Number of open files: 809
> Time elapsed: 46.0s, Number of open files: 903
> Time elapsed: 52.0s, Number of open files: 999
> Time elapsed: 57.0s, Number of open files: 1095
> Subscription deleted: projects/project_id/subscriptions/taxirides-realtime-sub
> WARNING:root:The DirectPipelineResult is being garbage-collected while the 
> DirectRunner is still running the corresponding pipeline. This may lead to 
> incomplete execution of the pipeline if the main thread exits before pipeline 
> completion. Consider using result.wait_until_finish() to wait for completion 
> of pipeline execution.
> {noformat}
>  
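The leak mechanism described in the report can be illustrated without Beam or grpc: on Linux, every unclosed network handle pins one file descriptor, and /proc/self/fd is a lighter-weight alternative to shelling out to lsof for counting them. The plain sockets below are hypothetical stand-ins for the subscriber's grpc channel; closing each handle explicitly is the same remedy PR #8708 applies.

```python
import os
import socket


def count_open_fds():
    """Count file descriptors held by the current process (Linux-specific)."""
    return len(os.listdir("/proc/self/fd"))


baseline = count_open_fds()

# Objects that each own a network handle but are never closed: every one
# pins a file descriptor, just like an unclosed grpc channel.
leaked = [socket.socket() for _ in range(50)]
grown = count_open_fds()

# Explicitly closing each handle releases its descriptor and returns the
# process to its baseline count.
for s in leaked:
    s.close()
restored = count_open_fds()
```

Left unclosed, such handles accumulate until the process hits its descriptor limit, which is the "too many open files" failure observed in the repro.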



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
