This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new a8a595d Have the ability to send log messages to a topic in Python (#1353) a8a595d is described below commit a8a595d9124d274f030cabff230e3bddc76cc241 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Wed Mar 7 07:29:23 2018 -0800 Have the ability to send log messages to a topic in Python (#1353) * Have the ability to send log messages to a topic in Python * Address review comments --- pulsar-functions/instance/src/main/python/log.py | 26 ++++++++++++++++++++-- .../instance/src/main/python/python_instance.py | 16 ++++++++++++- .../src/main/python/python_instance_main.py | 4 +++- 3 files changed, 42 insertions(+), 4 deletions(-) diff --git a/pulsar-functions/instance/src/main/python/log.py b/pulsar-functions/instance/src/main/python/log.py index f36e684..85b2104 100644 --- a/pulsar-functions/instance/src/main/python/log.py +++ b/pulsar-functions/instance/src/main/python/log.py @@ -23,6 +23,7 @@ ''' log.py ''' import logging from logging.handlers import RotatingFileHandler +import pulsar # Create the logger # pylint: disable=invalid-name @@ -34,6 +35,19 @@ Log = logging.getLogger() # see time formatter documentation for more date_format = "%Y-%m-%d %H:%M:%S %z" +class LogTopicHandler(logging.Handler): + def __init__(self, topic_name, pulsar_client): + Log.info("Setting up producer for log topic %s" % topic_name) + self.producer = pulsar_client.create_producer( + str(topic_name), + block_if_queue_full=True, + batching_enabled=True, + batching_max_publish_delay_ms=100, + compression_type=pulsar._pulsar.CompressionType.LZ4) + + def emit(self, record): + self.producer.send_async(record) + def configure(level=logging.INFO): """ Configure logger which dumps log on terminal @@ -50,14 +64,22 @@ def configure(level=logging.INFO): Log.handlers.remove(handler) Log.setLevel(level) + stream_handler = logging.StreamHandler() + add_handler(stream_handler) +def remove_all_handlers(): + retval = None + 
for handler in Log.handlers: + Log.handlers.remove(handler) + retval = handler + return retval + +def add_handler(stream_handler): log_format = "[%(asctime)s] [%(levelname)s]: %(message)s" formatter = logging.Formatter(fmt=log_format, datefmt=date_format) - stream_handler = logging.StreamHandler() stream_handler.setFormatter(formatter) Log.addHandler(stream_handler) - def init_rotating_logger(level, logfile, max_files, max_bytes): """Initializes a rotating logger diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 08fcef5..077cb72 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -94,10 +94,13 @@ class Stats(object): return self.latency / self.nsuccessfullyprocessed class PythonInstance(object): - def __init__(self, instance_id, function_id, function_version, function_config, max_buffered_tuples, user_code, pulsar_client): + def __init__(self, instance_id, function_id, function_version, function_config, max_buffered_tuples, user_code, log_topic, pulsar_client): self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_config, max_buffered_tuples) self.user_code = user_code self.queue = Queue.Queue(max_buffered_tuples) + self.log_topic_handler = None + if log_topic is not None: + self.log_topic_handler = log.LogTopicHandler(str(log_topic), pulsar_client) self.pulsar_client = pulsar_client self.input_serdes = {} self.consumers = {} @@ -174,7 +177,11 @@ class PythonInstance(object): continue self.contextimpl.set_current_message_context(msg.message.message_id(), msg.topic) output_object = None + self.saved_log_handler = None try: + if self.log_topic_handler is not None: + self.saved_log_handler = log.remove_all_handlers() + log.add_handler(self.log_topic_handler) start_time = time.time() self.current_stats.increment_processed(int(start_time) * 1000) 
self.total_stats.increment_processed(int(start_time) * 1000) @@ -188,9 +195,16 @@ class PythonInstance(object): self.current_stats.increment_successfully_processed(latency) self.process_result(output_object, msg) except Exception as e: + if self.log_topic_handler is not None: + log.remove_all_handlers() + log.add_handler(self.saved_log_handler) Log.exception("Exception while executing user method") self.total_stats.record_user_exception(e) self.current_stats.record_user_exception(e) + finally: + if self.log_topic_handler is not None: + log.remove_all_handlers() + log.add_handler(self.saved_log_handler) def done_producing(self, consumer, orig_message, result, sent_message): if result == pulsar.Result.Ok and self.auto_ack and self.atleast_once: diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index 3fd4358..4fe1641 100644 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -73,6 +73,7 @@ def main(): parser.add_argument('--logging_directory', required=True, help='Logging Directory') parser.add_argument('--logging_file', required=True, help='Log file name') parser.add_argument('--auto_ack', required=True, help='Enable Autoacking?') + parser.add_argument('--log_topic', required=False, help='Topic to send Log Messages') args = parser.parse_args() log_file = os.path.join(args.logging_directory, args.logging_file + ".log.0") @@ -117,7 +118,8 @@ def main(): pulsar_client = pulsar.Client(args.pulsar_serviceurl) pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id), str(args.function_version), function_config, - int(args.max_buffered_tuples), str(args.py), pulsar_client) + int(args.max_buffered_tuples), str(args.py), + args.log_topic, pulsar_client) pyinstance.run() server_instance = server.serve(args.port, pyinstance) -- To stop receiving notification 
emails like this one, please contact mme...@apache.org.