Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-06-12 Thread via GitHub


howardyoo commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2163029741

   > I had a lot of conversations about OpenTelemetry and traces at Berlin 
Buzzwords and watched some talks, and I really think it is going to make problem 
diagnosis and resolution much easier @howardyoo @ferruzzi - let's continue with 
the provider and get it out in 2.10 for people to start using it.
   
   I'd like to express my gratitude to @potiuk and @ferruzzi for getting this 
done! Thank you so much. Yes, our next mountain should be the OTEL 
instrumentation piece, and also the OTEL providers. We haven't made the PR for 
part 2, but I'll start working on it. Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-06-11 Thread via GitHub


potiuk commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2161203256

   I had a lot of conversations about OpenTelemetry and traces at Berlin 
Buzzwords and watched some talks, and I really think it is going to make problem 
diagnosis and resolution much easier @howardyoo @ferruzzi - let's continue with 
the provider and get it out in 2.10 for people to start using it. 





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-06-11 Thread via GitHub


potiuk merged PR #37948:
URL: https://github.com/apache/airflow/pull/37948





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-06-09 Thread via GitHub


potiuk commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2156502709

   cc: @hussein-awala -> WDYT? I would love to merge that one now :)





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-06-08 Thread via GitHub


potiuk commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2156175784

   Any comments @hussein-awala ?





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-06-01 Thread via GitHub


potiuk commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2143718995

   > I'm not sure what we do usually means, here. Can you clarify? Do you mean 
we as Apache Airflow, or someone else (OTEL community) ? Also, I'm not sure 
what using two configs: class or module to use + kwargs to pass means.
   
   The thing here is that we are mostly following the "regular" way OTEL is 
configured (or that's what I understand - @howardyoo to confirm). When you look 
at the OTEL documentation, many of the configuration options there come down to 
a choice of classes, and configuring those classes is mostly about setting the 
right environment variables or passing a config.yaml file, which they read to 
configure themselves. And I think we should keep it this way:
   
   * in the Airflow configuration we can configure enabling OTEL and the 
"Airflow" side of it
   * we leave the detailed configuration of specific OTEL classes to env 
variables / yaml files or whatever they need
   
   This way we have greater flexibility and do not have to write our own 
configuration documentation.
   
   Also see 
https://opentelemetry.io/docs/collector/configuration/#environment-variables 
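
A minimal sketch of the split described above, for illustration only. The Airflow-side switch name `AIRFLOW__TRACES__OTEL_ON` is an assumption (it is not confirmed anywhere in this thread), and `TraceSettings` / `load_trace_settings` are hypothetical names. `OTEL_EXPORTER_OTLP_ENDPOINT` and `OTEL_SERVICE_NAME` are standard OpenTelemetry SDK environment variables; the fallback values used here are illustrative.

```python
from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class TraceSettings:
    """Hypothetical container: one Airflow-side switch, the rest owned by OTEL."""

    enabled: bool
    endpoint: str
    service_name: str


def load_trace_settings(env: Mapping[str, str] = os.environ) -> TraceSettings:
    # Airflow side: only decides whether tracing is on (option name is assumed).
    flag = env.get("AIRFLOW__TRACES__OTEL_ON", "false").strip().lower()
    enabled = flag in {"1", "true", "yes"}
    # OTEL side: standard SDK variables; the fallbacks are illustrative only.
    endpoint = env.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318")
    service_name = env.get("OTEL_SERVICE_NAME", "airflow")
    return TraceSettings(enabled, endpoint, service_name)


if __name__ == "__main__":
    print(load_trace_settings())
```

The point of the design is that Airflow would document only its own switch, while everything else follows whatever the OTEL documentation says about its variables.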
   
   Hey @hussein-awala -> do you have the questions answered?





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-05-30 Thread via GitHub


howardyoo commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2139467681

   > I'm very late to the party, but I have two comments:
   Hey @hussein-awala, thank you for your comments. It's pretty late alright, 
but I do welcome comments always!
   > 
   > * why do we need to configure the OTEL connection via Airflow configs 
while we can create an Airflow connection contains the credentials and the 
configurations in the extra and just provide its name? For me this step should 
be done before merging the PR as it's our recommended way for such things.
   
   For the configuration scheme used for OTEL traces, I decided to have it 
closely resemble our previous implementation of OTEL metrics, because I did not 
want to introduce a new way to configure it (and risk confusing people). 
However, there is another PR that I've created called 'OTEL providers for 
Apache Airflow', which will use Airflow connections. That PR is still far from 
getting reviewed :-). For the OTEL integration to use an Airflow connection as 
you mentioned, we may need a new PR that also modifies the OTEL metrics support 
to use the connection, something that I believe could be considered as a future 
enhancement.
   
   > * I see that there is an intention to support multiple tracers, but using 
configuration dedicated for OTEL would complicate the integration later, what 
we do usually is using two configs: class or module to use + kwargs to pass. In 
your case you need two configs: the module to use to create the tracer + a 
connection name as I explained in the first comment.
   
   I'm not sure `what we do usually` means, here. Can you clarify? Do you mean 
we as Apache Airflow, or someone else (OTEL community) ? Also, I'm not sure 
what `using two configs: class or module to use + kwargs to pass` means.
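
One possible reading of the "class or module to use + kwargs to pass" pattern from the quoted comment, sketched under assumptions: one config value holds a dotted import path and a second holds JSON keyword arguments. The helper name `build_from_config` is hypothetical, and `datetime.timedelta` is only a stand-in for a tracer class.

```python
from __future__ import annotations

import importlib
import json
from typing import Any


def build_from_config(class_path: str, kwargs_json: str = "{}") -> Any:
    """Import ``class_path`` (a dotted path) and instantiate it with JSON kwargs."""
    module_name, _, class_name = class_path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a dotted path like 'pkg.module.Class', got {class_path!r}")
    cls = getattr(importlib.import_module(module_name), class_name)
    kwargs = json.loads(kwargs_json)
    if not isinstance(kwargs, dict):
        raise ValueError("kwargs must be a JSON object")
    return cls(**kwargs)


if __name__ == "__main__":
    # Stand-in for a tracer class: any importable class works for the demo.
    print(build_from_config("datetime.timedelta", '{"seconds": 90}'))
```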
   
   > 
   > WDYT?
   > 
   > For the OTEL tracer it looks good, great job!
   





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-05-29 Thread via GitHub


potiuk commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1619311763


##
newsfragments/37948.feature.rst:
##
@@ -1,2 +1,2 @@
 OpenTelemetry Traces for Apache Airflow
-This new feature adds capability for Apache Airflow to emit 1) airflow system 
traces of scheduler, triggerer, executor, processor 2) DAG run traces for 
deployed DAG runs in OpenTelemetry format. Previously, only metrics were 
supported which emitted metrics in OpenTelemetry. This new feature will add 
richer data for users to use OpenTelemetry standard to emitt and send their 
trace data to OTLP compatible endpoints.
\ No newline at end of file
+This new feature adds capability for Apache Airflow to emit 1) airflow system 
traces of scheduler, triggerer, executor, processor 2) DAG run traces for 
deployed DAG runs in OpenTelemetry format. Previously, only metrics were 
supported which emitted metrics in OpenTelemetry. This new feature will add 
richer data for users to use OpenTelemetry standard to emitt and send their 
trace data to OTLP compatible endpoints.

Review Comment:
   Just one line possible for features :)






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-05-29 Thread via GitHub


howardyoo commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2137452322

   > One more request before we merge it @howardyoo -> could you please add 
newsfragment describing the traces?
   
   No problem! Just added and committed.





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-05-29 Thread via GitHub


potiuk commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2137153918

   One more request before we merge it @howardyoo -> could you please add 
newsfragment describing the traces?





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-05-28 Thread via GitHub


potiuk commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2134771031

   Does anyone want to take another pass? @ferruzzi @uranusjr? Or should I just 
merge it?





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-05-27 Thread via GitHub


howardyoo commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2134378466

   Okay... ah..
   
   On Tue, May 28, 2024 at 12:28 AM Jarek Potiuk wrote:
   
   > Static test + docs fix left to go :)





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-05-27 Thread via GitHub


potiuk commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2134371250

   Static test + docs fix left to go :)





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-05-27 Thread via GitHub


howardyoo commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2134324736

   > > Actually I reviewed the change and I would be ready to approve/merge 
that one when you rebase and fix all tests @howardyoo
   > 
   > Thank you, @potiuk!
   
   I have fixed all the tests!





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-05-27 Thread via GitHub


howardyoo commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2134297561

   > Actually I reviewed the change and I would be ready to approve/merge that 
one when you rebase and fix all tests @howardyoo
   
   Thank you, @potiuk!





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-05-27 Thread via GitHub


potiuk commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2134005638

   Actually I reviewed the change and I would be ready to approve/merge that 
one when you rebase and fix all tests @howardyoo 





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-04-14 Thread via GitHub


howardyoo commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2055263616

   @potiuk, I am going to split this PR into part A and the rest (part B, C, 
D...), where part A would have the basic trace capabilities and necessary 
configuration, and part B will have the actual instrumentation of traces in 
various parts of Airflow. This will keep the PRs at a reasonable size and 
scope, and when part A is merged, we can open follow-up PRs that cover 
instrumentation of the following parts (and more if necessary):
   - dag processing
   - executors (base, local, and sequential)
   - jobs (local job runner, scheduler, trigger)
   - models (dagrun and taskinstance)





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-04-14 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1565189532


##
airflow/dag_processing/manager.py:
##
@@ -1029,6 +1051,25 @@ def _collect_results_from_processor(self, processor) -> None:
         )
         self._file_stats[processor.file_path] = stat
         file_name = Path(processor.file_path).stem
+
+        """crude exposure of instrumentation code which may need to be furnished"""
+        span = Trace.get_tracer("DagFileProcessorManager").start_span(
+            "dag_processing", start_time=datetime_to_nano(processor.start_time)

Review Comment:
   Unfortunately, it is not possible, as OTEL's time unit is not compatible 
with datetime type.
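
For illustration, a thin conversion layer could sit between Airflow's datetimes and the tracer. OpenTelemetry's Python API takes `start_time` as an integer number of nanoseconds since the Unix epoch; the helper below is a hypothetical sketch (the name `to_epoch_nanos` is invented here and is not the PR's `datetime_to_nano`), and treating naive datetimes as UTC is an assumption.

```python
from __future__ import annotations

from datetime import datetime, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_nanos(value: datetime | int | None) -> int | None:
    """Accept a datetime (or an already-converted int) and return epoch nanoseconds."""
    if value is None or isinstance(value, int):
        return value
    if value.tzinfo is None:
        # Assumption for this sketch: naive datetimes are treated as UTC.
        value = value.replace(tzinfo=timezone.utc)
    delta = value - _EPOCH
    # Integer arithmetic avoids the float rounding of timestamp() * 1e9.
    return (delta.days * 86_400 + delta.seconds) * 1_000_000_000 + delta.microseconds * 1_000


if __name__ == "__main__":
    print(to_epoch_nanos(datetime.now(timezone.utc)))
```

A wrapper like this would let callers pass datetime objects while the raw nanosecond value stays confined to the tracing layer.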






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-04-14 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1565189095


##
airflow/traces/tracer.py:
##
@@ -0,0 +1,280 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import inspect
+import logging
+import socket
+from typing import TYPE_CHECKING, Any, Callable
+
+from airflow.configuration import conf
+from airflow.typing_compat import Protocol
+
+log = logging.getLogger(__name__)
+
+
+def gen_context(trace_id, span_id):
+    """Generate span context from trace_id and span_id."""
+    from airflow.traces.otel_tracer import gen_context as otel_gen_context
+
+    return otel_gen_context(trace_id, span_id)
+
+
+def gen_links_from_kv_list(list):
+    """Generate links from kv list of {trace_id:int, span_id:int}."""
+    from airflow.traces.otel_tracer import gen_links_from_kv_list
+
+    return gen_links_from_kv_list(list)
+
+
+def span(func):
+    """Decorate a function with span."""
+
+    def wrapper(*args, **kwargs):
+        func_name = func.__name__
+        qual_name = func.__qualname__
+        module_name = func.__module__
+        if "." in qual_name:
+            component = f"{qual_name.rsplit('.', 1)[0]}"
+        else:
+            component = module_name
+        with Trace.start_span(span_name=func_name, component=component):
+            if len(inspect.signature(func).parameters) > 0:
+                return func(*args, **kwargs)
+            else:
+                return func()
+
+    return wrapper
+
+
+class EmptyContext:
+    """If no Tracer is configured, EmptyContext is used as a fallback."""
+
+    def __init__(self):
+        self.trace_id = 1
+
+
+class EmptySpan:
+    """If no Tracer is configured, EmptySpan is used as a fallback."""
+
+    def __enter__(self):
+        """Enter."""
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        """Exit."""
+        pass
+
+    def __call__(self, obj):
+        """Call."""
+        return obj
+
+    def get_span_context(self):
+        """Get span context."""
+        return EMPTY_CTX
+
+    def set_attribute(self, key, value) -> None:
+        """Set an attribute to the span."""
+        pass
+
+    def set_attributes(self, attributes) -> None:
+        """Set multiple attributes at once."""
+        pass
+
+    def add_event(
+        self,
+        name: str,
+        attributes: Any | None = None,
+        timestamp: int | None = None,
+    ) -> None:
+        """Add event to span."""
+        pass
+
+    def add_link(
+        self,
+        context: Any,
+        attributes: Any | None = None,
+    ) -> None:
+        """Add link to the span."""
+        pass
+
+    def end(self, end_time=None, *args, **kwargs) -> None:
+        """End."""
+        pass
+
+
+EMPTY_SPAN = EmptySpan()
+EMPTY_CTX = EmptyContext()
+
+
+class Tracer(Protocol):
+    """This class is only used for TypeChecking (for IDEs, mypy, etc)."""
+
+    instance: Tracer | EmptyTrace | None = None
+
+    @classmethod
+    def get_tracer(cls, component):
+        """Get a tracer."""
+        raise NotImplementedError()
+
+    @classmethod
+    def start_span(
+        cls,
+        span_name: str,
+        component: str | None = None,
+        parent_sc=None,
+        span_id=None,
+        links=None,
+        start_time=None,
+    ):
+        """Start a span."""
+        raise NotImplementedError()
+
+    @classmethod
+    def use_span(cls, span):
+        """Use a span as current."""
+        raise NotImplementedError()
+
+    @classmethod
+    def get_current_span(self):
+        raise NotImplementedError()
+
+    @classmethod
+    def start_span_from_dagrun(
+        cls,
+        dagrun,
+        span_name=None,
+        service_name=None,
+        component=None,
+        links=None,
+    ):
+        """Start a span from dagrun."""
+        raise NotImplementedError()
+
+    @classmethod
+    def start_span_from_taskinstance(
+        cls,
+        ti,
+        span_name=None,
+        component=None,
+        child=False,
+        links=None,
+    ):
+        """Start a span from taskinstance."""
+        raise NotImplementedError()
+
+
+class EmptyTrace:
+    """If no Tracer is configured, EmptyTrac

Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-04-14 Thread via GitHub


howardyoo closed pull request #37948: [AIP-49] OpenTelemetry Traces for Apache 
Airflow
URL: https://github.com/apache/airflow/pull/37948





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-04-10 Thread via GitHub


uranusjr commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1560428293


##
airflow/dag_processing/manager.py:
##
@@ -1029,6 +1051,25 @@ def _collect_results_from_processor(self, processor) -> None:
         )
         self._file_stats[processor.file_path] = stat
         file_name = Path(processor.file_path).stem
+
+        """crude exposure of instrumentation code which may need to be furnished"""
+        span = Trace.get_tracer("DagFileProcessorManager").start_span(
+            "dag_processing", start_time=datetime_to_nano(processor.start_time)

Review Comment:
   Is it possible for tracers to just accept datetime objects instead? From 
past experience with StatsD, using a raw number as value is prone to user 
errors, especially since Airflow does not otherwise use nanoseconds, but either 
milliseconds or seconds instead. Having two time units is already one too many, 
adding a third one isn’t a good idea.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-04-10 Thread via GitHub


uranusjr commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1560427149


##
airflow/traces/tracer.py:
##
@@ -0,0 +1,280 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import inspect
+import logging
+import socket
+from typing import TYPE_CHECKING, Any, Callable
+
+from airflow.configuration import conf
+from airflow.typing_compat import Protocol
+
+log = logging.getLogger(__name__)
+
+
+def gen_context(trace_id, span_id):
+    """Generate span context from trace_id and span_id."""
+    from airflow.traces.otel_tracer import gen_context as otel_gen_context
+
+    return otel_gen_context(trace_id, span_id)
+
+
+def gen_links_from_kv_list(list):
+    """Generate links from kv list of {trace_id:int, span_id:int}."""
+    from airflow.traces.otel_tracer import gen_links_from_kv_list
+
+    return gen_links_from_kv_list(list)
+
+
+def span(func):
+    """Decorate a function with span."""
+
+    def wrapper(*args, **kwargs):
+        func_name = func.__name__
+        qual_name = func.__qualname__
+        module_name = func.__module__
+        if "." in qual_name:
+            component = f"{qual_name.rsplit('.', 1)[0]}"
+        else:
+            component = module_name
+        with Trace.start_span(span_name=func_name, component=component):
+            if len(inspect.signature(func).parameters) > 0:
+                return func(*args, **kwargs)
+            else:
+                return func()
+
+    return wrapper
+
+
+class EmptyContext:
+    """If no Tracer is configured, EmptyContext is used as a fallback."""
+
+    def __init__(self):
+        self.trace_id = 1
+
+
+class EmptySpan:
+    """If no Tracer is configured, EmptySpan is used as a fallback."""
+
+    def __enter__(self):
+        """Enter."""
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        """Exit."""
+        pass
+
+    def __call__(self, obj):
+        """Call."""
+        return obj
+
+    def get_span_context(self):
+        """Get span context."""
+        return EMPTY_CTX
+
+    def set_attribute(self, key, value) -> None:
+        """Set an attribute to the span."""
+        pass
+
+    def set_attributes(self, attributes) -> None:
+        """Set multiple attributes at once."""
+        pass
+
+    def add_event(
+        self,
+        name: str,
+        attributes: Any | None = None,
+        timestamp: int | None = None,
+    ) -> None:
+        """Add event to span."""
+        pass
+
+    def add_link(
+        self,
+        context: Any,
+        attributes: Any | None = None,
+    ) -> None:
+        """Add link to the span."""
+        pass
+
+    def end(self, end_time=None, *args, **kwargs) -> None:
+        """End."""
+        pass
+
+
+EMPTY_SPAN = EmptySpan()
+EMPTY_CTX = EmptyContext()
+
+
+class Tracer(Protocol):
+    """This class is only used for TypeChecking (for IDEs, mypy, etc)."""
+
+    instance: Tracer | EmptyTrace | None = None
+
+    @classmethod
+    def get_tracer(cls, component):
+        """Get a tracer."""
+        raise NotImplementedError()
+
+    @classmethod
+    def start_span(
+        cls,
+        span_name: str,
+        component: str | None = None,
+        parent_sc=None,
+        span_id=None,
+        links=None,
+        start_time=None,
+    ):
+        """Start a span."""
+        raise NotImplementedError()
+
+    @classmethod
+    def use_span(cls, span):
+        """Use a span as current."""
+        raise NotImplementedError()
+
+    @classmethod
+    def get_current_span(self):
+        raise NotImplementedError()
+
+    @classmethod
+    def start_span_from_dagrun(
+        cls,
+        dagrun,
+        span_name=None,
+        service_name=None,
+        component=None,
+        links=None,
+    ):
+        """Start a span from dagrun."""
+        raise NotImplementedError()
+
+    @classmethod
+    def start_span_from_taskinstance(
+        cls,
+        ti,
+        span_name=None,
+        component=None,
+        child=False,
+        links=None,
+    ):
+        """Start a span from taskinstance."""
+        raise NotImplementedError()
+
+
+class EmptyTrace:
+    """If no Tracer is configured, EmptyTrace

Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-04-10 Thread via GitHub


dstandish commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2048324458

   Yeah @howardyoo I think it's a good practice and will result in fewer 
errors.  Big bang PRs, while sometimes necessary, tend to let things fall 
between the cracks and let bugs go unnoticed.
   
   Let me share what I have been doing over the last month or so with my work 
on AIP-44 which I think has worked pretty well for all parties.
   1. "get it sorta working"
   2. do a soft reset to main
   3. Look at your local changes, and identify small changes that make sense as 
a single unit.  Commit that, with a good name for the commit message.
   4. Step through and repeat (3) till all your local changes are recommitted.
   5. Copy the output of `git log --oneline` to text editor
   6. Manipulate the lines to be 
   ```
   git checkout main
   git checkout -b <branch-name>
   git cherry-pick <commit-hash>
   ```
   For each line, I replace ` ` with `-` and make it lowercase; then I can use 
multiline editing to quickly convert to that format.
   So then e.g. 
   ```
   5db845e493   Do not log event when using db isolation (4 hours ago)
   eb4117c50f   Fix error when setting try_number from TaskInstancePydantic (4 hours ago)
   a1d4eb0362   Remove unused attr _try_number on TaskInstancePydantic (4 hours ago)
   19dd3f2277   Fix check of correct dag when remote call for _get_ti (4 hours ago)
   4c6255b0c9   Add retry logic for RPC calls (4 hours ago)
   ```
   becomes
   ```
   gco main
   git checkout -b do-not-log-event-when-using-db-isolation
   git cherry-pick 5db845e493
   gpsup
   
   gco main
   git checkout -b fix-error-when-setting-try_number-from-taskinstancepydantic
   git cherry-pick eb4117c50f
   gpsup
   
   gco main
   git checkout -b remove-unused-attr-_try_number-on-taskinstancepydantic
   git cherry-pick a1d4eb0362
   gpsup
   
   gco main
   git checkout -b fix-check-of-correct-dag-when-remote-call-for-_get_ti
   git cherry-pick 19dd3f2277
   gpsup
   
   gco main
   git checkout -b add-retry-logic-for-rpc-calls
   git cherry-pick 4c6255b0c9
   gpsup
   ```
   
   (gpsup is `git push --set-upstream origin $(git_current_branch)`)
   
   It's less painful than I thought it might be originally.
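
The manual text-editor step above could also be scripted. The following is a rough sketch under assumptions, not the workflow's actual tooling: the helper names `slugify` and `split_commands` are invented here, the input is assumed to look like the `git log --oneline` sample shown above (hash, subject, optional relative date in parentheses), and the `gco` / `gpsup` aliases are spelled out as plain git commands. Subjects containing characters that are not valid in branch names would still need manual cleanup.

```python
from __future__ import annotations

import re

# hash, subject, then an optional trailing "(... ago)" relative date
_LINE = re.compile(r"^(?P<sha>[0-9a-f]{7,40})\s+(?P<subject>.*?)(?:\s+\([^()]*ago\))?\s*$")


def slugify(subject: str) -> str:
    """Replace whitespace runs with '-' and lowercase, as described in the steps above."""
    return re.sub(r"\s+", "-", subject.strip()).lower()


def split_commands(oneline_log: str, base: str = "main") -> list[str]:
    """Turn one-line log output into per-commit branch/cherry-pick/push commands."""
    commands: list[str] = []
    for raw in oneline_log.splitlines():
        match = _LINE.match(raw.strip())
        if not match:
            continue  # skip blank or unrecognized lines
        branch = slugify(match["subject"])
        commands += [
            f"git checkout {base}",
            f"git checkout -b {branch}",
            f"git cherry-pick {match['sha']}",
            f"git push --set-upstream origin {branch}",
        ]
    return commands


if __name__ == "__main__":
    sample = "4c6255b0c9   Add retry logic for RPC calls (4 hours ago)"
    print("\n".join(split_commands(sample)))
```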





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-04-10 Thread via GitHub


potiuk commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2047713345

   That works, maybe even split it to smaller pieces. 





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-04-09 Thread via GitHub


howardyoo commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2046329683

   > Also - purely practically - I think this change has a high 
cherry-pick-breaking potential. We should deliberately start merging it from 
the changes that have the least conflict-generating potential, and merge the 
more conflict-inducing changes (where we break indents) later on as we get a 
bit closer to 2.10.0, otherwise it will be difficult to cherry-pick changes 
after those merges.
   
   Sure, that would be the part where OTEL tracing itself is implemented. 
Without any actual instrumentation, it will introduce the fewest conflicts and 
the least impact.
   
   The first part should be:
   ===
   airflow/config_templates/config.yml
   airflow/www/views.py
   airflow/www/templates/airflow/dags.html
   
   
   docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
   
   scripts/ci/docker-compose/integration-otel.yml
   scripts/ci/docker-compose/otel-collector-config.yml
   
   airflow/traces/
   - __init__.py
   - otel_tracer.py
   - tracer.py
   - utils.py
   - airflow/utils/dates.py
   
   tests/core/test_otel_tracer.py
   
   And the second part should be:
   
   airflow/dag_processing/manager.py
   airflow/executors/base_executor.py
   airflow/executors/local_executor.py
   airflow/executors/sequential_executor.py
   airflow/jobs/job.py
   airflow/jobs/local_task_job_runner.py
   airflow/jobs/scheduler_job_runner.py
   airflow/jobs/trigger_job_runner.py
   airflow/models/dagrun.py
   airflow/models/taskinstance.py
   
   I think this second part could also be further separated per modules:
   1. dag_processing
   2. executors
   3. jobs
   4. models
   
   What do you think?





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-04-09 Thread via GitHub


howardyoo commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2046324781

   > I finally tried to look at this PR and it is huge to review. I have a 
proposal though @howardyoo: can you attempt to split out adding traces into 
smaller subsets: a) add the general functionality of enabling OTEL first, and 
then b) add spans "piece by piece" in different parts of Airflow, with each 
piece focusing on one part of the code/functionality only?
   > 
   > I think this will be far easier to review and we could pull in other 
people who would be more familiar with different parts of the code. The way I 
did it in the past: I kept my original PR as a DRAFT and then extracted a part 
of it that could be separated out as standalone, much smaller and much more 
focused. Then, after merging each small PR, I'd rebase the "complete" one and 
get it smaller and smaller as individual parts of it were merged.
   
   Hi Jarek,
   I think that's possible: while doing the major rebase, I got a pretty good 
idea of which part is the OTEL trace enablement and which part is the 
instrumentation itself. Let me see what I can do.





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-04-09 Thread via GitHub


potiuk commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2044682811

   Also - purely practically - I think this change has a 
high-cherry-pick-breaking potential. We should deliberately start merging it 
from the changes that have the least conflict-generation potential, and merge 
the more conflict-inducing changes (where we break indents) later on, as we 
get a bit closer to 2.10.0; otherwise it will be difficult to cherry-pick 
changes after those merges.





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-04-09 Thread via GitHub


potiuk commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2044624684

   I finally tried to look at this PR and it is huge to review. I have a 
proposal though @howardyoo: can you attempt to split out adding traces into 
smaller subsets: a) add the general functionality of enabling OTEL first, and 
then b) add spans "piece by piece" in different parts of Airflow, with each 
piece focusing on one part of the code/functionality only? 
   
   I think this will be far easier to review and we could pull in other people 
who would be more familiar with different parts of the code. The way I did it 
in the past: I kept my original PR as a DRAFT and then extracted a part of it 
that could be separated out as standalone, much smaller and much more 
focused. Then, after merging each small PR, I'd rebase the "complete" one and 
get it smaller and smaller as individual parts of it were merged.
   





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-04-09 Thread via GitHub


potiuk commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1557334927


##
airflow/config_templates/config.yml:
##
@@ -1149,6 +1149,64 @@ metrics:
       type: string
       example: ~
       default: "False"
+traces:
+  description: |
+    Distributed traces integration settings.
+  options:
+    otel_on:
+      description: |
+        Enables sending traces to OpenTelemetry.
+      version_added: 2.9.0

Review Comment:
   And other places.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-04-09 Thread via GitHub


potiuk commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1557333993


##
airflow/config_templates/config.yml:
##
@@ -1149,6 +1149,64 @@ metrics:
       type: string
       example: ~
       default: "False"
+traces:
+  description: |
+    Distributed traces integration settings.
+  options:
+    otel_on:
+      description: |
+        Enables sending traces to OpenTelemetry.
+      version_added: 2.9.0

Review Comment:
   ```suggestion
         version_added: 2.10.0
   ```






[PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-31 Thread via GitHub


howardyoo opened a new pull request, #37948:
URL: https://github.com/apache/airflow/pull/37948

   
   
   
   closes #37752 
   
   
   ---
   
   This is a PR for 
[AIP-49](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-49+OpenTelemetry+Support+for+Apache+Airflow),
which adds OpenTelemetry support to Airflow. Last year, a group of 
contributors delivered the first part of Airflow's commitment to 
OpenTelemetry by providing _OTEL metrics support_. This PR addresses the second 
phase of the OTEL implementation for Airflow: emitting **Traces**.
   
   





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-31 Thread via GitHub


howardyoo closed pull request #37948: [AIP-49] OpenTelemetry Traces for Apache 
Airflow
URL: https://github.com/apache/airflow/pull/37948





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-25 Thread via GitHub


ferruzzi commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-2019006751

   Looks like you made a ton of progress last week.  I'll try to make another 
(final??) pass in the next day or two.   Thanks!!





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-25 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1538293841


##
airflow/traces/otel_tracer.py:
##
@@ -0,0 +1,316 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import random
+
+from opentelemetry import trace
+from opentelemetry.context import create_key
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
+from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
+from opentelemetry.sdk.trace import Span, Tracer as OpenTelemetryTracer, TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
+from opentelemetry.sdk.trace.id_generator import IdGenerator
+from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, TraceFlags, Tracer
+from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
+
+from airflow.configuration import conf
+from airflow.traces import (
+    TRACEPARENT,
+    TRACESTATE,
+)
+from airflow.traces.utils import (
+    gen_dag_span_id,
+    gen_span_id,
+    gen_trace_id,
+    parse_traceparent,
+    parse_tracestate,
+)
+from airflow.utils.dates import datetime_to_nano
+from airflow.utils.net import get_hostname
+
+log = logging.getLogger(__name__)
+
+_NEXT_ID = create_key("next_id")
+
+
+class OtelTrace:
+    """
+    Handle all tracing requirements such as getting the tracer, and starting a new span.
+
+    When OTEL is enabled, the Trace class will be replaced by this class.
+    """
+
+    def __init__(self, span_exporter: ConsoleSpanExporter | OTLPSpanExporter, tag_string: str | None = None):
+        self.span_exporter = span_exporter
+        self.span_processor = BatchSpanProcessor(self.span_exporter)
+        self.tag_string = tag_string
+        self.otel_service = conf.get("traces", "otel_service")
+
+    def get_tracer(
+        self, component: str, trace_id: int | None = None, span_id: int | None = None
+    ) -> OpenTelemetryTracer | Tracer:
+        """Tracer that will use special AirflowOtelIdGenerator to control producing certain span and trace id."""
+        resource = Resource(attributes={HOST_NAME: get_hostname(), SERVICE_NAME: self.otel_service})
+        if trace_id or span_id:
+            # in case where trace_id or span_id was given
+            tracer_provider = TracerProvider(
+                resource=resource, id_generator=AirflowOtelIdGenerator(span_id=span_id, trace_id=trace_id)
+            )
+        else:
+            tracer_provider = TracerProvider(resource=resource)
+        tracer_provider.add_span_processor(self.span_processor)
+        tracer = tracer_provider.get_tracer(component)
+        """
+        Tracer will produce a single ID value if value is provided. Note that this is one-time only, so any
+        subsequent call will produce the normal random ids.
+        """
+        return tracer
+
+    def get_current_span(self):
+        return trace.get_current_span()
+
+    def use_span(self, span: Span):
+        return trace.use_span(span=span)
+
+    def start_span(
+        self,
+        span_name: str,
+        component: str | None = None,
+        parent_sc: SpanContext | None = None,
+        span_id=None,
+        links=None,
+        start_time=None,
+    ):
+        """Start a span; if service_name is not given, otel_service is used."""
+        if component is None:
+            component = self.otel_service
+
+        trace_id = self.get_current_span().get_span_context().trace_id
+        tracer = self.get_tracer(component=component, trace_id=trace_id, span_id=span_id)
+
+        attributes = parse_tracestate(self.tag_string) if self.tag_string else {}
+
+        if links is not None:
+            _links = gen_links_from_kv_list(links)
+        else:
+            _links = []
+
+        if start_time is not None:
+            start_time = datetime_to_nano(start_time)
+
+        if parent_sc is not None:
+            ctx = trace.set_span_in_context(NonRecordingSpan(parent_sc))
+            span = tracer.start_as_current_span(
+                span_name, context=ctx, attributes=attributes, links=_links, start_time=start_time
+   

Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-25 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1538292247


##
airflow/traces/utils.py:
##
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING
+
+from airflow.utils.hashlib_wrapper import md5
+
+if TYPE_CHECKING:
+    from airflow.models import DagRun, TaskInstance
+    from airflow.models.taskinstancekey import TaskInstanceKey
+
+TRACE_ID = 0
+SPAN_ID = 16
+
+log = logging.getLogger(__name__)
+
+
+def _gen_id(seeds: list[str], as_int: bool = False, type: int = TRACE_ID) -> str | int:
+    seed_str = "_".join(seeds).encode("utf-8")
+    hash_hex = md5(seed_str).hexdigest()[type:]
+    return int(hash_hex, 16) if as_int else hash_hex
+
+
+def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
+    """Generate trace id from DagRun."""
+    return _gen_id([dag_run.dag_id, dag_run.run_id, str(dag_run.start_date.timestamp())], as_int)
+
+
+def gen_span_id_from_ti_key(ti_key: TaskInstanceKey, as_int: bool = False) -> str | int:
+    """Generate span id from TI key."""
+    return _gen_id([ti_key.dag_id, ti_key.run_id, ti_key.task_id, str(ti_key.try_number)], as_int, SPAN_ID)
+
+
+def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int:
+    """Generate dag's root span id using dag_run."""
+    return _gen_id([dag_run.dag_id, dag_run.run_id, str(dag_run.start_date.timestamp())], as_int, SPAN_ID)
+
+
+def gen_span_id(ti: TaskInstance, as_int: bool = False) -> str | int:
+    """Generate span id from the task instance."""
+    dag_run = ti.dag_run
+    """When this is called, the try_number of ti is already set to next(+1), hence the subtraction"""
+    return _gen_id([dag_run.dag_id, dag_run.run_id, ti.task_id, str(ti.try_number - 1)], as_int, SPAN_ID)
+
+
+def parse_traceparent(traceparent_str: str | None = None) -> dict:
+    """Parse traceparent string: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01."""
+    if traceparent_str is None:
+        return {}
+    tokens = traceparent_str.split("-")
+    if len(tokens) != 4:
+        raise ValueError("The traceparent string does not have the correct format.")
+    return {"version": tokens[0], "trace_id": tokens[1], "parent_id": tokens[2], "flags": tokens[3]}
+
+
+def parse_tracestate(tracestate_str: str | None = None) -> dict:
+    """Parse tracestate string: rojo=00f067aa0ba902b7,congo=t61rcWkgMzE."""
+    if tracestate_str is None:
+        return {}
+    tokens = tracestate_str.split(",")
+    result = {}
+    for pair in tokens:
+        key, value = pair.split("=")
+        result[key.strip()] = value.strip()
+    return result
+
+
+def is_valid_trace_id(trace_id: str) -> bool:
+    """Check whether trace id is valid."""
+    if trace_id is not None and len(trace_id) == 34 and int(trace_id, 16) != 0:
+        return True
+    else:
+        return False

Review Comment:
   Sorry for the delay, if you don't like the parens in my last suggestion, 
that's fine.  Another option worth considering in that case would be to leave 
yours but drop the `else`.
   
   ```
   if (stuff):
       return True
   else:
       return False
   ```
   
   is functionally identical to 
   
   ```
   if (stuff):
       return True
   return False
   ```
   
   
   But it's not a big deal, up to you.  Feel free to resolve this one if you 
want.
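   
   As a side note on the `_gen_id` helper in the hunk under review: it derives 
IDs deterministically by hashing the joined seeds with MD5 and slicing the hex 
digest, so `TRACE_ID = 0` keeps all 32 hex characters (128 bits) and 
`SPAN_ID = 16` keeps the last 16 (64 bits). Below is a minimal standalone 
sketch of that idea. It assumes plain `hashlib.md5` (Python 3.9+ for 
`usedforsecurity`) in place of Airflow's `md5` wrapper; the name `gen_id` and 
the seed values are hypothetical.

```python
from __future__ import annotations

import hashlib

TRACE_ID = 0  # slice offset 0: keep the full 32-character digest (128 bits)
SPAN_ID = 16  # slice offset 16: keep the last 16 characters (64 bits)


def gen_id(seeds: list[str], as_int: bool = False, offset: int = TRACE_ID) -> str | int:
    """Derive a deterministic ID from seed strings (hypothetical stand-in for _gen_id)."""
    seed_bytes = "_".join(seeds).encode("utf-8")
    # MD5 is used only as a stable fingerprint here, not for security.
    hash_hex = hashlib.md5(seed_bytes, usedforsecurity=False).hexdigest()[offset:]
    return int(hash_hex, 16) if as_int else hash_hex


# Made-up seeds standing in for dag_id, run_id and the start timestamp.
seeds = ["example_dag", "manual__2024-03-15", "1710460800.0"]
trace_id = gen_id(seeds)
span_id = gen_id(seeds, offset=SPAN_ID)
```

   Since `gen_trace_id` and `gen_dag_span_id` in the hunk pass the same seeds, 
the DAG root span ID comes out as the last 16 characters of the trace ID, and 
re-running with the same seeds always reproduces the same IDs.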






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-15 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1526937457


##
airflow/traces/otel_tracer.py:
##

Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-15 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1526937064


##
airflow/traces/utils.py:
##
@@ -0,0 +1,98 @@
+def is_valid_trace_id(trace_id: str) -> bool:
+    """Check whether trace id is valid."""
+    if trace_id is not None and len(trace_id) == 34 and int(trace_id, 16) != 0:
+        return True
+    else:
+        return False

Review Comment:
   Yeah, I think wrapping it in () would also work, but I guess my version 
might be clearer.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-15 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1526570614


##
airflow/traces/utils.py:
##
@@ -0,0 +1,98 @@
+def is_valid_trace_id(trace_id: str) -> bool:
+    """Check whether trace id is valid."""
+    if trace_id is not None and len(trace_id) == 34 and int(trace_id, 16) != 0:
+        return True
+    else:
+        return False

Review Comment:
   Huh, interesting. It seems to work fine 
   ```
   In [54]: x = '0x04200'
   
   In [55]: result = x and len(x) < 10 and int(x, 16) != 0
   
   In [56]: result
   Out[56]: True
   
   In [57]: type(result)
   Out[57]: bool
   
   ```
   
   Maybe try wrapping the return statement in (), otherwise yeah, yours will be 
fine.
   
   ```
   return (trace_id and len(trace_id) == 34 and int(trace_id, 16) != 0)
   ```
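   
   One caveat with the bare `and` chain, sketched below under the assumption 
that `trace_id` may arrive as `None` or as an empty string: Python's `and` 
returns the first falsy operand rather than `False`, so the one-liner can 
return `None` or `''` despite the `-> bool` annotation. The session above 
yields a `bool` only because `x` is a non-empty string. The function names 
`chained` and `strict` are hypothetical.

```python
from __future__ import annotations


def chained(trace_id: str | None):
    # Mirrors the suggested one-liner: short-circuits on the first falsy operand
    # and returns that operand itself, which is not necessarily a bool.
    return trace_id and len(trace_id) == 34 and int(trace_id, 16) != 0


def strict(trace_id: str | None) -> bool:
    # Coercing the first operand with bool() makes every path return a real bool.
    return bool(trace_id) and len(trace_id) == 34 and int(trace_id, 16) != 0


# "0x" prefix plus 32 hex digits gives the 34 characters the check expects.
valid = "0x0af7651916cd43dd8448eb211c80319c"
results = {
    "chained_none": chained(None),
    "strict_none": strict(None),
    "chained_valid": chained(valid),
    "strict_valid": strict(valid),
}
```

   The explicit `if`/`return True`/`return False` form in the PR avoids this 
as well, because it starts from `trace_id is not None`, which is already a 
`bool`.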






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-15 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1526555009


##
airflow/traces/otel_tracer.py:
##
@@ -0,0 +1,316 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import random
+
+from opentelemetry import trace
+from opentelemetry.context import create_key
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
+from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
+from opentelemetry.sdk.trace import Span, Tracer as OpenTelemetryTracer, TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
+from opentelemetry.sdk.trace.id_generator import IdGenerator
+from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, TraceFlags, Tracer
+from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
+
+from airflow.configuration import conf
+from airflow.traces import (
+    TRACEPARENT,
+    TRACESTATE,
+)
+from airflow.traces.utils import (
+    gen_dag_span_id,
+    gen_span_id,
+    gen_trace_id,
+    parse_traceparent,
+    parse_tracestate,
+)
+from airflow.utils.dates import datetime_to_nano
+from airflow.utils.net import get_hostname
+
+log = logging.getLogger(__name__)
+
+_NEXT_ID = create_key("next_id")
+
+
+class OtelTrace:
+    """
+    Handle all tracing requirements such as getting the tracer, and starting a new span.
+
+    When OTEL is enabled, the Trace class will be replaced by this class.
+    """
+
+    def __init__(self, span_exporter: ConsoleSpanExporter | OTLPSpanExporter, tag_string: str | None = None):
+        self.span_exporter = span_exporter
+        self.span_processor = BatchSpanProcessor(self.span_exporter)
+        self.tag_string = tag_string
+        self.otel_service = conf.get("traces", "otel_service")
+
+    def get_tracer(
+        self, component: str, trace_id: int | None = None, span_id: int | None = None
+    ) -> OpenTelemetryTracer | Tracer:
+        """Tracer that will use special AirflowOtelIdGenerator to control producing certain span and trace id."""
+        resource = Resource(attributes={HOST_NAME: get_hostname(), SERVICE_NAME: self.otel_service})
+        if trace_id or span_id:
+            # in case where trace_id or span_id was given
+            tracer_provider = TracerProvider(
+                resource=resource, id_generator=AirflowOtelIdGenerator(span_id=span_id, trace_id=trace_id)
+            )
+        else:
+            tracer_provider = TracerProvider(resource=resource)
+        tracer_provider.add_span_processor(self.span_processor)
+        tracer = tracer_provider.get_tracer(component)
+        """
+        Tracer will produce a single ID value if value is provided. Note that this is one-time only, so any
+        subsequent call will produce the normal random ids.
+        """
+        return tracer
+
+    def get_current_span(self):
+        return trace.get_current_span()
+
+    def use_span(self, span: Span):
+        return trace.use_span(span=span)
+
+    def start_span(
+        self,
+        span_name: str,
+        component: str | None = None,
+        parent_sc: SpanContext | None = None,
+        span_id=None,
+        links=None,
+        start_time=None,
+    ):
+        """Start a span; if service_name is not given, otel_service is used."""
+        if component is None:
+            component = self.otel_service
+
+        trace_id = self.get_current_span().get_span_context().trace_id
+        tracer = self.get_tracer(component=component, trace_id=trace_id, span_id=span_id)
+
+        attributes = parse_tracestate(self.tag_string) if self.tag_string else {}
+
+        if links is not None:
+            _links = gen_links_from_kv_list(links)
+        else:
+            _links = []
+
+        if start_time is not None:
+            start_time = datetime_to_nano(start_time)
+
+        if parent_sc is not None:
+            ctx = trace.set_span_in_context(NonRecordingSpan(parent_sc))
+            span = tracer.start_as_current_span(
+                span_name, context=ctx, attributes=attributes, links=_links, start_time=start_time
+   

Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-14 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1525761290


##
airflow/traces/utils.py:
##
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING
+
+from airflow.utils.hashlib_wrapper import md5
+
+if TYPE_CHECKING:
+    from airflow.models import DagRun, TaskInstance
+    from airflow.models.taskinstancekey import TaskInstanceKey
+
+TRACE_ID = 0
+SPAN_ID = 16
+
+log = logging.getLogger(__name__)
+
+
+def _gen_id(seeds: list[str], as_int: bool = False, type: int = TRACE_ID) -> str | int:
+    seed_str = "_".join(seeds).encode("utf-8")
+    hash_hex = md5(seed_str).hexdigest()[type:]
+    return int(hash_hex, 16) if as_int else hash_hex
+
+
+def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
+    """Generate trace id from DagRun."""
+    return _gen_id([dag_run.dag_id, dag_run.run_id, str(dag_run.start_date.timestamp())], as_int)
+
+
+def gen_span_id_from_ti_key(ti_key: TaskInstanceKey, as_int: bool = False) -> str | int:
+    """Generate span id from TI key."""
+    return _gen_id([ti_key.dag_id, ti_key.run_id, ti_key.task_id, str(ti_key.try_number)], as_int, SPAN_ID)
+
+
+def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int:
+    """Generate dag's root span id using dag_run."""
+    return _gen_id([dag_run.dag_id, dag_run.run_id, str(dag_run.start_date.timestamp())], as_int, SPAN_ID)
+
+
+def gen_span_id(ti: TaskInstance, as_int: bool = False) -> str | int:
+    """Generate span id from the task instance."""
+    dag_run = ti.dag_run
+    """When this is called, the try_number of ti is already set to next(+1), hence the subtraction"""
+    return _gen_id([dag_run.dag_id, dag_run.run_id, ti.task_id, str(ti.try_number - 1)], as_int, SPAN_ID)
+
+
+def parse_traceparent(traceparent_str: str | None = None) -> dict:
+    """Parse traceparent string: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01."""
+    if traceparent_str is None:
+        return {}
+    tokens = traceparent_str.split("-")
+    if len(tokens) != 4:
+        raise ValueError("The traceparent string does not have the correct format.")
+    return {"version": tokens[0], "trace_id": tokens[1], "parent_id": tokens[2], "flags": tokens[3]}
+
+
+def parse_tracestate(tracestate_str: str | None = None) -> dict:
+    """Parse tracestate string: rojo=00f067aa0ba902b7,congo=t61rcWkgMzE."""
+    if tracestate_str is None:
+        return {}
+    tokens = tracestate_str.split(",")
+    result = {}
+    for pair in tokens:
+        key, value = pair.split("=")
+        result[key.strip()] = value.strip()
+    return result
+
+
+def is_valid_trace_id(trace_id: str) -> bool:
+    """Check whether trace id is valid."""
+    if trace_id is not None and len(trace_id) == 34 and int(trace_id, 16) != 0:
+        return True
+    else:
+        return False

Review Comment:
   So, converting the source code per your suggestion actually causes the following errors during static checks:
   ```
   airflow/traces/utils.py:87: error: Incompatible return value type (got
   "Union[str, bool]", expected "bool")  [return-value]
   return trace_id and len(trace_id) == 34 and int(trace_id, 16) != 0
  ^~~
   airflow/traces/utils.py:92: error: Incompatible return value type (got
   "Union[str, bool]", expected "bool")  [return-value]
   return span_id and len(span_id) == 18 and int(span_id, 16) != 0
  ^~~~
   Found 2 errors in 1 file (checked 1151 source files)
   
   ```
   Not sure, but perhaps the following would work?:
   ```
   def is_valid_trace_id(trace_id: str) -> bool:
       """Check whether trace id is valid."""
       return trace_id is not None and len(trace_id) == 34 and int(trace_id, 16) != 0


   def is_valid_span_id(span_id: str) -> bool:
       """Check whether span id is valid."""
       return span_id is not None and len(span_id) == 18 and int(span_id, 16) != 0
   ```
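
As a side note on the `parse_traceparent` and `parse_tracestate` helpers quoted in the diff above, here is a minimal standalone sketch of how they would behave on the example strings from their docstrings. The functions are re-declared outside Airflow for illustration only. One deliberate deviation, which is an assumption and not what the PR does: the tracestate pair is split on the first `=` only, so a value that itself contains `=` would not raise.

```python
from __future__ import annotations


def parse_traceparent(traceparent_str: str | None = None) -> dict:
    """Split 'version-trace_id-parent_id-flags' into a dict."""
    if traceparent_str is None:
        return {}
    tokens = traceparent_str.split("-")
    if len(tokens) != 4:
        raise ValueError("The traceparent string does not have the correct format.")
    return {"version": tokens[0], "trace_id": tokens[1], "parent_id": tokens[2], "flags": tokens[3]}


def parse_tracestate(tracestate_str: str | None = None) -> dict:
    """Split 'k1=v1,k2=v2' into a dict, trimming surrounding whitespace."""
    if tracestate_str is None:
        return {}
    result = {}
    for pair in tracestate_str.split(","):
        # Assumption for this sketch: split on the first "=" only.
        key, value = pair.split("=", 1)
        result[key.strip()] = value.strip()
    return result


print(parse_traceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"))
print(parse_tracestate("rojo=00f067aa0ba902b7,congo=t61rcWkgMzE"))
```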




Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-14 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1525754290


##
airflow/traces/utils.py:
##
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING
+
+from airflow.utils.hashlib_wrapper import md5
+
+if TYPE_CHECKING:
+    from airflow.models import DagRun, TaskInstance
+    from airflow.models.taskinstancekey import TaskInstanceKey
+
+TRACE_ID = 0
+SPAN_ID = 16
+
+log = logging.getLogger(__name__)
+
+
+def _gen_id(seeds: list[str], as_int: bool = False, type: int = TRACE_ID) -> str | int:
+    seed_str = "_".join(seeds).encode("utf-8")
+    hash_hex = md5(seed_str).hexdigest()[type:]
+    return int(hash_hex, 16) if as_int else hash_hex
+
+
+def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
+    """Generate trace id from DagRun."""
+    return _gen_id([dag_run.dag_id, dag_run.run_id, str(dag_run.start_date.timestamp())], as_int)
+
+

Review Comment:
   I think it's a valid idea, and it definitely looks much more readable.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-14 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1525753637


##
airflow/traces/otel_tracer.py:
##
@@ -0,0 +1,316 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import random
+
+from opentelemetry import trace
+from opentelemetry.context import create_key
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
+from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
+from opentelemetry.sdk.trace import Span, Tracer as OpenTelemetryTracer, TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
+from opentelemetry.sdk.trace.id_generator import IdGenerator
+from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, TraceFlags, Tracer
+from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
+
+from airflow.configuration import conf
+from airflow.traces import (
+    TRACEPARENT,
+    TRACESTATE,
+)
+from airflow.traces.utils import (
+    gen_dag_span_id,
+    gen_span_id,
+    gen_trace_id,
+    parse_traceparent,
+    parse_tracestate,
+)
+from airflow.utils.dates import datetime_to_nano
+from airflow.utils.net import get_hostname
+
+log = logging.getLogger(__name__)
+
+_NEXT_ID = create_key("next_id")
+
+
+class OtelTrace:
+    """
+    Handle all tracing requirements such as getting the tracer, and starting a new span.
+
+    When OTEL is enabled, the Trace class will be replaced by this class.
+    """
+
+    def __init__(self, span_exporter: ConsoleSpanExporter | OTLPSpanExporter, tag_string: str | None = None):
+        self.span_exporter = span_exporter
+        self.span_processor = BatchSpanProcessor(self.span_exporter)
+        self.tag_string = tag_string
+        self.otel_service = conf.get("traces", "otel_service")
+
+    def get_tracer(
+        self, component: str, trace_id: int | None = None, span_id: int | None = None
+    ) -> OpenTelemetryTracer | Tracer:
+        """Tracer that will use special AirflowOtelIdGenerator to control producing certain span and trace id."""
+        resource = Resource(attributes={HOST_NAME: get_hostname(), SERVICE_NAME: self.otel_service})
+        if trace_id or span_id:
+            # in case where trace_id or span_id was given
+            tracer_provider = TracerProvider(
+                resource=resource, id_generator=AirflowOtelIdGenerator(span_id=span_id, trace_id=trace_id)
+            )
+        else:
+            tracer_provider = TracerProvider(resource=resource)
+        tracer_provider.add_span_processor(self.span_processor)
+        tracer = tracer_provider.get_tracer(component)
+        """
+        Tracer will produce a single ID value if value is provided. Note that this is one-time only, so any
+        subsequent call will produce the normal random ids.
+        """
+        return tracer
+
+    def get_current_span(self):
+        return trace.get_current_span()
+
+    def use_span(self, span: Span):
+        return trace.use_span(span=span)
+
+    def start_span(
+        self,
+        span_name: str,
+        component: str | None = None,
+        parent_sc: SpanContext | None = None,
+        span_id=None,
+        links=None,
+        start_time=None,
+    ):
+        """Start a span; if service_name is not given, otel_service is used."""
+        if component is None:
+            component = self.otel_service
+
+        trace_id = self.get_current_span().get_span_context().trace_id
+        tracer = self.get_tracer(component=component, trace_id=trace_id, span_id=span_id)
+
+        attributes = parse_tracestate(self.tag_string) if self.tag_string else {}
+
+        if links is not None:
+            _links = gen_links_from_kv_list(links)
+        else:
+            _links = []
+
+        if start_time is not None:
+            start_time = datetime_to_nano(start_time)
+
+        if parent_sc is not None:
+            ctx = trace.set_span_in_context(NonRecordingSpan(parent_sc))
+            span = tracer.start_as_current_span(
+                span_name, context=ctx, attributes=attributes, links=_links, start_time=start_time
+  

Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-14 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1525342981


##
airflow/traces/tracer.py:
##
@@ -0,0 +1,280 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import inspect
+import logging
+import socket
+from typing import TYPE_CHECKING, Any, Callable
+
+from airflow.configuration import conf
+from airflow.typing_compat import Protocol
+
+log = logging.getLogger(__name__)
+
+
+def gen_context(trace_id, span_id):
+    """Generate span context from trace_id and span_id."""
+    from airflow.traces.otel_tracer import gen_context as otel_gen_context

Review Comment:
   Yeah, something is definitely up, I've already seen a commit where you moved these to the top import list.  Going to skip looking at this file as well for now.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-14 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1525319386


##
airflow/traces/otel_tracer.py:
##
@@ -0,0 +1,316 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import random
+
+from opentelemetry import trace
+from opentelemetry.context import create_key
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
+from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
+from opentelemetry.sdk.trace import Span, Tracer as OpenTelemetryTracer, TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
+from opentelemetry.sdk.trace.id_generator import IdGenerator
+from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, TraceFlags, Tracer
+from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
+
+from airflow.configuration import conf
+from airflow.traces import (
+    TRACEPARENT,
+    TRACESTATE,
+)
+from airflow.traces.utils import (
+    gen_dag_span_id,
+    gen_span_id,
+    gen_trace_id,
+    parse_traceparent,
+    parse_tracestate,
+)
+from airflow.utils.dates import datetime_to_nano
+from airflow.utils.net import get_hostname
+
+log = logging.getLogger(__name__)
+
+_NEXT_ID = create_key("next_id")
+
+
+class OtelTrace:
+    """
+    Handle all tracing requirements such as getting the tracer, and starting a new span.
+
+    When OTEL is enabled, the Trace class will be replaced by this class.
+    """
+
+    def __init__(self, span_exporter: ConsoleSpanExporter | OTLPSpanExporter, tag_string: str | None = None):
+        self.span_exporter = span_exporter
+        self.span_processor = BatchSpanProcessor(self.span_exporter)
+        self.tag_string = tag_string
+        self.otel_service = conf.get("traces", "otel_service")
+
+    def get_tracer(
+        self, component: str, trace_id: int | None = None, span_id: int | None = None
+    ) -> OpenTelemetryTracer | Tracer:
+        """Tracer that will use special AirflowOtelIdGenerator to control producing certain span and trace id."""
+        resource = Resource(attributes={HOST_NAME: get_hostname(), SERVICE_NAME: self.otel_service})
+        if trace_id or span_id:
+            # in case where trace_id or span_id was given
+            tracer_provider = TracerProvider(
+                resource=resource, id_generator=AirflowOtelIdGenerator(span_id=span_id, trace_id=trace_id)
+            )
+        else:
+            tracer_provider = TracerProvider(resource=resource)
+        tracer_provider.add_span_processor(self.span_processor)
+        tracer = tracer_provider.get_tracer(component)
+        """
+        Tracer will produce a single ID value if value is provided. Note that this is one-time only, so any
+        subsequent call will produce the normal random ids.
+        """
+        return tracer
+
+    def get_current_span(self):
+        return trace.get_current_span()
+
+    def use_span(self, span: Span):
+        return trace.use_span(span=span)
+
+    def start_span(
+        self,
+        span_name: str,
+        component: str | None = None,
+        parent_sc: SpanContext | None = None,
+        span_id=None,
+        links=None,
+        start_time=None,
+    ):
+        """Start a span; if service_name is not given, otel_service is used."""
+        if component is None:
+            component = self.otel_service
+
+        trace_id = self.get_current_span().get_span_context().trace_id
+        tracer = self.get_tracer(component=component, trace_id=trace_id, span_id=span_id)
+
+        attributes = parse_tracestate(self.tag_string) if self.tag_string else {}
+
+        if links is not None:
+            _links = gen_links_from_kv_list(links)
+        else:
+            _links = []
+
+        if start_time is not None:
+            start_time = datetime_to_nano(start_time)
+
+        if parent_sc is not None:
+            ctx = trace.set_span_in_context(NonRecordingSpan(parent_sc))
+            span = tracer.start_as_current_span(
+                span_name, context=ctx, attributes=attributes, links=_links, start_time=start_time
+   

Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-14 Thread via GitHub


potiuk commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-1998090772

   Actually `breeze static-checks --only-my-changes` should run WAY faster and do 90-something percent, if not 100%, of the job.





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-14 Thread via GitHub


ferruzzi commented on PR #37948:
URL: https://github.com/apache/airflow/pull/37948#issuecomment-1997956996

   We need to get these static checks passing and it looks like it's an issue 
that breeze should be able to autofix.   When you get time, please run `breeze 
static-checks --all-files`.  It will fail but should automatically fix the 
issue, then run it again and see if it passes.  If it fails the second time, 
then it's something you'll need to fix manually.
   
   
   Note, each run is going to take a while... depending on your computer, 
something like 20 minutes, so maybe run it while you are away from your desk or 
in a meeting or something.





Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-14 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1525240559


##
airflow/traces/utils.py:
##
@@ -18,52 +18,66 @@
 from __future__ import annotations
 
 import logging
+from typing import TYPE_CHECKING
 
 from airflow.utils.hashlib_wrapper import md5
 
+if TYPE_CHECKING:
+    from airflow.models import DagRun, TaskInstance
+    from airflow.models.taskinstancekey import TaskInstanceKey
+
 log = logging.getLogger(__name__)
 
 

Review Comment:
   Yup, I was initially thinking of taking an int "size", but then thought that if there are only two options (16 or 32) then that sounds like a bool.  Either way would work. :+1:






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-14 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1525237761


##
airflow/traces/otel_tracer.py:
##
@@ -143,25 +135,24 @@ def start_span_from_dagrun(
         """Produce a span from dag run."""
         # check if dagrun has configs
         conf = dagrun.conf
-        trace_id = int(gen_trace_id(dag_run=dagrun), 16)
-        span_id = int(gen_dag_span_id(dag_run=dagrun), 16)
+        trace_id = int(gen_trace_id(dag_run=dagrun, as_int=True))
+        span_id = int(gen_dag_span_id(dag_run=dagrun, as_int=True))
 
         if conf is not None:
             traceparent = conf.get(TRACEPARENT)
             tracestate = conf.get(TRACESTATE)
 
-        tracer = self.get_tracer_with_id(component=component, span_id=span_id, trace_id=trace_id)
+        tracer = self.get_tracer(component=component, span_id=span_id, trace_id=trace_id)
 
-        kvstr = None
+        tag_string = None
         # merge attributes from tags and tracestate
-        if self.tags is not None:
-            kvstr = self.tags
+        if self.tag_string is not None:
+            tag_string = self.tag_string

Review Comment:
   It happens on occasion, don't sound so surprised :P 






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-14 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1525236936


##
airflow/traces/otel_tracer.py:
##
@@ -104,36 +98,34 @@ def start_span(
         links=None,
         start_time=None,
     ):
-        """Start a span. if service_name is not given, otel_service is used."""
+        """Start a span; if service_name is not given, otel_service is used."""
         if component is None:
             component = self.otel_service
 
         trace_id = self.get_current_span().get_span_context().trace_id
         if span_id is not None:
-            tracer = self.get_tracer_with_id(component=component, trace_id=trace_id, span_id=span_id)
+            tracer = self.get_tracer(component=component, trace_id=trace_id, span_id=span_id)
         else:
             tracer = self.get_tracer(component)
 
-        kvs = {}
-        if self.tags is not None:
-            kvs = parse_tracestate(self.tags)
+        attributes = parse_tracestate(self.tag_string) if self.tag_string else {}
 
         if links is not None:
             _links = gen_links_from_kv_list(links)
         else:
             _links = []
 
         if start_time is not None:
-            start_time = int(start_time.timestamp() * 10)
+            start_time = datetime_to_nano(start_time)
 
         if parent_sc is not None:
             ctx = trace.set_span_in_context(NonRecordingSpan(parent_sc))
             span = tracer.start_as_current_span(
-                span_name, context=ctx, attributes=kvs, links=_links, start_time=start_time
+                span_name, context=ctx, attributes=attributes, links=_links, start_time=start_time
             )
         else:
             span = tracer.start_as_current_span(
-                span_name, attributes=kvs, links=_links, start_time=start_time
+                span_name, attributes=attributes, links=_links, start_time=start_time
             )

Review Comment:
   Ah, that's what I was afraid of. Thanks for trying it.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-14 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1524876807


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1391,11 +1392,11 @@ def _start_queued_dagruns(self, session: Session) -> 
None:
 
 @span
 def _update_state(dag: DAG, dag_run: DagRun):

Review Comment:
   I think so!






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-14 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1524874339


##
airflow/traces/otel_tracer.py:
##
@@ -104,36 +98,34 @@ def start_span(
 links=None,
 start_time=None,
 ):
-"""Start a span. if service_name is not given, otel_service is used."""
+"""Start a span; if service_name is not given, otel_service is used."""
 if component is None:
 component = self.otel_service
 
 trace_id = self.get_current_span().get_span_context().trace_id
 if span_id is not None:
-tracer = self.get_tracer_with_id(component=component, 
trace_id=trace_id, span_id=span_id)
+tracer = self.get_tracer(component=component, trace_id=trace_id, 
span_id=span_id)
 else:
 tracer = self.get_tracer(component)

Review Comment:
   Yup, that's right! Will simplify the code






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-14 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1524781847


##
airflow/traces/utils.py:
##
@@ -18,52 +18,66 @@
 from __future__ import annotations
 
 import logging
+from typing import TYPE_CHECKING
 
 from airflow.utils.hashlib_wrapper import md5
 
+if TYPE_CHECKING:
+from airflow.models import DagRun, TaskInstance
+from airflow.models.taskinstancekey import TaskInstanceKey
+
 log = logging.getLogger(__name__)
 
 

Review Comment:
   I guess by feeding the seed into the gen_id function, that could be done - 
we could even have `size` instead of `shorten` so I could simply feed in 16 as 
an input. 
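
   The `size` idea above could be sketched roughly as follows. This is only an 
illustration: `gen_id` with this signature is hypothetical, and plain 
`hashlib.md5` stands in for Airflow's md5 wrapper. An MD5 hex digest has 32 
characters, which matches a 16-byte trace id; the first 16 characters fit an 
8-byte span id.

```python
from __future__ import annotations

import hashlib


def gen_id(seed: str, size: int = 32) -> str:
    """Return the first `size` hex characters of the MD5 digest of `seed`.

    Hypothetical helper: the name and signature are assumptions for illustration.
    """
    if not 1 <= size <= 32:
        raise ValueError("size must be between 1 and 32 hex characters")
    return hashlib.md5(seed.encode("utf-8")).hexdigest()[:size]


# The seed format mirrors the f-string used in gen_trace_id; the values are made up.
seed = "example_dag_manual_run_1710288000.0"
trace_hex = gen_id(seed)           # 32 hex chars, usable as a trace id
span_hex = gen_id(seed, size=16)   # 16 hex chars, usable as a span id
```

   With one function taking the seed, the per-object helpers would only need to 
build the seed string and pick the size.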






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-14 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1524768949


##
airflow/traces/tracer.py:
##
@@ -0,0 +1,256 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import socket
+import typing
+from typing import (
+TYPE_CHECKING,
+Callable,
+)
+
+from airflow.configuration import conf
+from airflow.typing_compat import Protocol
+
+log = logging.getLogger(__name__)
+
+
+def gen_context(trace_id, span_id):
+"""Generate span context from trace_id and span_id."""
+from airflow.traces.otel_tracer import gen_context as otel_gen_context
+
+return otel_gen_context(trace_id, span_id)
+
+
+def gen_links_from_kv_list(list):
+"""Generate links from kv list of {trace_id:int, span_id:int}."""
+from airflow.traces.otel_tracer import gen_links_from_kv_list
+
+return gen_links_from_kv_list(list)
+
+
+def span(func):
+"""Decorate a function with span."""
+
+def wrapper(*args, **kwargs):
+func_name = func.__name__
+qual_name = func.__qualname__
+module_name = func.__module__
+if "." in qual_name:
+component = f"{qual_name.rsplit('.', 1)[0]}"
+else:
+component = module_name
+with Trace.start_span(span_name=func_name, component=component):
+return func(*args, **kwargs)
+
+return wrapper
+
+
+class DummyContext:
+"""If no Tracer is configured, DummyContext is used as a fallback."""
+
+def __init__(self):
+self.trace_id = 1
+
+
+class DummySpan:
+"""If no Tracer is configured, DummySpan is used as a fallback."""
+
+def __enter__(self):
+"""Enter."""
+return self
+
+def __exit__(self, *args, **kwargs):
+"""Exit."""
+pass
+
+def __call__(self, obj):
+"""Call."""
+return obj
+
+def get_span_context(self):
+"""Get span context."""
+return DUMMY_CTX
+
+def set_attribute(self, key, value) -> None:
+"""Set an attribute to the span."""
+pass
+
+def set_attributes(self, attributes) -> None:
+"""Set multiple attributes at once."""
+pass
+
+def add_event(
+self,
+name: str,
+attributes=None,

Review Comment:
   I think the typing problem with `attributes` lies less with the tracer 
itself than with the `Span` object that the tracer returns. Technically, 
`attributes` doesn't always have to be a dict of key/value pairs (even though 
the OTel version currently does that), so I'd say let's keep it as Any for now.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-14 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1524758585


##
airflow/traces/tracer.py:
##
@@ -0,0 +1,256 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import socket
+import typing
+from typing import (
+TYPE_CHECKING,
+Callable,
+)
+
+from airflow.configuration import conf
+from airflow.typing_compat import Protocol
+
+log = logging.getLogger(__name__)
+
+
+def gen_context(trace_id, span_id):
+"""Generate span context from trace_id and span_id."""
+from airflow.traces.otel_tracer import gen_context as otel_gen_context
+
+return otel_gen_context(trace_id, span_id)
+
+
+def gen_links_from_kv_list(list):
+"""Generate links from kv list of {trace_id:int, span_id:int}."""
+from airflow.traces.otel_tracer import gen_links_from_kv_list
+
+return gen_links_from_kv_list(list)
+
+
+def span(func):
+"""Decorate a function with span."""
+
+def wrapper(*args, **kwargs):
+func_name = func.__name__
+qual_name = func.__qualname__
+module_name = func.__module__
+if "." in qual_name:
+component = f"{qual_name.rsplit('.', 1)[0]}"
+else:
+component = module_name
+with Trace.start_span(span_name=func_name, component=component):
+return func(*args, **kwargs)
+
+return wrapper
+
+
+class DummyContext:
+"""If no Tracer is configured, DummyContext is used as a fallback."""
+
+def __init__(self):
+self.trace_id = 1
+
+
+class DummySpan:
+"""If no Tracer is configured, DummySpan is used as a fallback."""
+
+def __enter__(self):
+"""Enter."""
+return self
+
+def __exit__(self, *args, **kwargs):
+"""Exit."""
+pass
+
+def __call__(self, obj):
+"""Call."""
+return obj
+
+def get_span_context(self):
+"""Get span context."""
+return DUMMY_CTX
+
+def set_attribute(self, key, value) -> None:
+"""Set an attribute to the span."""
+pass
+
+def set_attributes(self, attributes) -> None:
+"""Set multiple attributes at once."""
+pass
+
+def add_event(
+self,
+name: str,
+attributes=None,

Review Comment:
   Could be.
   This pattern was borrowed (or referenced) from how the existing Stats module 
works (otel logger, datadog logger, stats logger, etc), and that sort of pattern 
is observed in `class Tracer`. Oh I gotta change that DummyTracer name as well!
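
   As a rough illustration of that fallback pattern (not the code in this PR): 
callers always go through one entry point, and a do-nothing implementation is 
returned when no backend is configured. `NoOpTracer`, `RecordingTracer` and 
`get_tracer` below are hypothetical names made up for this sketch.

```python
from __future__ import annotations

from contextlib import contextmanager


class NoOpTracer:
    """Fallback used when tracing is disabled; every call does nothing."""

    @contextmanager
    def start_span(self, span_name: str):
        yield None


class RecordingTracer:
    """Stand-in for a real backend; it only remembers finished span names."""

    def __init__(self) -> None:
        self.finished: list[str] = []

    @contextmanager
    def start_span(self, span_name: str):
        try:
            yield span_name
        finally:
            # Record the span even if the body raised.
            self.finished.append(span_name)


def get_tracer(enabled: bool) -> NoOpTracer | RecordingTracer:
    """Pick the backend once, so call sites never branch on configuration."""
    return RecordingTracer() if enabled else NoOpTracer()


tracer = get_tracer(enabled=True)
with tracer.start_span("heartbeat"):
    pass
```

   The call site is identical whether or not tracing is switched on, which is 
the property the Stats-style design is after.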






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-13 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1524186027


##
airflow/traces/utils.py:
##
@@ -18,52 +18,66 @@
 from __future__ import annotations
 
 import logging
+from typing import TYPE_CHECKING
 
 from airflow.utils.hashlib_wrapper import md5
 
+if TYPE_CHECKING:
+from airflow.models import DagRun, TaskInstance
+from airflow.models.taskinstancekey import TaskInstanceKey
+
 log = logging.getLogger(__name__)
 
 

Review Comment:
   I'll give it a few thoughts!






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-13 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1524183403


##
airflow/traces/otel_tracer.py:
##
@@ -104,36 +98,34 @@ def start_span(
 links=None,
 start_time=None,
 ):
-"""Start a span. if service_name is not given, otel_service is used."""
+"""Start a span; if service_name is not given, otel_service is used."""
 if component is None:
 component = self.otel_service
 
 trace_id = self.get_current_span().get_span_context().trace_id
 if span_id is not None:
-tracer = self.get_tracer_with_id(component=component, 
trace_id=trace_id, span_id=span_id)
+tracer = self.get_tracer(component=component, trace_id=trace_id, 
span_id=span_id)
 else:
 tracer = self.get_tracer(component)
 
-kvs = {}
-if self.tags is not None:
-kvs = parse_tracestate(self.tags)
+attributes = parse_tracestate(self.tag_string) if self.tag_string else 
{}
 
 if links is not None:
 _links = gen_links_from_kv_list(links)
 else:
 _links = []
 
 if start_time is not None:
-start_time = int(start_time.timestamp() * 10)
+start_time = datetime_to_nano(start_time)
 
 if parent_sc is not None:
 ctx = trace.set_span_in_context(NonRecordingSpan(parent_sc))
 span = tracer.start_as_current_span(
-span_name, context=ctx, attributes=kvs, links=_links, 
start_time=start_time
+span_name, context=ctx, attributes=attributes, links=_links, 
start_time=start_time
 )
 else:
 span = tracer.start_as_current_span(
-span_name, attributes=kvs, links=_links, start_time=start_time
+span_name, attributes=attributes, links=_links, 
start_time=start_time
 )

Review Comment:
   I tried setting context = None, and the type checking did not allow this. 
It looks like when you specify the context, it cannot be None. :-( 






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-13 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1524176295


##
airflow/traces/otel_tracer.py:
##
@@ -143,25 +135,24 @@ def start_span_from_dagrun(
 """Produce a span from dag run."""
 # check if dagrun has configs
 conf = dagrun.conf
-trace_id = int(gen_trace_id(dag_run=dagrun), 16)
-span_id = int(gen_dag_span_id(dag_run=dagrun), 16)
+trace_id = int(gen_trace_id(dag_run=dagrun, as_int=True))
+span_id = int(gen_dag_span_id(dag_run=dagrun, as_int=True))
 
 if conf is not None:
 traceparent = conf.get(TRACEPARENT)
 tracestate = conf.get(TRACESTATE)
 
-tracer = self.get_tracer_with_id(component=component, span_id=span_id, 
trace_id=trace_id)
+tracer = self.get_tracer(component=component, span_id=span_id, 
trace_id=trace_id)
 
-kvstr = None
+tag_string = None
 # merge attributes from tags and tracestate
-if self.tags is not None:
-kvstr = self.tags
+if self.tag_string is not None:
+tag_string = self.tag_string

Review Comment:
   OMG you're right!






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-13 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1524169050


##
airflow/traces/utils.py:
##
@@ -18,52 +18,66 @@
 from __future__ import annotations
 
 import logging
+from typing import TYPE_CHECKING
 
 from airflow.utils.hashlib_wrapper import md5
 
+if TYPE_CHECKING:
+from airflow.models import DagRun, TaskInstance
+from airflow.models.taskinstancekey import TaskInstanceKey
+
 log = logging.getLogger(__name__)
 
 
-def gen_trace_id(dag_run) -> str:
+def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
+"""Generate trace id from DagRun."""
 dag_id = dag_run.dag_id
 run_id = dag_run.run_id
 start_dt = dag_run.start_date
 hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}"
 hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()
+if as_int is True:
+return int(hash_hex, 16)

Review Comment:
   Right. I'd say as_int should be okay, as what the trace_id and span_id need 
is the 'int' type.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-13 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1524167075


##
airflow/jobs/job.py:
##
@@ -211,35 +211,29 @@ def heartbeat(
 heartbeat_callback(session)
 self.log.debug("[heartbeat]")
 except OperationalError:
-Stats.incr(convert_camel_to_snake(self.__class__.__name__) + 
"_heartbeat_failure", 1, 1)
+class_name = self.__class__.__name__
+Stats.incr(convert_camel_to_snake(class_name) + 
"_heartbeat_failure", 1, 1)
 if not self.heartbeat_failed:
-self.log.exception("%s heartbeat got an exception", 
self.__class__.__name__)
+msg = f"{class_name} heartbeat got an exception"
+self.log.exception(msg)
 self.heartbeat_failed = True
-s.add_event(
+span.add_event(
 name="error",
-attributes={"message": f"{self.__class__.__name__} 
heartbeat got an exception"},
+attributes={"message": msg},

Review Comment:
   I'd prefer the code to be as short as possible! So I will definitely remove 
those dangling commas.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-13 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1524165158


##
airflow/jobs/job.py:
##
@@ -211,35 +211,29 @@ def heartbeat(
 heartbeat_callback(session)
 self.log.debug("[heartbeat]")
 except OperationalError:
-Stats.incr(convert_camel_to_snake(self.__class__.__name__) + 
"_heartbeat_failure", 1, 1)
+class_name = self.__class__.__name__
+Stats.incr(convert_camel_to_snake(class_name) + 
"_heartbeat_failure", 1, 1)

Review Comment:
   I think I'd prefer the code to be simpler - so will remove those 1,1 






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-13 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1524164314


##
airflow/executors/base_executor.py:
##
@@ -311,12 +313,10 @@ def trigger_tasks(self, open_slots: int) -> None:
 
 @span
 def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
-from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
-
 for key, command, queue, executor_config in task_tuples:
-qt = self.queued_tasks[key][3]
-trace_id = int(gen_trace_id(qt.dag_run), 16)  # TaskInstance in 
fourth element
-span_id = int(gen_span_id_from_ti_key(key), 16)
+task_instance = self.queued_tasks[key][3]
+trace_id = int(gen_trace_id(task_instance.dag_run, as_int=True))  
# TaskInstance in fourth element

Review Comment:
   Yes, that's correct - will move it to line 317!






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-11 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1520538400


##
airflow/executors/base_executor.py:
##
@@ -311,12 +313,10 @@ def trigger_tasks(self, open_slots: int) -> None:
 
 @span
 def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
-from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
-
 for key, command, queue, executor_config in task_tuples:
-qt = self.queued_tasks[key][3]
-trace_id = int(gen_trace_id(qt.dag_run), 16)  # TaskInstance in 
fourth element
-span_id = int(gen_span_id_from_ti_key(key), 16)
+task_instance = self.queued_tasks[key][3]
+trace_id = int(gen_trace_id(task_instance.dag_run, as_int=True))  
# TaskInstance in fourth element

Review Comment:
   I just saw your comment above about MyPy still wanting this to be cast... 
that's annoying, let me think of a better way to get around that.
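
   One option that might work, sketched under assumptions: `typing.overload` 
with `Literal` lets the type checker pick the return type from the `as_int` 
argument, so the caller would not need the extra `int(...)` cast. The sketch 
takes a plain seed string instead of a DagRun and uses `hashlib.md5` directly; 
both are simplifications for illustration.

```python
from __future__ import annotations

import hashlib
from typing import Literal, overload


@overload
def gen_trace_id(seed: str, as_int: Literal[True]) -> int: ...


@overload
def gen_trace_id(seed: str, as_int: Literal[False] = False) -> str: ...


def gen_trace_id(seed: str, as_int: bool = False) -> str | int:
    """Return the MD5 digest of `seed` as a hex string, or as an int if asked."""
    hash_hex = hashlib.md5(seed.encode("utf-8")).hexdigest()
    return int(hash_hex, 16) if as_int else hash_hex


# A type checker sees `int` for the first call and `str` for the second.
trace_id_int = gen_trace_id("example_seed", as_int=True)
trace_id_hex = gen_trace_id("example_seed")
```

   The runtime behaviour is unchanged; only the declared types get narrower.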






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-11 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518281626


##
airflow/traces/otel_tracer.py:
##
@@ -0,0 +1,333 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import random
+
+from opentelemetry import trace
+from opentelemetry.context import create_key
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import 
OTLPSpanExporter
+from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
+from opentelemetry.sdk.trace import Span, Tracer as OpenTelemetryTracer, 
TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, 
ConsoleSpanExporter
+from opentelemetry.sdk.trace.id_generator import IdGenerator
+from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, 
TraceFlags, Tracer
+from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
+
+from airflow.configuration import conf
+from airflow.traces import (
+TRACEPARENT,
+TRACESTATE,
+)
+from airflow.traces.utils import (
+gen_dag_span_id,
+gen_span_id,
+gen_trace_id,
+parse_traceparent,
+parse_tracestate,
+)
+from airflow.utils.net import get_hostname
+
+log = logging.getLogger(__name__)
+
+_NEXT_ID = create_key("next_id")
+
+
+class OtelTrace:
+"""
+OpenTelemetry Tracing Class.
+
+Handles all tracing requirements such as getting the tracer, and starting 
a new span.
+When OTEL is enabled, the Trace class will be replaced by this class.
+"""
+
+def __init__(self, span_exporter: ConsoleSpanExporter | OTLPSpanExporter, 
tags=None):
+self.span_exporter = span_exporter
+self.span_processor = BatchSpanProcessor(self.span_exporter)
+self.tags = tags
+self.otel_service = conf.get("traces", "otel_service")
+
+def get_tracer(self, component: str) -> OpenTelemetryTracer | Tracer:
+"""Get tracer from a given component."""
+resource = Resource(attributes={HOST_NAME: get_hostname(), 
SERVICE_NAME: self.otel_service})
+tracer_provider = TracerProvider(resource=resource)
+# span_processor = BatchSpanProcessor(self.span_exporter)
+tracer_provider.add_span_processor(self.span_processor)
+tracer = tracer_provider.get_tracer(component)
+return tracer
+
+def get_tracer_with_id(
+self, component: str, trace_id: int | None = None, span_id: int | None 
= None
+) -> OpenTelemetryTracer | Tracer:
+"""Tracer that will use special AirflowOtelIdGenerator to control 
producing certain span and trace id."""
+resource = Resource(attributes={HOST_NAME: get_hostname(), 
SERVICE_NAME: self.otel_service})
+tracer_provider = TracerProvider(
+resource=resource, 
id_generator=AirflowOtelIdGenerator(span_id=span_id, trace_id=trace_id)
+)
+# span_processor = BatchSpanProcessor(self.span_exporter, 
schedule_delay_millis=1)
+tracer_provider.add_span_processor(self.span_processor)
+tracer = tracer_provider.get_tracer(component)
+"""
+Tracer will product a Single ID value if value is provided. Note that 
this is one-time only, so any
+subsequent call will produce the normal random ids.
+"""
+return tracer
+
+def get_current_span(self):

Review Comment:
   Your class getters might be a little easier to use if you use the 
@property decorator instead of making them helper methods?  Maybe?
   
   ```
   @property
   def current_span(self):
       return trace.get_current_span()
   ```
   
   If you did this with all your helpers, then 
`self.get_current_span().get_span_context().trace_id` turns into 
`self.current_span.span_context.trace_id`
   
   
   Feel free to ignore this one though.  Up to you.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-11 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1520536156


##
airflow/traces/otel_tracer.py:
##
@@ -0,0 +1,333 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import random
+
+from opentelemetry import trace
+from opentelemetry.context import create_key
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import 
OTLPSpanExporter
+from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
+from opentelemetry.sdk.trace import Span, Tracer as OpenTelemetryTracer, 
TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, 
ConsoleSpanExporter
+from opentelemetry.sdk.trace.id_generator import IdGenerator
+from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, 
TraceFlags, Tracer
+from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
+
+from airflow.configuration import conf
+from airflow.traces import (
+TRACEPARENT,
+TRACESTATE,
+)
+from airflow.traces.utils import (
+gen_dag_span_id,
+gen_span_id,
+gen_trace_id,
+parse_traceparent,
+parse_tracestate,
+)
+from airflow.utils.net import get_hostname
+
+log = logging.getLogger(__name__)
+
+_NEXT_ID = create_key("next_id")
+
+
+class OtelTrace:
+"""
+OpenTelemetry Tracing Class.
+
+Handles all tracing requirements such as getting the tracer, and starting 
a new span.
+When OTEL is enabled, the Trace class will be replaced by this class.
+"""
+
+def __init__(self, span_exporter: ConsoleSpanExporter | OTLPSpanExporter, 
tags=None):
+self.span_exporter = span_exporter
+self.span_processor = BatchSpanProcessor(self.span_exporter)
+self.tags = tags
+self.otel_service = conf.get("traces", "otel_service")
+
+def get_tracer(self, component: str) -> OpenTelemetryTracer | Tracer:
+"""Get tracer from a given component."""
+resource = Resource(attributes={HOST_NAME: get_hostname(), 
SERVICE_NAME: self.otel_service})
+tracer_provider = TracerProvider(resource=resource)
+# span_processor = BatchSpanProcessor(self.span_exporter)
+tracer_provider.add_span_processor(self.span_processor)
+tracer = tracer_provider.get_tracer(component)
+return tracer
+
+def get_tracer_with_id(
+self, component: str, trace_id: int | None = None, span_id: int | None 
= None
+) -> OpenTelemetryTracer | Tracer:
+"""Tracer that will use special AirflowOtelIdGenerator to control 
producing certain span and trace id."""
+resource = Resource(attributes={HOST_NAME: get_hostname(), 
SERVICE_NAME: self.otel_service})
+tracer_provider = TracerProvider(
+resource=resource, 
id_generator=AirflowOtelIdGenerator(span_id=span_id, trace_id=trace_id)
+)
+# span_processor = BatchSpanProcessor(self.span_exporter, 
schedule_delay_millis=1)
+tracer_provider.add_span_processor(self.span_processor)
+tracer = tracer_provider.get_tracer(component)
+"""
+Tracer will product a Single ID value if value is provided. Note that 
this is one-time only, so any
+subsequent call will produce the normal random ids.
+"""
+return tracer
+
+def get_current_span(self):
+return trace.get_current_span()
+
+def use_span(self, span: Span):
+return trace.use_span(span=span)
+
+def start_span(
+self,
+span_name: str,
+component: str | None = None,
+parent_sc: SpanContext | None = None,
+span_id=None,
+links=None,
+start_time=None,
+):
+"""Start a span. if service_name is not given, otel_service is used."""
+if component is None:
+component = self.otel_service

Review Comment:
   Right, of course, my mistake.




Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-11 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518243419


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1415,70 +1478,101 @@ def _schedule_dag_run(
 :param dag_run: The DagRun to schedule
 :return: Callback that needs to be executed
 """
-callback: DagCallbackRequest | None = None
+trace_id = int(trace_utils.gen_trace_id(dag_run=dag_run), 16)
+span_id = int(trace_utils.gen_dag_span_id(dag_run=dag_run), 16)
+links = [{"trace_id": trace_id, "span_id": span_id}]
+
+with Trace.start_span(
+span_name="_schedule_dag_run", component="SchedulerJobRunner", 
links=links
+) as s:
+s.set_attribute("dag_id", dag_run.dag_id)
+s.set_attribute("run_id", dag_run.run_id)
+s.set_attribute("run_type", dag_run.run_type)
+
+callback: DagCallbackRequest | None = None
+
+dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, 
session=session)
+dag_model = DM.get_dagmodel(dag_run.dag_id, session)
+
+if not dag or not dag_model:
+self.log.error("Couldn't find DAG %s in DAG bag or database!", 
dag_run.dag_id)
+return callback
+
+if (
+dag_run.start_date
+and dag.dagrun_timeout
+and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
+):
+dag_run.set_state(DagRunState.FAILED)
+unfinished_task_instances = session.scalars(
+select(TI)
+.where(TI.dag_id == dag_run.dag_id)
+.where(TI.run_id == dag_run.run_id)
+.where(TI.state.in_(State.unfinished))
+)
+for task_instance in unfinished_task_instances:
+task_instance.state = TaskInstanceState.SKIPPED
+session.merge(task_instance)
+session.flush()
+self.log.info("Run %s of %s has timed-out", dag_run.run_id, 
dag_run.dag_id)
+
+if self._should_update_dag_next_dagruns(
+dag, dag_model, last_dag_run=dag_run, session=session
+):
+dag_model.calculate_dagrun_date_fields(dag, 
dag.get_run_data_interval(dag_run))
+
+callback_to_execute = DagCallbackRequest(
+full_filepath=dag.fileloc,
+dag_id=dag.dag_id,
+run_id=dag_run.run_id,
+is_failure_callback=True,
+processor_subdir=dag_model.processor_subdir,
+msg="timed_out",
+)
 
-dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, 
session=session)
-dag_model = DM.get_dagmodel(dag_run.dag_id, session)
+dag_run.notify_dagrun_state_changed()
+duration = dag_run.end_date - dag_run.start_date
+Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}", 
duration)
+Stats.timing("dagrun.duration.failed", duration, 
tags={"dag_id": dag_run.dag_id})
+
+s.set_attribute("error", True)
+s.add_event(
+name="error",
+attributes={
+"message": f"Run {dag_run.run_id} of {dag_run.dag_id} 
has timed-out",
+"duration": str(duration),
+},
+)
+return callback_to_execute
 
-if not dag or not dag_model:
-self.log.error("Couldn't find DAG %s in DAG bag or database!", 
dag_run.dag_id)
-return callback
+if dag_run.execution_date > timezone.utcnow() and not 
dag.allow_future_exec_dates:
+self.log.error("Execution date is in future: %s", 
dag_run.execution_date)
+return callback
 
-if (
-dag_run.start_date
-and dag.dagrun_timeout
-and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
-):
-dag_run.set_state(DagRunState.FAILED)
-unfinished_task_instances = session.scalars(
-select(TI)
-.where(TI.dag_id == dag_run.dag_id)
-.where(TI.run_id == dag_run.run_id)
-.where(TI.state.in_(State.unfinished))
-)
-for task_instance in unfinished_task_instances:
-task_instance.state = TaskInstanceState.SKIPPED
-session.merge(task_instance)
-session.flush()
-self.log.info("Run %s of %s has timed-out", dag_run.run_id, 
dag_run.dag_id)
+if not self._verify_integrity_if_dag_changed(dag_run=dag_run, 
session=session):
+self.log.warning(
+"The DAG disappeared before verifying integrity: %s. 
Skipping.", dag_run.dag_id
+

Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-11 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1520533373


##
airflow/executors/base_executor.py:
##
@@ -287,15 +301,41 @@ def trigger_tasks(self, open_slots: int) -> None:
 if key in self.attempts:
 del self.attempts[key]
 task_tuples.append((key, command, queue, ti.executor_config))
+s.add_event(
+name="task to trigger",
+attributes={"command": str(command), "conf": 
str(ti.executor_config)},
+)
 
 if task_tuples:
 self._process_tasks(task_tuples)
 
+@span
 def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
+from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
+
 for key, command, queue, executor_config in task_tuples:
-del self.queued_tasks[key]
-self.execute_async(key=key, command=command, queue=queue, 
executor_config=executor_config)
-self.running.add(key)
+qt = self.queued_tasks[key][3]
+trace_id = int(gen_trace_id(qt.dag_run), 16)  # TaskInstance in 
fourth element

Review Comment:
   Oof. That makes the suggestion less useful... hmm...  I'll think on it; 
you shouldn't have to cast it a million times, so there must be a better 
way.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-11 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1520529201


##
airflow/dag_processing/manager.py:
##
@@ -1028,6 +1049,23 @@ def _collect_results_from_processor(self, processor) -> 
None:
 )
 self._file_stats[processor.file_path] = stat
 file_name = Path(processor.file_path).stem
+
+"""crude exposure of instrumentation code which may need to be 
furnished"""
+s = Trace.get_tracer("DagFileProcessorManager").start_span(
+"dag_processing", start_time=int(processor.start_time.timestamp() 
* 10)

Review Comment:
   Better name than mine, thanks.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-11 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1520526202


##
airflow/traces/tracer.py:
##
@@ -0,0 +1,256 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import socket
+import typing
+from typing import (
+TYPE_CHECKING,
+Callable,
+)
+
+from airflow.configuration import conf
+from airflow.typing_compat import Protocol
+
+log = logging.getLogger(__name__)
+
+
+def gen_context(trace_id, span_id):
+"""Generate span context from trace_id and span_id."""
+from airflow.traces.otel_tracer import gen_context as otel_gen_context
+
+return otel_gen_context(trace_id, span_id)
+
+
+def gen_links_from_kv_list(list):
+"""Generate links from kv list of {trace_id:int, span_id:int}."""
+from airflow.traces.otel_tracer import gen_links_from_kv_list
+
+return gen_links_from_kv_list(list)
+
+
+def span(func):
+"""Decorate a function with span."""
+
+def wrapper(*args, **kwargs):
+func_name = func.__name__
+qual_name = func.__qualname__
+module_name = func.__module__
+if "." in qual_name:
+component = f"{qual_name.rsplit('.', 1)[0]}"
+else:
+component = module_name
+with Trace.start_span(span_name=func_name, component=component):
+return func(*args, **kwargs)
+
+return wrapper
+
+
+class DummyContext:
+"""If no Tracer is configured, DummyContext is used as a fallback."""
+
+def __init__(self):
+self.trace_id = 1
+
+
+class DummySpan:
+"""If no Tracer is configured, DummySpan is used as a fallback."""
+
+def __enter__(self):
+"""Enter."""
+return self
+
+def __exit__(self, *args, **kwargs):
+"""Exit."""
+pass
+
+def __call__(self, obj):
+"""Call."""
+return obj
+
+def get_span_context(self):
+"""Get span context."""
+return DUMMY_CTX
+
+def set_attribute(self, key, value) -> None:
+"""Set an attribute to the span."""
+pass
+
+def set_attributes(self, attributes) -> None:
+"""Set multiple attributes at once."""
+pass
+
+def add_event(
+self,
+name: str,
+attributes=None,

Review Comment:
   Maybe a suggestion... shouldn't OtelTrace inherit Tracer?  If so, you can 
add a field to the Tracer class which holds the type hint and override that in 
the various tracers as they are implemented.   For example:
   
   ```
   class Tracer(Protocol):
       """This class is only used for TypeChecking (for IDEs, mypy, etc)."""
   
       instance: Tracer | DummyTrace | None = None
       attribute_type = NewType(Any, None)
   ```
   
   Then in OtelTrace
   
   ```
   class OtelTrace:
       attribute_type = NewType(OtelAttribute, None)  # or whatever it is supposed to be...
   ```
   
   That should let you define the type hint as `attribute_type` and it should 
be enforced depending on which tracer is being used.
   
   
   Example showing the inheritance in action:
   
   ```
   In [1]: class A:
      ...:     banana = "banana"
      ...: 
   
   In [2]: class B(A):
      ...:     banana = "apple"
      ...: 
   
   In [3]: class C(A):
      ...:     pass  # banana not defined
      ...: 
   
   In [4]: A.banana
   Out[4]: 'banana'
   
   In [5]: B.banana
   Out[5]: 'apple'
   
   In [6]: C.banana
   Out[6]: 'banana'
   ```
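
A runnable sketch of the override idea, under some assumptions. `typing.NewType` takes a name string and a base type, so the `NewType(Any, None)` calls above would need adjusting, and static checkers such as mypy do not accept a class attribute as an annotation. This version therefore stores a plain type on the class and checks it at runtime. `BaseTracer`, `DictAttributeTracer`, `PermissiveTracer`, and `check_attributes` are hypothetical names, not part of the PR.

```python
from __future__ import annotations

from typing import Any


class BaseTracer:
    """Hypothetical base class; subclasses override ``attribute_type``."""

    # Type accepted for span attributes. ``object`` accepts anything.
    attribute_type: type = object

    def check_attributes(self, attributes: Any) -> Any:
        """Return ``attributes`` unchanged, or raise if the type is wrong."""
        if attributes is not None and not isinstance(attributes, self.attribute_type):
            raise TypeError(
                f"{type(self).__name__} expects {self.attribute_type.__name__}, "
                f"got {type(attributes).__name__}"
            )
        return attributes


class DictAttributeTracer(BaseTracer):
    # Overrides the class attribute, like ``B.banana`` in the session above.
    attribute_type = dict


class PermissiveTracer(BaseTracer):
    # Does not override, so it inherits ``object``, like ``C.banana``.
    pass
```

`DictAttributeTracer().check_attributes(["a"])` raises `TypeError`, while `PermissiveTracer().check_attributes(["a"])` returns the list unchanged.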






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-11 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1520517176


##
airflow/traces/utils.py:
##
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+
+from airflow.utils.hashlib_wrapper import md5
+
+log = logging.getLogger(__name__)
+
+
+def gen_trace_id(dag_run) -> str:
+dag_id = dag_run.dag_id
+run_id = dag_run.run_id
+start_dt = dag_run.start_date
+hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}"
+hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()
+return hash_hex
+
+
+def gen_span_id_from_ti_key(ti_key) -> str:
+"""Generate span id from TI key."""
+dag_id = ti_key.dag_id
+run_id = ti_key.run_id
+task_id = ti_key.task_id
+try_num = ti_key.try_number  # key always has next number, not current
+hash_seed = f"{dag_id}_{run_id}_{task_id}_{try_num}"
+hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:]
+return hash_hex
+
+
+def gen_dag_span_id(dag_run):
+"""Generate dag's root span id using dag_run."""
+dag_id = dag_run.dag_id
+run_id = dag_run.run_id
+start_dt = dag_run.start_date
+hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}"
+hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:]
+return hash_hex
+
+
+def gen_span_id(ti):
+"""Generate span id from the task instance."""
+dag_run = ti.dag_run
+dag_id = dag_run.dag_id
+run_id = dag_run.run_id
+task_id = ti.task_id
+"""in terms of ti when this is called, the try_number is already set to 
next, hence the subtraction"""
+try_num = ti.try_number - 1
+hash_seed = f"{dag_id}_{run_id}_{task_id}_{try_num}"
+hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:]
+return hash_hex
+
+
+def parse_traceparent(traceparent_str: str | None = None) -> dict:
+"""Parse traceparent string: 
00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01."""
+if traceparent_str is None:
+return {}
+tokens = traceparent_str.split("-")
+if len(tokens) != 4:
+raise ValueError("The traceparent string does not have the correct 
format.")
+return {"version": tokens[0], "trace_id": tokens[1], "parent_id": 
tokens[2], "flags": tokens[3]}

Review Comment:
   Yeah, that's a "cultural difference" between Python and Java.  I was taught 
the same thing when I learned Java, but in Python it is considered perfectly 
acceptable (maybe even preferred) to use exceptions as control logic like that.
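
As an illustration of that style (often called EAFP, "easier to ask forgiveness than permission"), here is a small sketch. `parse_traceparent` is copied from the diff above; `trace_id_or_none` is a hypothetical caller added only for this example and does not appear in the PR.

```python
from __future__ import annotations


def parse_traceparent(traceparent_str: str | None = None) -> dict:
    """Parse a traceparent string such as 00-<trace_id>-<parent_id>-<flags>."""
    if traceparent_str is None:
        return {}
    tokens = traceparent_str.split("-")
    if len(tokens) != 4:
        raise ValueError("The traceparent string does not have the correct format.")
    return {"version": tokens[0], "trace_id": tokens[1], "parent_id": tokens[2], "flags": tokens[3]}


def trace_id_or_none(header: str | None) -> str | None:
    """Hypothetical caller: try the parse and handle failure, instead of pre-validating."""
    try:
        return parse_traceparent(header)["trace_id"]
    except (ValueError, KeyError):
        # ValueError: malformed header. KeyError: header was None, so the dict is empty.
        return None
```

The caller never inspects the header before parsing; both failure modes are handled in one `except` clause.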






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-11 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1520514584


##
scripts/ci/docker-compose/integration-otel.yml:
##
@@ -54,14 +54,31 @@ services:
   - ./grafana/volume/dashboards:/grafana/dashboards
   - ./grafana/volume/provisioning:/grafana/provisioning
 
+  jaeger:
+image: jaegertracing/all-in-one
+container_name: "breeze-jaeger"
+environment:
+  COLLECTOR_OTLP_ENABLED: true
+  COLLLECTOR_ZIPKIN_HOST_PORT: :9411

Review Comment:
   Cool.  If it works, that's fine.  It looks odd so I wanted to verify.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-11 Thread via GitHub


ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1520307917


##
airflow/jobs/job.py:
##
@@ -211,35 +211,29 @@ def heartbeat(
 heartbeat_callback(session)
 self.log.debug("[heartbeat]")
 except OperationalError:
-Stats.incr(convert_camel_to_snake(self.__class__.__name__) + 
"_heartbeat_failure", 1, 1)
+class_name = self.__class__.__name__
+Stats.incr(convert_camel_to_snake(class_name) + 
"_heartbeat_failure", 1, 1)
 if not self.heartbeat_failed:
-self.log.exception("%s heartbeat got an exception", 
self.__class__.__name__)
+msg = f"{class_name} heartbeat got an exception"
+self.log.exception(msg)
 self.heartbeat_failed = True
-s.add_event(
+span.add_event(
 name="error",
-attributes={"message": f"{self.__class__.__name__} 
heartbeat got an exception"},
+attributes={"message": msg},

Review Comment:
   Personal preference, feel free to ignore:  With `"message": msg` being so 
short now (here and below), I think this should all fit on one line and look 
clean.  Removing the trailing `,` at the end of this line will let the linter 
do that.   Pretty sure that's just a personal style thing though, you do you. 
:P 



##
airflow/jobs/job.py:
##
@@ -211,35 +211,29 @@ def heartbeat(
 heartbeat_callback(session)
 self.log.debug("[heartbeat]")
 except OperationalError:
-Stats.incr(convert_camel_to_snake(self.__class__.__name__) + 
"_heartbeat_failure", 1, 1)
+class_name = self.__class__.__name__
+Stats.incr(convert_camel_to_snake(class_name) + 
"_heartbeat_failure", 1, 1)
 if not self.heartbeat_failed:
-self.log.exception("%s heartbeat got an exception", 
self.__class__.__name__)
+msg = f"{class_name} heartbeat got an exception"

Review Comment:
   Thanks, I think this looks much cleaner and makes sure those messages 
are always consistent.



##
airflow/traces/utils.py:
##
@@ -18,52 +18,66 @@
 from __future__ import annotations
 
 import logging
+from typing import TYPE_CHECKING
 
 from airflow.utils.hashlib_wrapper import md5
 
+if TYPE_CHECKING:
+from airflow.models import DagRun, TaskInstance
+from airflow.models.taskinstancekey import TaskInstanceKey
+
 log = logging.getLogger(__name__)
 
 
-def gen_trace_id(dag_run) -> str:
+def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
+"""Generate trace id from DagRun."""
 dag_id = dag_run.dag_id
 run_id = dag_run.run_id
 start_dt = dag_run.start_date
 hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}"
 hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()
+if as_int is True:
+return int(hash_hex, 16)

Review Comment:
   I know this was my suggestion, but seeing it in use, I wonder if `as_int` is 
right, or should it be `as_decimal`?   I'll leave it to you.  I think both 
work, not sure if one is better than the other.



##
airflow/executors/base_executor.py:
##
@@ -311,12 +313,10 @@ def trigger_tasks(self, open_slots: int) -> None:
 
 @span
 def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
-from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
-
 for key, command, queue, executor_config in task_tuples:
-qt = self.queued_tasks[key][3]
-trace_id = int(gen_trace_id(qt.dag_run), 16)  # TaskInstance in 
fourth element
-span_id = int(gen_span_id_from_ti_key(key), 16)
+task_instance = self.queued_tasks[key][3]
+trace_id = int(gen_trace_id(task_instance.dag_run, as_int=True))  
# TaskInstance in fourth element

Review Comment:
   Here and elsewhere:  With `as_int` casting the return value, you shouldn't 
have to cast it again here.



##
airflow/jobs/job.py:
##
@@ -211,35 +211,29 @@ def heartbeat(
 heartbeat_callback(session)
 self.log.debug("[heartbeat]")
 except OperationalError:
-Stats.incr(convert_camel_to_snake(self.__class__.__name__) + 
"_heartbeat_failure", 1, 1)
+class_name = self.__class__.__name__
+Stats.incr(convert_camel_to_snake(class_name) + 
"_heartbeat_failure", 1, 1)

Review Comment:
   Stats.incr() should default to 1,1 so you should be able to drop those and 
make this just `Stats.incr(convert_camel_to_snake(class_name) + 
"_heartbeat_failure")`, but I'll leave it up to you if you think it's nicer to 
be explicit there.



##
airflow/executors/base_executor.py:
##
@@ -311,12 +313,10 @@ de

Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518483339


##
airflow/executors/base_executor.py:
##
@@ -287,15 +301,41 @@ def trigger_tasks(self, open_slots: int) -> None:
 if key in self.attempts:
 del self.attempts[key]
 task_tuples.append((key, command, queue, ti.executor_config))
+s.add_event(
+name="task to trigger",
+attributes={"command": str(command), "conf": 
str(ti.executor_config)},
+)
 
 if task_tuples:
 self._process_tasks(task_tuples)
 
+@span
 def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
+from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
+
 for key, command, queue, executor_config in task_tuples:
-del self.queued_tasks[key]
-self.execute_async(key=key, command=command, queue=queue, 
executor_config=executor_config)
-self.running.add(key)
+qt = self.queued_tasks[key][3]
+trace_id = int(gen_trace_id(qt.dag_run), 16)  # TaskInstance in 
fourth element

Review Comment:
   So, applying the function with the `as_int` boolean did cause a problem with 
pre-commit type checking, because it thinks trace_id can now be either str or 
int. You end up casting the value with int(..) anyway.
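
One possible way around the `str | int` complaint is `typing.overload` with `Literal` flags, so the checker can infer the return type from the `as_int` argument. This is a sketch under assumptions, not what the PR does: the function here takes a plain `seed` string rather than a `DagRun`, and it uses `hashlib.md5` directly instead of Airflow's `hashlib_wrapper`.

```python
from __future__ import annotations

from hashlib import md5
from typing import Literal, overload


@overload
def gen_trace_id(seed: str, as_int: Literal[True]) -> int: ...
@overload
def gen_trace_id(seed: str, as_int: Literal[False] = False) -> str: ...
@overload
def gen_trace_id(seed: str, as_int: bool) -> str | int: ...
def gen_trace_id(seed: str, as_int: bool = False) -> str | int:
    """Return the md5 hex digest of ``seed``, or its integer value if ``as_int``."""
    hash_hex = md5(seed.encode("utf-8")).hexdigest()
    return int(hash_hex, 16) if as_int else hash_hex


# A type checker should infer ``int`` here, so no extra int(...) cast is needed.
trace_id = gen_trace_id("example_dag_manual_run_1709856000.0", as_int=True)
# ...and ``str`` here.
trace_hex = gen_trace_id("example_dag_manual_run_1709856000.0")
```

The third overload keeps calls with a non-literal `bool` valid, at the cost of the wider `str | int` return type for those calls.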






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518482023


##
airflow/executors/base_executor.py:
##
@@ -287,15 +301,41 @@ def trigger_tasks(self, open_slots: int) -> None:
 if key in self.attempts:
 del self.attempts[key]
 task_tuples.append((key, command, queue, ti.executor_config))
+s.add_event(
+name="task to trigger",
+attributes={"command": str(command), "conf": 
str(ti.executor_config)},
+)
 
 if task_tuples:
 self._process_tasks(task_tuples)
 
+@span
 def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
+from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
+
 for key, command, queue, executor_config in task_tuples:
-del self.queued_tasks[key]
-self.execute_async(key=key, command=command, queue=queue, 
executor_config=executor_config)
-self.running.add(key)
+qt = self.queued_tasks[key][3]
+trace_id = int(gen_trace_id(qt.dag_run), 16)  # TaskInstance in 
fourth element

Review Comment:
   I think that's a great idea - will apply!






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518481942


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1415,70 +1478,101 @@ def _schedule_dag_run(
 :param dag_run: The DagRun to schedule
 :return: Callback that needs to be executed
 """
-callback: DagCallbackRequest | None = None
+trace_id = int(trace_utils.gen_trace_id(dag_run=dag_run), 16)
+span_id = int(trace_utils.gen_dag_span_id(dag_run=dag_run), 16)

Review Comment:
   Nice idea, but span_id sometimes needs to come from different locations 
(like task instance, task instance keys, or from dag_run), so this could make 
the function a bit complicated. I'd like to keep this as is for now.
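
To make the "different locations" point concrete, here is a simplified sketch of how the span id is derived from different seeds depending on the source object. It mirrors the hashing in `airflow/traces/utils.py` from this PR but takes plain field values instead of `DagRun` or `TaskInstanceKey` objects; the helper names `_span_hex`, `dag_span_hex`, and `ti_key_span_hex` are hypothetical.

```python
from __future__ import annotations

from hashlib import md5


def _span_hex(seed: str) -> str:
    """Last 16 hex chars (64 bits) of the md5 digest, the width of a span id."""
    return md5(seed.encode("utf-8")).hexdigest()[16:]


def dag_span_hex(dag_id: str, run_id: str, start_ts: float) -> str:
    """Span id seed built from DAG-run fields."""
    return _span_hex(f"{dag_id}_{run_id}_{start_ts}")


def ti_key_span_hex(dag_id: str, run_id: str, task_id: str, try_number: int) -> str:
    """Span id seed built from task-instance-key fields."""
    return _span_hex(f"{dag_id}_{run_id}_{task_id}_{try_number}")


dag_span = dag_span_hex("example_dag", "manual_run", 1709856000.0)
ti_span = ti_key_span_hex("example_dag", "manual_run", "extract", 1)
# Both hex strings convert to integers that fit in 64 bits.
dag_span_int, ti_span_int = int(dag_span, 16), int(ti_span, 16)
```

Each source needs its own seed fields, which is why folding the integer conversion into a single shared function is less straightforward for span ids than for trace ids.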






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518480222


##
tests/core/test_otel_tracer.py:
##
@@ -0,0 +1,155 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import logging
+from datetime import datetime
+from unittest.mock import MagicMock, patch
+
+import pytest
+from opentelemetry.sdk.trace.export.in_memory_span_exporter import 
InMemorySpanExporter
+
+from airflow.traces import TRACEPARENT, TRACESTATE, otel_tracer, utils
+from airflow.traces.tracer import Trace
+from tests.test_utils.config import env_vars
+
+
+@pytest.fixture
+def name():
+return "test_traces_run"
+
+
+class TestOtelTrace:
+@patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
+@patch("airflow.traces.otel_tracer.conf")
+def test_tracer(self, conf_a, exporter):
+# necessary to speed up the span to be emitted
+with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
+log = logging.getLogger("TestOtelTrace.test_tracer")
+log.setLevel(logging.DEBUG)
+# hijacking airflow conf with pre-defined
+# values
+conf_a.get.return_value = "abc"
+conf_a.getint.return_value = 123
+"""this will enable debug to set - which outputs the result

Review Comment:
   Nothing in particular; I didn't know what the usual convention was. A """ 
comment seemed better than a # comment in many cases, as it stood out more and 
was easier to spot.
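
For reference, the two forms are not equivalent to the interpreter. A triple-quoted string is only a docstring when it is the first statement of a module, class, or function; anywhere else it is an ordinary expression statement, whereas a `#` comment never reaches the syntax tree. The sketch below shows this with the standard `ast` module; the sample source and variable names are made up for illustration.

```python
import ast

SOURCE = '''
def f():
    """Real docstring."""
    x = 1
    """Stray string used as a comment."""
    # An actual comment.
    return x
'''

func = ast.parse(SOURCE).body[0]

# The first string literal is picked up as the function's docstring.
docstring = ast.get_docstring(func)

# Later string literals stay in the tree as expression statements.
stray_strings = [
    node.value.value
    for node in func.body[1:]
    if isinstance(node, ast.Expr)
    and isinstance(node.value, ast.Constant)
    and isinstance(node.value.value, str)
]

# The "#" comment has no node at all: only 4 statements are parsed.
statement_count = len(func.body)
```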






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518480136


##
airflow/traces/tracer.py:
##
@@ -0,0 +1,256 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import socket
+import typing
+from typing import (
+TYPE_CHECKING,
+Callable,
+)
+
+from airflow.configuration import conf
+from airflow.typing_compat import Protocol
+
+log = logging.getLogger(__name__)
+
+
+def gen_context(trace_id, span_id):
+"""Generate span context from trace_id and span_id."""
+from airflow.traces.otel_tracer import gen_context as otel_gen_context
+
+return otel_gen_context(trace_id, span_id)
+
+
+def gen_links_from_kv_list(list):
+"""Generate links from kv list of {trace_id:int, span_id:int}."""
+from airflow.traces.otel_tracer import gen_links_from_kv_list
+
+return gen_links_from_kv_list(list)
+
+
+def span(func):
+"""Decorate a function with span."""
+
+def wrapper(*args, **kwargs):
+func_name = func.__name__
+qual_name = func.__qualname__
+module_name = func.__module__
+if "." in qual_name:
+component = f"{qual_name.rsplit('.', 1)[0]}"
+else:
+component = module_name
+with Trace.start_span(span_name=func_name, component=component):
+return func(*args, **kwargs)
+
+return wrapper
+
+
+class DummyContext:
+"""If no Tracer is configured, DummyContext is used as a fallback."""
+
+def __init__(self):
+self.trace_id = 1
+
+
+class DummySpan:
+"""If no Tracer is configured, DummySpan is used as a fallback."""
+
+def __enter__(self):
+"""Enter."""
+return self
+
+def __exit__(self, *args, **kwargs):
+"""Exit."""
+pass
+
+def __call__(self, obj):
+"""Call."""
+return obj
+
+def get_span_context(self):
+"""Get span context."""
+return DUMMY_CTX
+
+def set_attribute(self, key, value) -> None:
+"""Set an attribute to the span."""
+pass
+
+def set_attributes(self, attributes) -> None:
+"""Set multiple attributes at once."""
+pass
+
+def add_event(
+self,
+name: str,
+attributes=None,

Review Comment:
   For now, I'll set the type as `Any` :
   `attributes: Any | None = None`






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518479352


##
airflow/traces/otel_tracer.py:
##
@@ -0,0 +1,333 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import random
+
+from opentelemetry import trace
+from opentelemetry.context import create_key
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import 
OTLPSpanExporter
+from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
+from opentelemetry.sdk.trace import Span, Tracer as OpenTelemetryTracer, 
TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, 
ConsoleSpanExporter
+from opentelemetry.sdk.trace.id_generator import IdGenerator
+from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, 
TraceFlags, Tracer
+from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
+
+from airflow.configuration import conf
+from airflow.traces import (
+TRACEPARENT,
+TRACESTATE,
+)
+from airflow.traces.utils import (
+gen_dag_span_id,
+gen_span_id,
+gen_trace_id,
+parse_traceparent,
+parse_tracestate,
+)
+from airflow.utils.net import get_hostname
+
+log = logging.getLogger(__name__)
+
+_NEXT_ID = create_key("next_id")
+
+
+class OtelTrace:
+"""
+OpenTelemetry Tracing Class.
+
+Handles all tracing requirements such as getting the tracer, and starting 
a new span.
+When OTEL is enabled, the Trace class will be replaced by this class.
+"""
+
+def __init__(self, span_exporter: ConsoleSpanExporter | OTLPSpanExporter, 
tags=None):
+self.span_exporter = span_exporter
+self.span_processor = BatchSpanProcessor(self.span_exporter)
+self.tags = tags
+self.otel_service = conf.get("traces", "otel_service")
+
+def get_tracer(self, component: str) -> OpenTelemetryTracer | Tracer:
+"""Get tracer from a given component."""
+resource = Resource(attributes={HOST_NAME: get_hostname(), 
SERVICE_NAME: self.otel_service})
+tracer_provider = TracerProvider(resource=resource)
+# span_processor = BatchSpanProcessor(self.span_exporter)
+tracer_provider.add_span_processor(self.span_processor)
+tracer = tracer_provider.get_tracer(component)
+return tracer
+
+def get_tracer_with_id(
+self, component: str, trace_id: int | None = None, span_id: int | None 
= None
+) -> OpenTelemetryTracer | Tracer:
+"""Tracer that will use special AirflowOtelIdGenerator to control 
producing certain span and trace id."""
+resource = Resource(attributes={HOST_NAME: get_hostname(), 
SERVICE_NAME: self.otel_service})
+tracer_provider = TracerProvider(
+resource=resource, 
id_generator=AirflowOtelIdGenerator(span_id=span_id, trace_id=trace_id)
+)
+# span_processor = BatchSpanProcessor(self.span_exporter, 
schedule_delay_millis=1)
+tracer_provider.add_span_processor(self.span_processor)
+tracer = tracer_provider.get_tracer(component)
+"""
+Tracer will product a Single ID value if value is provided. Note that 
this is one-time only, so any
+subsequent call will produce the normal random ids.
+"""
+return tracer
+
+def get_current_span(self):
+return trace.get_current_span()
+
+def use_span(self, span: Span):
+return trace.use_span(span=span)
+
+def start_span(
+self,
+span_name: str,
+component: str | None = None,
+parent_sc: SpanContext | None = None,
+span_id=None,
+links=None,
+start_time=None,
+):
+"""Start a span. if service_name is not given, otel_service is used."""
+if component is None:
+component = self.otel_service
+
+trace_id = self.get_current_span().get_span_context().trace_id
+if span_id is not None:
+tracer = self.get_tracer_with_id(component=component, 
trace_id=trace_id, span_id=span_id)
+else:
+tracer = self.get_tracer(component)
+
+kvs = {}
+if self.tags is not None:
+kvs = parse_tracestate(self.

Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518478981


##
airflow/traces/otel_tracer.py:
##
@@ -0,0 +1,333 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import random
+
+from opentelemetry import trace
+from opentelemetry.context import create_key
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import 
OTLPSpanExporter
+from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
+from opentelemetry.sdk.trace import Span, Tracer as OpenTelemetryTracer, 
TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, 
ConsoleSpanExporter
+from opentelemetry.sdk.trace.id_generator import IdGenerator
+from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, 
TraceFlags, Tracer
+from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
+
+from airflow.configuration import conf
+from airflow.traces import (
+TRACEPARENT,
+TRACESTATE,
+)
+from airflow.traces.utils import (
+gen_dag_span_id,
+gen_span_id,
+gen_trace_id,
+parse_traceparent,
+parse_tracestate,
+)
+from airflow.utils.net import get_hostname
+
+log = logging.getLogger(__name__)
+
+_NEXT_ID = create_key("next_id")
+
+
+class OtelTrace:
+"""
+OpenTelemetry Tracing Class.
+
+Handles all tracing requirements such as getting the tracer, and starting 
a new span.
+When OTEL is enabled, the Trace class will be replaced by this class.
+"""
+
+def __init__(self, span_exporter: ConsoleSpanExporter | OTLPSpanExporter, 
tags=None):
+self.span_exporter = span_exporter
+self.span_processor = BatchSpanProcessor(self.span_exporter)
+self.tags = tags
+self.otel_service = conf.get("traces", "otel_service")
+
+def get_tracer(self, component: str) -> OpenTelemetryTracer | Tracer:
+"""Get tracer from a given component."""
+resource = Resource(attributes={HOST_NAME: get_hostname(), 
SERVICE_NAME: self.otel_service})
+tracer_provider = TracerProvider(resource=resource)
+# span_processor = BatchSpanProcessor(self.span_exporter)
+tracer_provider.add_span_processor(self.span_processor)
+tracer = tracer_provider.get_tracer(component)
+return tracer
+
+def get_tracer_with_id(
+self, component: str, trace_id: int | None = None, span_id: int | None 
= None
+) -> OpenTelemetryTracer | Tracer:
+"""Tracer that will use special AirflowOtelIdGenerator to control 
producing certain span and trace id."""
+resource = Resource(attributes={HOST_NAME: get_hostname(), 
SERVICE_NAME: self.otel_service})
+tracer_provider = TracerProvider(
+resource=resource, 
id_generator=AirflowOtelIdGenerator(span_id=span_id, trace_id=trace_id)
+)
+# span_processor = BatchSpanProcessor(self.span_exporter, 
schedule_delay_millis=1)
+tracer_provider.add_span_processor(self.span_processor)
+tracer = tracer_provider.get_tracer(component)
+"""
+Tracer will product a Single ID value if value is provided. Note that 
this is one-time only, so any
+subsequent call will produce the normal random ids.
+"""
+return tracer
+
+def get_current_span(self):

Review Comment:
   Good idea, but this method mirrors the one the OTel tracer exposes, so I'd 
like to keep its signature as a function, if possible.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518477892


##
airflow/traces/otel_tracer.py:
##
@@ -0,0 +1,333 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import random
+
+from opentelemetry import trace
+from opentelemetry.context import create_key
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import 
OTLPSpanExporter
+from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
+from opentelemetry.sdk.trace import Span, Tracer as OpenTelemetryTracer, 
TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, 
ConsoleSpanExporter
+from opentelemetry.sdk.trace.id_generator import IdGenerator
+from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, 
TraceFlags, Tracer
+from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
+
+from airflow.configuration import conf
+from airflow.traces import (
+TRACEPARENT,
+TRACESTATE,
+)
+from airflow.traces.utils import (
+gen_dag_span_id,
+gen_span_id,
+gen_trace_id,
+parse_traceparent,
+parse_tracestate,
+)
+from airflow.utils.net import get_hostname
+
+log = logging.getLogger(__name__)
+
+_NEXT_ID = create_key("next_id")
+
+
+class OtelTrace:
+"""
+OpenTelemetry Tracing Class.
+
+Handles all tracing requirements such as getting the tracer, and starting 
a new span.
+When OTEL is enabled, the Trace class will be replaced by this class.
+"""
+
+def __init__(self, span_exporter: ConsoleSpanExporter | OTLPSpanExporter, 
tags=None):
+self.span_exporter = span_exporter
+self.span_processor = BatchSpanProcessor(self.span_exporter)
+self.tags = tags
+self.otel_service = conf.get("traces", "otel_service")
+
+def get_tracer(self, component: str) -> OpenTelemetryTracer | Tracer:
+"""Get tracer from a given component."""
+resource = Resource(attributes={HOST_NAME: get_hostname(), 
SERVICE_NAME: self.otel_service})
+tracer_provider = TracerProvider(resource=resource)
+# span_processor = BatchSpanProcessor(self.span_exporter)
+tracer_provider.add_span_processor(self.span_processor)
+tracer = tracer_provider.get_tracer(component)
+return tracer
+
+def get_tracer_with_id(
+self, component: str, trace_id: int | None = None, span_id: int | None 
= None
+) -> OpenTelemetryTracer | Tracer:
+"""Tracer that will use special AirflowOtelIdGenerator to control 
producing certain span and trace id."""
+resource = Resource(attributes={HOST_NAME: get_hostname(), 
SERVICE_NAME: self.otel_service})
+tracer_provider = TracerProvider(
+resource=resource, 
id_generator=AirflowOtelIdGenerator(span_id=span_id, trace_id=trace_id)
+)
+# span_processor = BatchSpanProcessor(self.span_exporter, 
schedule_delay_millis=1)
+tracer_provider.add_span_processor(self.span_processor)
+tracer = tracer_provider.get_tracer(component)
+"""
+Tracer will product a Single ID value if value is provided. Note that 
this is one-time only, so any
+subsequent call will produce the normal random ids.
+"""
+return tracer
+
+def get_current_span(self):
+return trace.get_current_span()
+
+def use_span(self, span: Span):
+return trace.use_span(span=span)
+
+def start_span(
+self,
+span_name: str,
+component: str | None = None,
+parent_sc: SpanContext | None = None,
+span_id=None,
+links=None,
+start_time=None,
+):
+"""Start a span. if service_name is not given, otel_service is used."""
+if component is None:
+component = self.otel_service
+
+trace_id = self.get_current_span().get_span_context().trace_id
+if span_id is not None:
+tracer = self.get_tracer_with_id(component=component, 
trace_id=trace_id, span_id=span_id)
+else:
+tracer = self.get_tracer(component)
+
+kvs = {}
+if self.tags is not None:
+kvs = parse_tracestate(self.

Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518477669


##
airflow/traces/otel_tracer.py:
##
@@ -0,0 +1,333 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import random
+
+from opentelemetry import trace
+from opentelemetry.context import create_key
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import 
OTLPSpanExporter
+from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
+from opentelemetry.sdk.trace import Span, Tracer as OpenTelemetryTracer, 
TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, 
ConsoleSpanExporter
+from opentelemetry.sdk.trace.id_generator import IdGenerator
+from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, 
TraceFlags, Tracer
+from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
+
+from airflow.configuration import conf
+from airflow.traces import (
+TRACEPARENT,
+TRACESTATE,
+)
+from airflow.traces.utils import (
+gen_dag_span_id,
+gen_span_id,
+gen_trace_id,
+parse_traceparent,
+parse_tracestate,
+)
+from airflow.utils.net import get_hostname
+
+log = logging.getLogger(__name__)
+
+_NEXT_ID = create_key("next_id")
+
+
+class OtelTrace:
+"""
+OpenTelemetry Tracing Class.
+
+Handles all tracing requirements such as getting the tracer, and starting 
a new span.
+When OTEL is enabled, the Trace class will be replaced by this class.
+"""
+
+def __init__(self, span_exporter: ConsoleSpanExporter | OTLPSpanExporter, 
tags=None):
+self.span_exporter = span_exporter
+self.span_processor = BatchSpanProcessor(self.span_exporter)
+self.tags = tags
+self.otel_service = conf.get("traces", "otel_service")
+
+def get_tracer(self, component: str) -> OpenTelemetryTracer | Tracer:
+"""Get tracer from a given component."""
+resource = Resource(attributes={HOST_NAME: get_hostname(), 
SERVICE_NAME: self.otel_service})
+tracer_provider = TracerProvider(resource=resource)
+# span_processor = BatchSpanProcessor(self.span_exporter)
+tracer_provider.add_span_processor(self.span_processor)
+tracer = tracer_provider.get_tracer(component)
+return tracer
+
+def get_tracer_with_id(
+self, component: str, trace_id: int | None = None, span_id: int | None 
= None
+) -> OpenTelemetryTracer | Tracer:
+"""Tracer that will use special AirflowOtelIdGenerator to control 
producing certain span and trace id."""
+resource = Resource(attributes={HOST_NAME: get_hostname(), 
SERVICE_NAME: self.otel_service})
+tracer_provider = TracerProvider(
+resource=resource, 
id_generator=AirflowOtelIdGenerator(span_id=span_id, trace_id=trace_id)
+)
+# span_processor = BatchSpanProcessor(self.span_exporter, 
schedule_delay_millis=1)
+tracer_provider.add_span_processor(self.span_processor)
+tracer = tracer_provider.get_tracer(component)
+"""
+Tracer will product a Single ID value if value is provided. Note that 
this is one-time only, so any
+subsequent call will produce the normal random ids.
+"""
+return tracer
+
+def get_current_span(self):
+return trace.get_current_span()
+
+def use_span(self, span: Span):
+return trace.use_span(span=span)
+
+def start_span(
+self,
+span_name: str,
+component: str | None = None,
+parent_sc: SpanContext | None = None,
+span_id=None,
+links=None,
+start_time=None,
+):
+"""Start a span. if service_name is not given, otel_service is used."""
+if component is None:
+component = self.otel_service

Review Comment:
   I tried that, but a default value in the signature cannot refer to 
`self.otel_service`.
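
The limitation described here is that Python evaluates default argument values once, when the `def` statement runs, so `self` does not exist yet at that point. A minimal sketch of the `None`-sentinel pattern the diff uses, with a hypothetical stand-in class (`TracerSketch` and its return value are illustrative, not the PR's code):

```python
from __future__ import annotations


class TracerSketch:
    """Hypothetical stand-in for OtelTrace; only the default handling is shown."""

    def __init__(self, otel_service: str):
        self.otel_service = otel_service

    # Writing `component: str = self.otel_service` here would raise NameError,
    # because `self` is not defined while the signature is being evaluated.
    def start_span(self, span_name: str, component: str | None = None) -> tuple[str, str]:
        if component is None:
            # Resolve the per-instance default at call time instead.
            component = self.otel_service
        return span_name, component


tracer = TracerSketch(otel_service="Airflow")
default_result = tracer.start_span("heartbeat")
explicit_result = tracer.start_span("heartbeat", component="Job")
```

With this pattern the caller can still override the component, while the configured service name is picked up per instance.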




Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518476652


##
airflow/traces/otel_tracer.py:
##
@@ -0,0 +1,333 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import random
+
+from opentelemetry import trace
+from opentelemetry.context import create_key
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import 
OTLPSpanExporter
+from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
+from opentelemetry.sdk.trace import Span, Tracer as OpenTelemetryTracer, 
TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, 
ConsoleSpanExporter
+from opentelemetry.sdk.trace.id_generator import IdGenerator
+from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, 
TraceFlags, Tracer
+from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
+
+from airflow.configuration import conf
+from airflow.traces import (
+TRACEPARENT,
+TRACESTATE,
+)
+from airflow.traces.utils import (
+gen_dag_span_id,
+gen_span_id,
+gen_trace_id,
+parse_traceparent,
+parse_tracestate,
+)
+from airflow.utils.net import get_hostname
+
+log = logging.getLogger(__name__)
+
+_NEXT_ID = create_key("next_id")
+
+
+class OtelTrace:
+"""
+OpenTelemetry Tracing Class.
+
+Handles all tracing requirements such as getting the tracer, and starting 
a new span.
+When OTEL is enabled, the Trace class will be replaced by this class.
+"""
+
+def __init__(self, span_exporter: ConsoleSpanExporter | OTLPSpanExporter, 
tags=None):
+self.span_exporter = span_exporter
+self.span_processor = BatchSpanProcessor(self.span_exporter)
+self.tags = tags
+self.otel_service = conf.get("traces", "otel_service")
+
+def get_tracer(self, component: str) -> OpenTelemetryTracer | Tracer:
+"""Get tracer from a given component."""
+resource = Resource(attributes={HOST_NAME: get_hostname(), 
SERVICE_NAME: self.otel_service})
+tracer_provider = TracerProvider(resource=resource)
+# span_processor = BatchSpanProcessor(self.span_exporter)
+tracer_provider.add_span_processor(self.span_processor)
+tracer = tracer_provider.get_tracer(component)
+return tracer
+
+def get_tracer_with_id(

Review Comment:
   I did not realize that! Applied with pleasure.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518475273


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1415,70 +1478,101 @@ def _schedule_dag_run(
 :param dag_run: The DagRun to schedule
 :return: Callback that needs to be executed
 """
-callback: DagCallbackRequest | None = None
+trace_id = int(trace_utils.gen_trace_id(dag_run=dag_run), 16)
+span_id = int(trace_utils.gen_dag_span_id(dag_run=dag_run), 16)
+links = [{"trace_id": trace_id, "span_id": span_id}]
+
+with Trace.start_span(
+span_name="_schedule_dag_run", component="SchedulerJobRunner", 
links=links
+) as s:
+s.set_attribute("dag_id", dag_run.dag_id)
+s.set_attribute("run_id", dag_run.run_id)
+s.set_attribute("run_type", dag_run.run_type)
+
+callback: DagCallbackRequest | None = None
+
+dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, 
session=session)
+dag_model = DM.get_dagmodel(dag_run.dag_id, session)
+
+if not dag or not dag_model:
+self.log.error("Couldn't find DAG %s in DAG bag or database!", 
dag_run.dag_id)
+return callback
+
+if (
+dag_run.start_date
+and dag.dagrun_timeout
+and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
+):
+dag_run.set_state(DagRunState.FAILED)
+unfinished_task_instances = session.scalars(
+select(TI)
+.where(TI.dag_id == dag_run.dag_id)
+.where(TI.run_id == dag_run.run_id)
+.where(TI.state.in_(State.unfinished))
+)
+for task_instance in unfinished_task_instances:
+task_instance.state = TaskInstanceState.SKIPPED
+session.merge(task_instance)
+session.flush()
+self.log.info("Run %s of %s has timed-out", dag_run.run_id, 
dag_run.dag_id)
+
+if self._should_update_dag_next_dagruns(
+dag, dag_model, last_dag_run=dag_run, session=session
+):
+dag_model.calculate_dagrun_date_fields(dag, 
dag.get_run_data_interval(dag_run))
+
+callback_to_execute = DagCallbackRequest(
+full_filepath=dag.fileloc,
+dag_id=dag.dag_id,
+run_id=dag_run.run_id,
+is_failure_callback=True,
+processor_subdir=dag_model.processor_subdir,
+msg="timed_out",
+)
 
-dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, 
session=session)
-dag_model = DM.get_dagmodel(dag_run.dag_id, session)
+dag_run.notify_dagrun_state_changed()
+duration = dag_run.end_date - dag_run.start_date
+Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}", 
duration)
+Stats.timing("dagrun.duration.failed", duration, 
tags={"dag_id": dag_run.dag_id})
+
+s.set_attribute("error", True)
+s.add_event(
+name="error",
+attributes={
+"message": f"Run {dag_run.run_id} of {dag_run.dag_id} 
has timed-out",
+"duration": str(duration),
+},
+)
+return callback_to_execute
 
-if not dag or not dag_model:
-self.log.error("Couldn't find DAG %s in DAG bag or database!", 
dag_run.dag_id)
-return callback
+if dag_run.execution_date > timezone.utcnow() and not 
dag.allow_future_exec_dates:
+self.log.error("Execution date is in future: %s", 
dag_run.execution_date)
+return callback
 
-if (
-dag_run.start_date
-and dag.dagrun_timeout
-and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
-):
-dag_run.set_state(DagRunState.FAILED)
-unfinished_task_instances = session.scalars(
-select(TI)
-.where(TI.dag_id == dag_run.dag_id)
-.where(TI.run_id == dag_run.run_id)
-.where(TI.state.in_(State.unfinished))
-)
-for task_instance in unfinished_task_instances:
-task_instance.state = TaskInstanceState.SKIPPED
-session.merge(task_instance)
-session.flush()
-self.log.info("Run %s of %s has timed-out", dag_run.run_id, 
dag_run.dag_id)
+if not self._verify_integrity_if_dag_changed(dag_run=dag_run, 
session=session):
+self.log.warning(
+"The DAG disappeared before verifying integrity: %s. 
Skipping.", dag_run.dag_id
+   

Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518474943


##
airflow/executors/base_executor.py:
##
@@ -287,15 +301,41 @@ def trigger_tasks(self, open_slots: int) -> None:
 if key in self.attempts:
 del self.attempts[key]
 task_tuples.append((key, command, queue, ti.executor_config))
+s.add_event(
+name="task to trigger",
+attributes={"command": str(command), "conf": 
str(ti.executor_config)},
+)
 
 if task_tuples:
 self._process_tasks(task_tuples)
 
+@span
 def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
+from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
+
 for key, command, queue, executor_config in task_tuples:
-del self.queued_tasks[key]
-self.execute_async(key=key, command=command, queue=queue, 
executor_config=executor_config)
-self.running.add(key)
+qt = self.queued_tasks[key][3]

Review Comment:
   Yes, that is actually a great suggestion! Renaming it.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518474305


##
airflow/executors/base_executor.py:
##
@@ -287,15 +301,41 @@ def trigger_tasks(self, open_slots: int) -> None:
 if key in self.attempts:
 del self.attempts[key]
 task_tuples.append((key, command, queue, ti.executor_config))
+s.add_event(
+name="task to trigger",
+attributes={"command": str(command), "conf": 
str(ti.executor_config)},
+)
 
 if task_tuples:
 self._process_tasks(task_tuples)
 
+@span
 def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
+from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id

Review Comment:
   I'm afraid that there's no particular reason.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518471941


##
airflow/dag_processing/manager.py:
##
@@ -1028,6 +1049,23 @@ def _collect_results_from_processor(self, processor) -> 
None:
 )
 self._file_stats[processor.file_path] = stat
 file_name = Path(processor.file_path).stem
+
+"""crude exposure of instrumentation code which may need to be 
furnished"""
+s = Trace.get_tracer("DagFileProcessorManager").start_span(
+"dag_processing", start_time=int(processor.start_time.timestamp() 
* 10)

Review Comment:
   Let me do that. The function name should be `datetime_to_nano(datetime) -> 
int`, though.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518471507


##
airflow/jobs/job.py:
##
@@ -181,48 +182,67 @@ def heartbeat(
 :param session to use for saving the job
 """
 previous_heartbeat = self.latest_heartbeat
-
-try:
-# This will cause it to load from the db
-self._merge_from(Job._fetch_from_db(self, session))
-previous_heartbeat = self.latest_heartbeat
-
-if self.state == JobState.RESTARTING:
-self.kill()
-
-# Figure out how long to sleep for
-sleep_for = 0
-if self.latest_heartbeat:
-seconds_remaining = (
-self.heartrate - (timezone.utcnow() - 
self.latest_heartbeat).total_seconds()
-)
-sleep_for = max(0, seconds_remaining)
-sleep(sleep_for)
-
-job = Job._update_heartbeat(job=self, session=session)
-self._merge_from(job)
-
-# At this point, the DB has updated.
-previous_heartbeat = self.latest_heartbeat
-
-heartbeat_callback(session)
-self.log.debug("[heartbeat]")
-except OperationalError:
-Stats.incr(convert_camel_to_snake(self.__class__.__name__) + 
"_heartbeat_failure", 1, 1)
-if not self.heartbeat_failed:
-self.log.exception("%s heartbeat got an exception", 
self.__class__.__name__)
-self.heartbeat_failed = True
-if self.is_alive():
-self.log.error(
-"%s heartbeat failed with error. Scheduler may go into 
unhealthy state",
-self.__class__.__name__,
-)
-else:
-self.log.error(
-"%s heartbeat failed with error. Scheduler is in unhealthy 
state", self.__class__.__name__
-)
-# We didn't manage to heartbeat, so make sure that the timestamp 
isn't updated
-self.latest_heartbeat = previous_heartbeat
+with Trace.start_span(span_name="heartbeat", component="Job") as s:
+try:
+s.set_attribute("heartbeat", str(self.latest_heartbeat))
+# This will cause it to load from the db
+self._merge_from(Job._fetch_from_db(self, session))
+previous_heartbeat = self.latest_heartbeat
+
+if self.state == JobState.RESTARTING:
+self.kill()
+
+# Figure out how long to sleep for
+sleep_for = 0
+if self.latest_heartbeat:
+seconds_remaining = (
+self.heartrate - (timezone.utcnow() - 
self.latest_heartbeat).total_seconds()
+)
+sleep_for = max(0, seconds_remaining)
+s.add_event(name="sleep()", attributes={"sleep_for": 
sleep_for})
+sleep(sleep_for)
+
+job = Job._update_heartbeat(job=self, session=session)
+self._merge_from(job)
+
+# At this point, the DB has updated.
+previous_heartbeat = self.latest_heartbeat
+
+heartbeat_callback(session)
+self.log.debug("[heartbeat]")
+except OperationalError:
+Stats.incr(convert_camel_to_snake(self.__class__.__name__) + 
"_heartbeat_failure", 1, 1)
+if not self.heartbeat_failed:
+self.log.exception("%s heartbeat got an exception", 
self.__class__.__name__)

Review Comment:
   I actually thought about that! (but was just lazy :-( )






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518471245


##
airflow/jobs/job.py:
##
@@ -181,48 +182,67 @@ def heartbeat(
 :param session to use for saving the job
 """
 previous_heartbeat = self.latest_heartbeat
-
-try:
-# This will cause it to load from the db
-self._merge_from(Job._fetch_from_db(self, session))
-previous_heartbeat = self.latest_heartbeat
-
-if self.state == JobState.RESTARTING:
-self.kill()
-
-# Figure out how long to sleep for
-sleep_for = 0
-if self.latest_heartbeat:
-seconds_remaining = (
-self.heartrate - (timezone.utcnow() - 
self.latest_heartbeat).total_seconds()
-)
-sleep_for = max(0, seconds_remaining)
-sleep(sleep_for)
-
-job = Job._update_heartbeat(job=self, session=session)
-self._merge_from(job)
-
-# At this point, the DB has updated.
-previous_heartbeat = self.latest_heartbeat
-
-heartbeat_callback(session)
-self.log.debug("[heartbeat]")
-except OperationalError:
-Stats.incr(convert_camel_to_snake(self.__class__.__name__) + 
"_heartbeat_failure", 1, 1)
-if not self.heartbeat_failed:
-self.log.exception("%s heartbeat got an exception", 
self.__class__.__name__)
-self.heartbeat_failed = True
-if self.is_alive():
-self.log.error(
-"%s heartbeat failed with error. Scheduler may go into 
unhealthy state",
-self.__class__.__name__,
-)
-else:
-self.log.error(
-"%s heartbeat failed with error. Scheduler is in unhealthy 
state", self.__class__.__name__
-)
-# We didn't manage to heartbeat, so make sure that the timestamp 
isn't updated
-self.latest_heartbeat = previous_heartbeat
+with Trace.start_span(span_name="heartbeat", component="Job") as s:
+try:
+s.set_attribute("heartbeat", str(self.latest_heartbeat))
+# This will cause it to load from the db
+self._merge_from(Job._fetch_from_db(self, session))
+previous_heartbeat = self.latest_heartbeat
+
+if self.state == JobState.RESTARTING:
+self.kill()
+
+# Figure out how long to sleep for
+sleep_for = 0
+if self.latest_heartbeat:
+seconds_remaining = (
+self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
+)
+sleep_for = max(0, seconds_remaining)
+s.add_event(name="sleep()", attributes={"sleep_for": sleep_for})
+sleep(sleep_for)
+
+job = Job._update_heartbeat(job=self, session=session)
+self._merge_from(job)
+
+# At this point, the DB has updated.
+previous_heartbeat = self.latest_heartbeat
+
+heartbeat_callback(session)
+self.log.debug("[heartbeat]")
+except OperationalError:
+Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)

Review Comment:
   done! Tidy is always good.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518463666


##
airflow/traces/tracer.py:
##
@@ -0,0 +1,256 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import socket
+import typing
+from typing import (
+TYPE_CHECKING,
+Callable,
+)
+
+from airflow.configuration import conf
+from airflow.typing_compat import Protocol
+
+log = logging.getLogger(__name__)
+
+
+def gen_context(trace_id, span_id):
+"""Generate span context from trace_id and span_id."""
+from airflow.traces.otel_tracer import gen_context as otel_gen_context
+
+return otel_gen_context(trace_id, span_id)
+
+
+def gen_links_from_kv_list(list):
+"""Generate links from kv list of {trace_id:int, span_id:int}."""
+from airflow.traces.otel_tracer import gen_links_from_kv_list
+
+return gen_links_from_kv_list(list)
+
+
+def span(func):
+"""Decorate a function with span."""
+
+def wrapper(*args, **kwargs):
+func_name = func.__name__
+qual_name = func.__qualname__
+module_name = func.__module__
+if "." in qual_name:
+component = f"{qual_name.rsplit('.', 1)[0]}"
+else:
+component = module_name
+with Trace.start_span(span_name=func_name, component=component):
+return func(*args, **kwargs)
+
+return wrapper
+
+
+class DummyContext:
+"""If no Tracer is configured, DummyContext is used as a fallback."""
+
+def __init__(self):
+self.trace_id = 1
+
+
+class DummySpan:
+"""If no Tracer is configured, DummySpan is used as a fallback."""
+
+def __enter__(self):
+"""Enter."""
+return self
+
+def __exit__(self, *args, **kwargs):
+"""Exit."""
+pass
+
+def __call__(self, obj):
+"""Call."""
+return obj
+
+def get_span_context(self):
+"""Get span context."""
+return DUMMY_CTX
+
+def set_attribute(self, key, value) -> None:
+"""Set an attribute to the span."""
+pass
+
+def set_attributes(self, attributes) -> None:
+"""Set multiple attributes at once."""
+pass
+
+def add_event(
+self,
+name: str,
+attributes=None,

Review Comment:
   The type hint comes from opentelemetry, but since we might support other 
tracing libraries in the future, I do not want tracer.py to depend on 
opentelemetry. Any suggestions?
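
One possible direction, sketched under assumptions: describe the span interface structurally with `typing.Protocol` and define the attribute types locally, so that nothing is imported from opentelemetry. The names `AttributeValue`, `Attributes`, `SpanLike`, `EmptySpan`, and `record_sleep` below are hypothetical and only illustrate the idea; they are not taken from the PR.

```python
from typing import Mapping, Optional, Protocol, Sequence, Union

# Hypothetical local aliases for the values a span attribute may hold.
# Defining them here keeps the module free of any opentelemetry import.
AttributeValue = Union[
    str, bool, int, float,
    Sequence[str], Sequence[bool], Sequence[int], Sequence[float],
]
Attributes = Optional[Mapping[str, AttributeValue]]


class SpanLike(Protocol):
    """Structural type: any object providing these methods satisfies it."""

    def set_attribute(self, key: str, value: AttributeValue) -> None: ...

    def add_event(
        self, name: str, attributes: Attributes = None, timestamp: Optional[int] = None
    ) -> None: ...


class EmptySpan:
    """No-op span used when no tracer is configured."""

    def set_attribute(self, key: str, value: AttributeValue) -> None:
        pass

    def add_event(
        self, name: str, attributes: Attributes = None, timestamp: Optional[int] = None
    ) -> None:
        pass


def record_sleep(span: SpanLike, sleep_for: float) -> None:
    # Accepts EmptySpan or any real span exposing the same two methods.
    span.add_event(name="sleep()", attributes={"sleep_for": sleep_for})
```

With this shape, a type checker accepts any tracing backend whose span objects expose the same methods, without the base module importing that backend.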






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518461942


##
airflow/traces/utils.py:
##
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+
+from airflow.utils.hashlib_wrapper import md5
+
+log = logging.getLogger(__name__)
+
+
+def gen_trace_id(dag_run) -> str:
+dag_id = dag_run.dag_id
+run_id = dag_run.run_id
+start_dt = dag_run.start_date
+hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}"
+hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()
+return hash_hex
+
+
+def gen_span_id_from_ti_key(ti_key) -> str:
+"""Generate span id from TI key."""
+dag_id = ti_key.dag_id
+run_id = ti_key.run_id
+task_id = ti_key.task_id
+try_num = ti_key.try_number  # key always has next number, not current
+hash_seed = f"{dag_id}_{run_id}_{task_id}_{try_num}"
+hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:]
+return hash_hex
+
+
+def gen_dag_span_id(dag_run):
+"""Generate dag's root span id using dag_run."""
+dag_id = dag_run.dag_id
+run_id = dag_run.run_id
+start_dt = dag_run.start_date
+hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}"
+hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:]
+return hash_hex
+
+
+def gen_span_id(ti):
+"""Generate span id from the task instance."""
+dag_run = ti.dag_run
+dag_id = dag_run.dag_id
+run_id = dag_run.run_id
+task_id = ti.task_id
+"""in terms of ti when this is called, the try_number is already set to next, hence the subtraction"""
+try_num = ti.try_number - 1
+hash_seed = f"{dag_id}_{run_id}_{task_id}_{try_num}"
+hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:]
+return hash_hex
+
+
+def parse_traceparent(traceparent_str: str | None = None) -> dict:
+"""Parse traceparent string: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01."""
+if traceparent_str is None:
+return {}
+tokens = traceparent_str.split("-")
+if len(tokens) != 4:
+raise ValueError("The traceparent string does not have the correct format.")
+return {"version": tokens[0], "trace_id": tokens[1], "parent_id": tokens[2], "flags": tokens[3]}
+
+
+def parse_tracestate(tracestate_str: str | None = None) -> dict:
+"""Parse tracestate string: rojo=00f067aa0ba902b7,congo=t61rcWkgMzE."""
+if tracestate_str is None:
+return {}
+tokens = tracestate_str.split(",")
+result = {}
+for pair in tokens:
+key, value = pair.split("=")
+result[key.strip()] = value.strip()
+return result
+
+
+def is_valid_trace_id(trace_id: str) -> bool:
+"""Check whether trace id is valid."""
+if trace_id is not None and len(trace_id) == 32 and trace_id != "0x":
+return True
+else:
+return False

Review Comment:
   `len(trace_id)` should be 34, since the string has the '0x' prefix (2 
characters) in addition to the 32 hex digits (32 0's in the invalid case).
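
A minimal sketch of the check with that length, assuming the trace id arrives as a '0x'-prefixed hex string. The explicit prefix test, the hex-digit test, and the all-zero comparison via `int(..., 16)` are assumptions made for illustration, not necessarily what the PR ends up using.

```python
import string


def is_valid_trace_id(trace_id):
    """Check whether a '0x'-prefixed, 32-hex-digit trace id is valid."""
    # 2 characters for the '0x' prefix plus 32 hex digits (128 bits).
    if trace_id is None or len(trace_id) != 34 or not trace_id.startswith("0x"):
        return False
    digits = trace_id[2:]
    if any(ch not in string.hexdigits for ch in digits):
        return False
    # An all-zero trace id is the invalid one.
    return int(digits, 16) != 0


print(is_valid_trace_id("0x0af7651916cd43dd8448eb211c80319c"))  # True
print(is_valid_trace_id("0x" + "0" * 32))  # False
print(is_valid_trace_id("0af7651916cd43dd8448eb211c80319c"))  # False (no prefix)
```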






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518461640


##
airflow/traces/tracer.py:
##
@@ -0,0 +1,256 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import socket
+import typing
+from typing import (
+TYPE_CHECKING,
+Callable,
+)
+
+from airflow.configuration import conf
+from airflow.typing_compat import Protocol
+
+log = logging.getLogger(__name__)
+
+
+def gen_context(trace_id, span_id):
+"""Generate span context from trace_id and span_id."""
+from airflow.traces.otel_tracer import gen_context as otel_gen_context
+
+return otel_gen_context(trace_id, span_id)
+
+
+def gen_links_from_kv_list(list):
+"""Generate links from kv list of {trace_id:int, span_id:int}."""
+from airflow.traces.otel_tracer import gen_links_from_kv_list
+
+return gen_links_from_kv_list(list)
+
+
+def span(func):
+"""Decorate a function with span."""
+
+def wrapper(*args, **kwargs):
+func_name = func.__name__
+qual_name = func.__qualname__
+module_name = func.__module__
+if "." in qual_name:
+component = f"{qual_name.rsplit('.', 1)[0]}"
+else:
+component = module_name
+with Trace.start_span(span_name=func_name, component=component):
+return func(*args, **kwargs)
+
+return wrapper
+
+
+class DummyContext:

Review Comment:
   OK, I see. I'll replace "Dummy" with "Empty" in the context's name, as the 
idea is closer to providing an `empty` object.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518461479


##
airflow/traces/utils.py:
##
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+
+from airflow.utils.hashlib_wrapper import md5
+
+log = logging.getLogger(__name__)
+
+
+def gen_trace_id(dag_run) -> str:
+dag_id = dag_run.dag_id
+run_id = dag_run.run_id
+start_dt = dag_run.start_date
+hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}"
+hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()
+return hash_hex
+
+
+def gen_span_id_from_ti_key(ti_key) -> str:
+"""Generate span id from TI key."""
+dag_id = ti_key.dag_id
+run_id = ti_key.run_id
+task_id = ti_key.task_id
+try_num = ti_key.try_number  # key always has next number, not current
+hash_seed = f"{dag_id}_{run_id}_{task_id}_{try_num}"
+hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:]
+return hash_hex
+
+
+def gen_dag_span_id(dag_run):
+"""Generate dag's root span id using dag_run."""
+dag_id = dag_run.dag_id
+run_id = dag_run.run_id
+start_dt = dag_run.start_date
+hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}"
+hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:]
+return hash_hex
+
+
+def gen_span_id(ti):
+"""Generate span id from the task instance."""
+dag_run = ti.dag_run
+dag_id = dag_run.dag_id
+run_id = dag_run.run_id
+task_id = ti.task_id
+"""in terms of ti when this is called, the try_number is already set to next, hence the subtraction"""
+try_num = ti.try_number - 1
+hash_seed = f"{dag_id}_{run_id}_{task_id}_{try_num}"
+hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:]
+return hash_hex
+
+
+def parse_traceparent(traceparent_str: str | None = None) -> dict:
+"""Parse traceparent string: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01."""
+if traceparent_str is None:
+return {}
+tokens = traceparent_str.split("-")
+if len(tokens) != 4:
+raise ValueError("The traceparent string does not have the correct format.")
+return {"version": tokens[0], "trace_id": tokens[1], "parent_id": tokens[2], "flags": tokens[3]}
+
+
+def parse_tracestate(tracestate_str: str | None = None) -> dict:
+"""Parse tracestate string: rojo=00f067aa0ba902b7,congo=t61rcWkgMzE."""
+if tracestate_str is None:
+return {}
+tokens = tracestate_str.split(",")
+result = {}
+for pair in tokens:
+key, value = pair.split("=")
+result[key.strip()] = value.strip()
+return result
+
+
+def is_valid_trace_id(trace_id: str) -> bool:
+"""Check whether trace id is valid."""
+if trace_id is not None and len(trace_id) == 32 and trace_id != "0x":
+return True
+else:
+return False

Review Comment:
   I like the second example better, will use that!






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518461011


##
airflow/traces/utils.py:
##
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+
+from airflow.utils.hashlib_wrapper import md5
+
+log = logging.getLogger(__name__)
+
+
+def gen_trace_id(dag_run) -> str:
+dag_id = dag_run.dag_id
+run_id = dag_run.run_id
+start_dt = dag_run.start_date
+hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}"
+hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()
+return hash_hex
+
+
+def gen_span_id_from_ti_key(ti_key) -> str:
+"""Generate span id from TI key."""
+dag_id = ti_key.dag_id
+run_id = ti_key.run_id
+task_id = ti_key.task_id
+try_num = ti_key.try_number  # key always has next number, not current
+hash_seed = f"{dag_id}_{run_id}_{task_id}_{try_num}"
+hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:]
+return hash_hex
+
+
+def gen_dag_span_id(dag_run):
+"""Generate dag's root span id using dag_run."""
+dag_id = dag_run.dag_id
+run_id = dag_run.run_id
+start_dt = dag_run.start_date
+hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}"
+hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:]
+return hash_hex
+
+
+def gen_span_id(ti):
+"""Generate span id from the task instance."""
+dag_run = ti.dag_run
+dag_id = dag_run.dag_id
+run_id = dag_run.run_id
+task_id = ti.task_id
+"""in terms of ti when this is called, the try_number is already set to next, hence the subtraction"""
+try_num = ti.try_number - 1
+hash_seed = f"{dag_id}_{run_id}_{task_id}_{try_num}"
+hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:]
+return hash_hex
+
+
+def parse_traceparent(traceparent_str: str | None = None) -> dict:
+"""Parse traceparent string: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01."""
+if traceparent_str is None:
+return {}
+tokens = traceparent_str.split("-")
+if len(tokens) != 4:
+raise ValueError("The traceparent string does not have the correct format.")
+return {"version": tokens[0], "trace_id": tokens[1], "parent_id": tokens[2], "flags": tokens[3]}

Review Comment:
   Yeah, it is easier to read, but when I was programming in Java, my mentor 
told me not to create too many exceptions, since exceptions are among the more 
expensive operations in programming. I'll keep it as-is for now, but I 
appreciate the suggestion.
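
For context, a minimal sketch of the two styles being weighed, based on the `parse_traceparent` quoted above. `try_parse_traceparent` is a hypothetical name introduced here for illustration and is not part of the PR. In CPython, entering a `try` block is cheap; the cost is mostly paid when an exception is actually raised and caught, which for this function happens only on malformed input.

```python
from typing import Dict, Optional


def parse_traceparent(traceparent_str: Optional[str] = None) -> Dict[str, str]:
    """Parse a traceparent string, raising ValueError on a malformed value."""
    if traceparent_str is None:
        return {}
    tokens = traceparent_str.split("-")
    if len(tokens) != 4:
        raise ValueError("The traceparent string does not have the correct format.")
    return {"version": tokens[0], "trace_id": tokens[1], "parent_id": tokens[2], "flags": tokens[3]}


def try_parse_traceparent(traceparent_str: Optional[str] = None) -> Dict[str, str]:
    """Hypothetical non-raising variant: a malformed value yields an empty dict."""
    try:
        return parse_traceparent(traceparent_str)
    except ValueError:
        return {}


parsed = parse_traceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
print(parsed["trace_id"])  # 0af7651916cd43dd8448eb211c80319c
print(try_parse_traceparent("not-a-traceparent"))  # {}
```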






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518459668


##
airflow/traces/utils.py:
##
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+
+from airflow.utils.hashlib_wrapper import md5
+
+log = logging.getLogger(__name__)
+
+
+def gen_trace_id(dag_run) -> str:

Review Comment:
   ok, done. Thanks for the suggestion.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518458488


##
tests/core/test_otel_tracer.py:
##
@@ -0,0 +1,155 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import logging
+from datetime import datetime
+from unittest.mock import MagicMock, patch
+
+import pytest
+from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
+
+from airflow.traces import TRACEPARENT, TRACESTATE, otel_tracer, utils
+from airflow.traces.tracer import Trace
+from tests.test_utils.config import env_vars
+
+
+@pytest.fixture
+def name():
+return "test_traces_run"
+
+
+class TestOtelTrace:
+@patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
+@patch("airflow.traces.otel_tracer.conf")
+def test_tracer(self, conf_a, exporter):
+# necessary to speed up the span to be emitted
+with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
+log = logging.getLogger("TestOtelTrace.test_tracer")
+log.setLevel(logging.DEBUG)
+# hijacking airflow conf with pre-defined
+# values
+conf_a.get.return_value = "abc"
+conf_a.getint.return_value = 123
+"""this will enable debug to set - which outputs the result
+to the console exporter"""
+conf_a.getboolean.return_value = True
+
+# mocking console exporter with in mem exporter for better assertion
+in_mem_exporter = InMemorySpanExporter()
+exporter.return_value = in_mem_exporter
+
+tracer = otel_tracer.get_otel_tracer(Trace)
+assert conf_a.get.called
+assert conf_a.getint.called
+assert conf_a.getboolean.called
+with tracer.start_span(span_name="span1") as s1:
+with tracer.start_span(span_name="span2") as s2:
+s2.set_attribute("attr2", "val2")
+span2 = json.loads(s2.to_json())
+span1 = json.loads(s1.to_json())
+# assert the two span data
+assert span1["name"] == "span1"
+assert span2["name"] == "span2"
+trace_id = span1["context"]["trace_id"]
+s1_span_id = span1["context"]["span_id"]
+assert span2["context"]["trace_id"] == trace_id
+assert span2["parent_id"] == s1_span_id
+assert span2["attributes"]["attr2"] == "val2"
+assert span2["resource"]["attributes"]["service.name"] == "abc"
+
+@patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
+@patch("airflow.traces.otel_tracer.conf")
+def test_dag_tracer(self, conf_a, exporter):
+# necessary to speed up the span to be emitted
+with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
+log = logging.getLogger("TestOtelTrace.test_dag_tracer")
+log.setLevel(logging.DEBUG)
+conf_a.get.return_value = "abc"
+conf_a.getint.return_value = 123
+"""this will enable debug to set - which outputs the result
+to the console exporter"""
+conf_a.getboolean.return_value = True
+
+# mocking console exporter with in mem exporter for better assertion
+in_mem_exporter = InMemorySpanExporter()
+exporter.return_value = in_mem_exporter
+
+now = datetime.now()
+dag_run = MagicMock()
+dag_run.conf = {
+TRACEPARENT: "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",

Review Comment:
   Not significant, but this particular value comes from the W3C doc on 
traceparent (as an example), so I reused it from there. Anyway, the suggestion 
seems legit.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518444796


##
scripts/ci/docker-compose/integration-otel.yml:
##
@@ -54,14 +54,31 @@ services:
   - ./grafana/volume/dashboards:/grafana/dashboards
   - ./grafana/volume/provisioning:/grafana/provisioning
 
+  jaeger:
+image: jaegertracing/all-in-one
+container_name: "breeze-jaeger"
+environment:
+  COLLECTOR_OTLP_ENABLED: true
+  COLLLECTOR_ZIPKIN_HOST_PORT: :9411

Review Comment:
   The colon does not seem to be necessary - in 'some' documentation it was 
there, but I am now certain it is a typo. [Jaeger 
Doc](https://www.jaegertracing.io/docs/1.6/getting-started/)






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518444257


##
airflow/traces/otel_tracer.py:
##
@@ -0,0 +1,333 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import random
+
+from opentelemetry import trace
+from opentelemetry.context import create_key
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
+from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
+from opentelemetry.sdk.trace import Span, Tracer as OpenTelemetryTracer, TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
+from opentelemetry.sdk.trace.id_generator import IdGenerator
+from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, TraceFlags, Tracer
+from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
+
+from airflow.configuration import conf
+from airflow.traces import (
+TRACEPARENT,
+TRACESTATE,
+)
+from airflow.traces.utils import (
+gen_dag_span_id,
+gen_span_id,
+gen_trace_id,
+parse_traceparent,
+parse_tracestate,
+)
+from airflow.utils.net import get_hostname
+
+log = logging.getLogger(__name__)
+
+_NEXT_ID = create_key("next_id")
+
+
+class OtelTrace:
+"""
+OpenTelemetry Tracing Class.
+
+Handles all tracing requirements such as getting the tracer, and starting a new span.
+When OTEL is enabled, the Trace class will be replaced by this class.
+"""
+
+def __init__(self, span_exporter: ConsoleSpanExporter | OTLPSpanExporter, tags=None):
+self.span_exporter = span_exporter
+self.span_processor = BatchSpanProcessor(self.span_exporter)
+self.tags = tags
+self.otel_service = conf.get("traces", "otel_service")
+
+def get_tracer(self, component: str) -> OpenTelemetryTracer | Tracer:
+"""Get tracer from a given component."""
+resource = Resource(attributes={HOST_NAME: get_hostname(), SERVICE_NAME: self.otel_service})
+tracer_provider = TracerProvider(resource=resource)
+# span_processor = BatchSpanProcessor(self.span_exporter)

Review Comment:
   Yes, will remove.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518443375


##
airflow/traces/otel_tracer.py:
##
@@ -0,0 +1,333 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import random
+
+from opentelemetry import trace
+from opentelemetry.context import create_key
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
+from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
+from opentelemetry.sdk.trace import Span, Tracer as OpenTelemetryTracer, TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
+from opentelemetry.sdk.trace.id_generator import IdGenerator
+from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, TraceFlags, Tracer
+from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
+
+from airflow.configuration import conf
+from airflow.traces import (
+TRACEPARENT,
+TRACESTATE,
+)
+from airflow.traces.utils import (
+gen_dag_span_id,
+gen_span_id,
+gen_trace_id,
+parse_traceparent,
+parse_tracestate,
+)
+from airflow.utils.net import get_hostname
+
+log = logging.getLogger(__name__)
+
+_NEXT_ID = create_key("next_id")
+
+
+class OtelTrace:
+"""
+OpenTelemetry Tracing Class.
+
+Handles all tracing requirements such as getting the tracer, and starting a new span.
+When OTEL is enabled, the Trace class will be replaced by this class.
+"""

Review Comment:
   I guess that's because the paragraph spans multiple lines? I'll modify it 
accordingly.






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518443166


##
airflow/config_templates/config.yml:
##
@@ -1130,6 +1130,56 @@ metrics:
       type: string
       example: ~
       default: "False"
+traces:
+  description: |
+    Distributed traces integration settings.
+  options:
+    otel_on:
+      description: |
+        Enables sending traces to OpenTelemetry.
+      version_added: 2.9.0
+      type: string
+      example: ~
+      default: "False"
+    otel_host:
+      description: |
+        Specifies the hostname or IP address of the OpenTelemetry Collector to which Airflow sends
+        traces.

Review Comment:
   That is a good topic. I considered combining them, but kept them separate 
because some users will want metrics without traces, or vice versa. Users may 
skip traces because of cost, or simply because they don't use them. In other 
cases, users may want to send metrics only, or directly, to a metrics 
monitoring system (e.g. Chronosphere), or send traces separately to a 
different tracing system (e.g. Jaeger, Zipkin, or Honeycomb). I know we assume 
that users will use the OTEL collector for that, but some may prefer to skip 
the agent and send OTEL metrics straight to an endpoint.
   
   For now, I'd like to keep them separate, and the same may hold for the OTEL 
logs as well.
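
   As a rough illustration of the separation described above, here is a 
minimal sketch that reads an independent `otel_on` switch from a `[metrics]` 
and a `[traces]` section. It uses only the standard-library `configparser` 
rather than Airflow's own `conf` object, and the host names and the 
`enabled_signals` helper are hypothetical, made up for this example.

```python
from configparser import ConfigParser

# Hypothetical settings: metrics are exported, traces are switched off.
# The host names are placeholders, not values from the PR.
SAMPLE_CFG = """
[metrics]
otel_on = True
otel_host = metrics-backend.example.com

[traces]
otel_on = False
otel_host = localhost
"""


def enabled_signals(cfg_text: str) -> dict[str, bool]:
    """Return, per signal, whether its own otel_on switch is set."""
    parser = ConfigParser()
    parser.read_string(cfg_text)
    return {
        section: parser.getboolean(section, "otel_on", fallback=False)
        for section in ("metrics", "traces")
    }


print(enabled_signals(SAMPLE_CFG))
```

   Because each section carries its own switch and host, either signal can be 
turned off or pointed at a different backend without touching the other.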






Re: [PR] [AIP-49] OpenTelemetry Traces for Apache Airflow [airflow]

2024-03-08 Thread via GitHub


howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518442156


##
airflow/traces/otel_tracer.py:
##
@@ -0,0 +1,333 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import random
+
+from opentelemetry import trace
+from opentelemetry.context import create_key
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
+from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
+from opentelemetry.sdk.trace import Span, Tracer as OpenTelemetryTracer, TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
+from opentelemetry.sdk.trace.id_generator import IdGenerator
+from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, TraceFlags, Tracer
+from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
+
+from airflow.configuration import conf
+from airflow.traces import (
+    TRACEPARENT,
+    TRACESTATE,
+)
+from airflow.traces.utils import (
+    gen_dag_span_id,
+    gen_span_id,
+    gen_trace_id,
+    parse_traceparent,
+    parse_tracestate,
+)
+from airflow.utils.net import get_hostname
+
+log = logging.getLogger(__name__)
+
+_NEXT_ID = create_key("next_id")
+
+
+class OtelTrace:
+    """
+    OpenTelemetry Tracing Class.
+
+    Handles all tracing requirements such as getting the tracer, and starting a new span.
+    When OTEL is enabled, the Trace class will be replaced by this class.
+    """
+
+    def __init__(self, span_exporter: ConsoleSpanExporter | OTLPSpanExporter, tags=None):
+        self.span_exporter = span_exporter
+        self.span_processor = BatchSpanProcessor(self.span_exporter)
+        self.tags = tags
+        self.otel_service = conf.get("traces", "otel_service")
+
+    def get_tracer(self, component: str) -> OpenTelemetryTracer | Tracer:
+        """Get tracer from a given component."""
+        resource = Resource(attributes={HOST_NAME: get_hostname(), SERVICE_NAME: self.otel_service})
+        tracer_provider = TracerProvider(resource=resource)
+        # span_processor = BatchSpanProcessor(self.span_exporter)
+        tracer_provider.add_span_processor(self.span_processor)
+        tracer = tracer_provider.get_tracer(component)
+        return tracer
+
+    def get_tracer_with_id(
+        self, component: str, trace_id: int | None = None, span_id: int | None = None
+    ) -> OpenTelemetryTracer | Tracer:
+        """Tracer that will use special AirflowOtelIdGenerator to control producing certain span and trace id."""
+        resource = Resource(attributes={HOST_NAME: get_hostname(), SERVICE_NAME: self.otel_service})
+        tracer_provider = TracerProvider(
+            resource=resource, id_generator=AirflowOtelIdGenerator(span_id=span_id, trace_id=trace_id)
+        )
+        # span_processor = BatchSpanProcessor(self.span_exporter, schedule_delay_millis=1)

Review Comment:
   Right, good catch!
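
   For readers following the `get_tracer_with_id` hunk quoted above, the idea 
of an ID generator that "controls" the trace and span IDs can be sketched in 
plain Python. This is not the PR's `AirflowOtelIdGenerator`; the class name 
`PinnedIdGenerator` and its exact behaviour (hand out the pinned span ID once, 
then fall back to random IDs) are assumptions made for illustration. The 
method names mirror OpenTelemetry's `IdGenerator` interface, with 64-bit span 
IDs and 128-bit trace IDs.

```python
from __future__ import annotations

import random


class PinnedIdGenerator:
    """Hypothetical sketch: return caller-supplied IDs, else random ones."""

    def __init__(self, span_id: int | None = None, trace_id: int | None = None):
        self.span_id = span_id
        self.trace_id = trace_id

    def generate_span_id(self) -> int:
        # Use the pinned span ID once, then go back to random 64-bit IDs.
        if self.span_id is not None:
            pinned, self.span_id = self.span_id, None
            return pinned
        span_id = random.getrandbits(64)
        while span_id == 0:  # zero is the invalid span ID
            span_id = random.getrandbits(64)
        return span_id

    def generate_trace_id(self) -> int:
        # A pinned trace ID is reused so all spans land in the same trace.
        if self.trace_id is not None:
            return self.trace_id
        trace_id = random.getrandbits(128)
        while trace_id == 0:  # zero is the invalid trace ID
            trace_id = random.getrandbits(128)
        return trace_id


gen = PinnedIdGenerator(span_id=0x1234, trace_id=0xABCD)
print(hex(gen.generate_span_id()), hex(gen.generate_trace_id()))
```

   Pinning the IDs this way lets a caller derive them deterministically (for 
example from a DAG run) so that spans emitted by different processes can be 
stitched into one trace.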





