Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-19 Thread via GitHub


boring-cyborg[bot] commented on PR #38716:
URL: https://github.com/apache/airflow/pull/38716#issuecomment-2066107379

   Awesome work, congrats on your first merged pull request! You are invited to 
check our [Issue Tracker](https://github.com/apache/airflow/issues) for 
additional contributions.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-19 Thread via GitHub


potiuk merged PR #38716:
URL: https://github.com/apache/airflow/pull/38716





Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-19 Thread via GitHub


potiuk commented on PR #38716:
URL: https://github.com/apache/airflow/pull/38716#issuecomment-2066107147

   Nothing to be sorry about :)





Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-18 Thread via GitHub


zstrathe commented on PR #38716:
URL: https://github.com/apache/airflow/pull/38716#issuecomment-2065462531

   @potiuk Static checks should pass now. I sincerely apologize for that; I'll 
make sure to run pre-commit checks for any future contributions.





Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-18 Thread via GitHub


potiuk commented on PR #38716:
URL: https://github.com/apache/airflow/pull/38716#issuecomment-2065216353

   Static checks need fixing.





Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-18 Thread via GitHub


e-galan commented on PR #38716:
URL: https://github.com/apache/airflow/pull/38716#issuecomment-2063660272

   The changes look good to me @zstrathe. What do you think, @eladkal?





Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-16 Thread via GitHub


zstrathe commented on code in PR #38716:
URL: https://github.com/apache/airflow/pull/38716#discussion_r1568197801


##
tests/providers/apache/beam/operators/test_beam.py:
##
@@ -256,6 +256,59 @@ def test_on_kill_direct_runner(self, _, dataflow_mock, __):
 op.on_kill()
 dataflow_cancel_job.assert_not_called()
 
+@mock.patch(BEAM_OPERATOR_PATH.format("BeamHook"))
+@mock.patch(BEAM_OPERATOR_PATH.format("GCSHook"))
+def test_execute_gcs_hook_called_only_with_gs_prefix(self, mock_gcs_hook, _):

Review Comment:
   @e-galan Thanks, that's now updated:
   
   
![image](https://github.com/apache/airflow/assets/59071005/abd7d7aa-5574-4dc2-bae7-12c7f9c8aedb)
   






Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-16 Thread via GitHub


e-galan commented on code in PR #38716:
URL: https://github.com/apache/airflow/pull/38716#discussion_r1566949795


##
tests/providers/apache/beam/operators/test_beam.py:
##
@@ -256,6 +256,59 @@ def test_on_kill_direct_runner(self, _, dataflow_mock, __):
 op.on_kill()
 dataflow_cancel_job.assert_not_called()
 
+@mock.patch(BEAM_OPERATOR_PATH.format("BeamHook"))
+@mock.patch(BEAM_OPERATOR_PATH.format("GCSHook"))
+def test_execute_gcs_hook_called_only_with_gs_prefix(self, mock_gcs_hook, _):

Review Comment:
   @zstrathe In my opinion it is better to break this up into 3 separate tests. 
Otherwise the code looks good.
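   One possible way to split the combined test into three focused tests is sketched below with plain `unittest.mock`. It targets a hypothetical helper, `create_gcs_hook_if_needed()`, that mirrors the prefix checks in the diff. It does not use the real operator or the `BEAM_OPERATOR_PATH` patch targets from test_beam.py, so treat it as an illustration of the test structure only.

```python
import unittest
from unittest import mock


def create_gcs_hook_if_needed(py_file, pipeline_options, hook_factory):
    """Hypothetical helper mirroring the prefix checks in execute_sync().

    Calls hook_factory() only when py_file or requirements_file is on GCS,
    and returns the hook (or None when no GCS resource is involved).
    """
    needs_gcs = py_file.lower().startswith("gs://") or pipeline_options.get(
        "requirements_file", ""
    ).startswith("gs://")
    return hook_factory() if needs_gcs else None


class TestGcsHookOnlyForGsPaths(unittest.TestCase):
    def test_hook_created_for_gcs_py_file(self):
        factory = mock.MagicMock()
        create_gcs_hook_if_needed("gs://bucket/pipeline.py", {}, factory)
        factory.assert_called_once_with()

    def test_hook_created_for_gcs_requirements_file(self):
        factory = mock.MagicMock()
        options = {"requirements_file": "gs://bucket/requirements.txt"}
        create_gcs_hook_if_needed("/files/pipeline.py", options, factory)
        factory.assert_called_once_with()

    def test_hook_not_created_for_local_files(self):
        factory = mock.MagicMock()
        options = {"requirements_file": "/files/requirements.txt"}
        result = create_gcs_hook_if_needed("/files/pipeline.py", options, factory)
        factory.assert_not_called()
        self.assertIsNone(result)
```

   With one scenario per test, a failure report points directly at the case that broke (GCS `py_file`, GCS `requirements_file`, or purely local files).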






Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-14 Thread via GitHub


zstrathe commented on code in PR #38716:
URL: https://github.com/apache/airflow/pull/38716#discussion_r1565032713


##
tests/providers/apache/beam/operators/test_beam.py:
##


Review Comment:
   @e-galan @eladkal 
   I added test_execute_gcs_hook_called_only_with_gs_prefix() to the 
TestBeamRunPythonPipelineOperator. Please let me know if this is sufficient or 
if any changes/additions are needed. Thanks!
   
   
![image](https://github.com/apache/airflow/assets/59071005/b2fba319-8fca-454c-937c-d7d35ad2421b)
   






Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-14 Thread via GitHub


zstrathe commented on code in PR #38716:
URL: https://github.com/apache/airflow/pull/38716#discussion_r1565032713


##
tests/providers/apache/beam/operators/test_beam.py:
##


Review Comment:
   @e-galan @eladkal 
   I added test_execute_gcs_hook_called_only_with_gs_prefix() to the 
TestBeamRunPythonPipelineOperator. Please let me know if this is sufficient or 
if any changes/additions are needed. Thanks!
   
   
https://github.com/zstrathe/airflow/blob/566f07360b2c2393d64b6432e14d0e828d758b95/tests/providers/apache/beam/operators/test_beam.py#L259-L310






Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-14 Thread via GitHub


zstrathe commented on code in PR #38716:
URL: https://github.com/apache/airflow/pull/38716#discussion_r1565030327


##
airflow/providers/apache/beam/operators/beam.py:
##
@@ -364,11 +364,13 @@ def execute(self, context: Context):
 
     def execute_sync(self, context: Context):
         with ExitStack() as exit_stack:
-            gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
             if self.py_file.lower().startswith("gs://"):
+                gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
                 tmp_gcs_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=self.py_file))
                 self.py_file = tmp_gcs_file.name
             if self.snake_case_pipeline_options.get("requirements_file", "").startswith("gs://"):
+                if 'gcs_hook' not in locals():

Review Comment:
   @e-galan This change is now made.






Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-09 Thread via GitHub


zstrathe commented on code in PR #38716:
URL: https://github.com/apache/airflow/pull/38716#discussion_r1555982325


##
airflow/providers/apache/beam/operators/beam.py:
##
@@ -364,11 +364,13 @@ def execute(self, context: Context):
 
     def execute_sync(self, context: Context):
         with ExitStack() as exit_stack:
-            gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
             if self.py_file.lower().startswith("gs://"):
+                gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
                 tmp_gcs_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=self.py_file))
                 self.py_file = tmp_gcs_file.name
             if self.snake_case_pipeline_options.get("requirements_file", "").startswith("gs://"):
+                if 'gcs_hook' not in locals():

Review Comment:
   @e-galan thanks for the feedback! I will go ahead and remove that check then.
   
   EDIT: removed my notes on some other potential issues; I will look into those separately.
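   For reference, here is a runnable sketch of the shape `execute_sync()` could take once the `locals()` check is dropped, with each `gs://` branch building its own hook. `FakeGCSHook` and `run_with_local_copies()` are hypothetical stand-ins so the sketch runs without Airflow. This is not the merged code.

```python
from contextlib import ExitStack, contextmanager
from types import SimpleNamespace


class FakeGCSHook:
    """Hypothetical stand-in for GCSHook that counts how often it is built."""

    instances = 0

    def __init__(self, gcp_conn_id):
        FakeGCSHook.instances += 1
        self.gcp_conn_id = gcp_conn_id

    @contextmanager
    def provide_file(self, object_url):
        # The real hook downloads to a temporary file; this only fakes a local name.
        yield SimpleNamespace(name="/tmp/" + object_url.rsplit("/", 1)[-1])


def run_with_local_copies(py_file, options, run, gcp_conn_id="google_cloud_default"):
    """Swap gs:// paths for local copies, then call run() while they still exist."""
    with ExitStack() as exit_stack:
        if py_file.lower().startswith("gs://"):
            gcs_hook = FakeGCSHook(gcp_conn_id=gcp_conn_id)
            tmp_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=py_file))
            py_file = tmp_file.name
        requirements_file = options.get("requirements_file", "")
        if requirements_file.startswith("gs://"):
            # No locals() check: building a second hook here is cheap.
            gcs_hook = FakeGCSHook(gcp_conn_id=gcp_conn_id)
            tmp_file = exit_stack.enter_context(
                gcs_hook.provide_file(object_url=requirements_file)
            )
            requirements_file = tmp_file.name
        # The pipeline must run inside the with block, before temp files are cleaned up.
        return run(py_file, requirements_file)
```

   With purely local paths, no hook is ever constructed. With both files on GCS, two short-lived hooks are built, which is the trade-off accepted by dropping the check.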






Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-08 Thread via GitHub


zstrathe commented on code in PR #38716:
URL: https://github.com/apache/airflow/pull/38716#discussion_r1555982325


##
airflow/providers/apache/beam/operators/beam.py:
##
@@ -364,11 +364,13 @@ def execute(self, context: Context):
 
     def execute_sync(self, context: Context):
         with ExitStack() as exit_stack:
-            gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
             if self.py_file.lower().startswith("gs://"):
+                gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
                 tmp_gcs_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=self.py_file))
                 self.py_file = tmp_gcs_file.name
             if self.snake_case_pipeline_options.get("requirements_file", "").startswith("gs://"):
+                if 'gcs_hook' not in locals():

Review Comment:
   @e-galan thanks for the feedback! I will go ahead and remove that check then.
   
   In addition, after looking through the code some more, it looks like any 
other GCS resources may not be instantiated if supplied as pipeline_options? 
For example, the below from 
tests/system/providers/apache/beam/example_python.py:
   ```
   start_python_pipeline_direct_runner = BeamRunPythonPipelineOperator(
       task_id="start_python_pipeline_direct_runner",
       py_file=GCS_PYTHON,
       py_options=[],
       pipeline_options={"output": GCS_OUTPUT},
       py_requirements=["apache-beam[gcp]==2.46.0"],
       py_interpreter="python3",
       py_system_site_packages=False,
   )
   ```
   Would ```pipeline_options={"output": GCS_OUTPUT}``` correctly result in the 
output file being updated in GCS? 
   
   If not, I think that somehow every value in pipeline_options should be 
recursively parsed for conversion to a GCS resource, with the complication that 
"output" files would need to use ```GCSHook.provide_file_and_upload()``` 
instead of ```GCSHook.provide_file()```. I think that could be solved by 
adding another file prefix to distinguish GCS resources that need to be 
uploaded (e.g., ```"gcs-upload://"```), and updating the docs to note that.
   
   If this all sounds correct, I'd be happy to add to this PR.
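   The recursive parsing proposed above could look roughly like the following sketch. The `gcs-upload://` prefix is only the proposal from this comment and is not an existing Airflow convention. The function name `find_gcs_options()` and the `extra_files` option are hypothetical. The sketch only classifies values; it does not call any hook.

```python
GCS_DOWNLOAD_PREFIX = "gs://"
# Hypothetical prefix from the proposal above; not an existing Airflow convention.
GCS_UPLOAD_PREFIX = "gcs-upload://"


def find_gcs_options(options, path=()):
    """Recursively yield (key_path, action, gcs_url) for GCS-looking option values.

    "download" marks gs:// inputs (would use GCSHook.provide_file());
    "upload" marks gcs-upload:// outputs (would use provide_file_and_upload()).
    """
    if isinstance(options, dict):
        for key, value in options.items():
            yield from find_gcs_options(value, path + (key,))
    elif isinstance(options, (list, tuple)):
        for index, value in enumerate(options):
            yield from find_gcs_options(value, path + (index,))
    elif isinstance(options, str):
        if options.startswith(GCS_UPLOAD_PREFIX):
            # Translate the custom scheme back to a real gs:// URL for the hook.
            yield path, "upload", GCS_DOWNLOAD_PREFIX + options[len(GCS_UPLOAD_PREFIX):]
        elif options.lower().startswith(GCS_DOWNLOAD_PREFIX):
            yield path, "download", options


pipeline_options = {
    "output": "gcs-upload://my-bucket/results/output",
    "requirements_file": "gs://my-bucket/requirements.txt",
    "extra_files": ["/local/lookup.csv", "gs://my-bucket/lookup2.csv"],
    "num_workers": 2,
}
for entry in find_gcs_options(pipeline_options):
    print(entry)
```

   Returning the key path makes it possible to write the local temp-file name back into the same slot of `pipeline_options` before the pipeline starts.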






Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-08 Thread via GitHub


e-galan commented on code in PR #38716:
URL: https://github.com/apache/airflow/pull/38716#discussion_r1555830278


##
airflow/providers/apache/beam/operators/beam.py:
##
@@ -364,11 +364,13 @@ def execute(self, context: Context):
 
     def execute_sync(self, context: Context):
         with ExitStack() as exit_stack:
-            gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
             if self.py_file.lower().startswith("gs://"):
+                gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
                 tmp_gcs_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=self.py_file))
                 self.py_file = tmp_gcs_file.name
             if self.snake_case_pipeline_options.get("requirements_file", "").startswith("gs://"):
+                if 'gcs_hook' not in locals():

Review Comment:
   @zstrathe I don't think that this check is necessary; there won't be much 
harm in re-instantiating the hook.
   
   If you'd like to avoid re-instantiating it, then you could do the following: 
add `gcs_hook` as another instance attribute (with `None` as the default value) 
and set it with a dedicated method. You could even move checks such as 
`if self.snake_case_pipeline_options.get("requirements_file", "").startswith("gs://")` 
and `if self.py_file.lower().startswith("gs://")` into that method. This will 
require additional unit tests, though.
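   Here is a minimal sketch of that suggestion: a `gcs_hook` attribute that defaults to `None` and a dedicated method that folds in both prefix checks. `BeamPythonOperatorSketch`, `_requires_gcs()`, `_set_gcs_hook()` and `FakeGCSHook` are hypothetical names chosen for illustration. They are not the provider's actual attributes.

```python
class FakeGCSHook:
    """Hypothetical stand-in for GCSHook so the sketch runs without Airflow."""

    def __init__(self, gcp_conn_id):
        self.gcp_conn_id = gcp_conn_id


class BeamPythonOperatorSketch:
    """Hypothetical operator showing a lazily created, cached gcs_hook attribute."""

    def __init__(self, py_file, pipeline_options, gcp_conn_id="google_cloud_default"):
        self.py_file = py_file
        self.pipeline_options = pipeline_options
        self.gcp_conn_id = gcp_conn_id
        self.gcs_hook = None  # stays None unless a gs:// resource is involved

    def _requires_gcs(self):
        return self.py_file.lower().startswith("gs://") or self.pipeline_options.get(
            "requirements_file", ""
        ).startswith("gs://")

    def _set_gcs_hook(self):
        """Build the hook once, and only if some input lives on GCS."""
        if self.gcs_hook is None and self._requires_gcs():
            self.gcs_hook = FakeGCSHook(gcp_conn_id=self.gcp_conn_id)
        return self.gcs_hook
```

   Repeated calls return the same hook object, and a purely local operator never builds one. The extra unit tests mentioned above would cover exactly those two properties.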






Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-08 Thread via GitHub


e-galan commented on code in PR #38716:
URL: https://github.com/apache/airflow/pull/38716#discussion_r1555810282


##
airflow/providers/apache/beam/operators/beam.py:
##
@@ -364,11 +364,13 @@ def execute(self, context: Context):
 
     def execute_sync(self, context: Context):
         with ExitStack() as exit_stack:
-            gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
             if self.py_file.lower().startswith("gs://"):
+                gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)

Review Comment:
   Hello @zstrathe! You are right, the Google connection should be optional, as 
it is needed only for particular use cases, and yours is not among them. 
   Your change on line 368 looks correct to me.
   
   I would only suggest updating the unit tests that use this operator. In 
particular, I think 
[this](https://github.com/apache/airflow/blob/main/tests/providers/apache/beam/operators/test_beam.py#L142C5-L169C10)
 and 
[this](https://github.com/apache/airflow/blob/main/tests/providers/apache/beam/operators/test_beam.py#L251C5-L257C48)
 tests will need some refactoring, but you can add new ones if you'd like.






Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-03 Thread via GitHub


zstrathe commented on code in PR #38716:
URL: https://github.com/apache/airflow/pull/38716#discussion_r1550311342


##
airflow/providers/apache/beam/operators/beam.py:
##
@@ -364,11 +364,13 @@ def execute(self, context: Context):
 
     def execute_sync(self, context: Context):
         with ExitStack() as exit_stack:
-            gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
             if self.py_file.lower().startswith("gs://"):
+                gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)

Review Comment:
   Correct. It's my understanding that I should be able to run a Beam pipeline 
via Airflow locally without any connection to Google. Thank you.






Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-03 Thread via GitHub


eladkal commented on code in PR #38716:
URL: https://github.com/apache/airflow/pull/38716#discussion_r1550301331


##
airflow/providers/apache/beam/operators/beam.py:
##
@@ -364,11 +364,13 @@ def execute(self, context: Context):
 
     def execute_sync(self, context: Context):
         with ExitStack() as exit_stack:
-            gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
             if self.py_file.lower().startswith("gs://"):
+                gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)

Review Comment:
   If I understand correctly, you are simply saying that the Google connection can be optional.
   As I am not an expert in this domain, let's ask @VladaZakharova and her team 
to review this PR.






Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-03 Thread via GitHub


zstrathe commented on code in PR #38716:
URL: https://github.com/apache/airflow/pull/38716#discussion_r1550297987


##
airflow/providers/apache/beam/operators/beam.py:
##
@@ -364,11 +364,13 @@ def execute(self, context: Context):
 
     def execute_sync(self, context: Context):
         with ExitStack() as exit_stack:
-            gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
             if self.py_file.lower().startswith("gs://"):
+                gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)

Review Comment:
   Hello, thank you for the quick response to this PR. My use case is only for 
local development (which is why I'm not using Google Cloud at all), so please 
do not consider this a high priority to review.
   
   I made the change because, when I run the Beam pipeline entirely from storage 
local to Airflow (in my case, both the Beam pipeline .py file and all other 
pipeline assets are in the same Docker container as the Airflow worker), the 
connection to Google Cloud Storage (GCSHook) is not needed at all. My 
assumption is that this is why I get the AirflowNotFoundException when the 
hook tries to establish the connection. However, I'm new to both Airflow and 
Beam, so I can't say for certain that this is the root cause of the error. If 
I'm wrong, I sincerely apologize for wasting your time!
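   The following runnable sketch shows why the position of the constructor matters. It assumes, as the `get_connection_from_secrets` frame in the #38713 traceback suggests, that the hook looks up its connection when it is constructed. Every name here is a hypothetical stand-in, not Airflow code.

```python
class ConnectionNotFound(Exception):
    """Stand-in for airflow.exceptions.AirflowNotFoundException."""


DEFINED_CONNECTIONS = set()  # a purely local setup: no google_cloud_default


class FakeGCSHook:
    """Hypothetical hook that resolves its connection as soon as it is built."""

    def __init__(self, gcp_conn_id="google_cloud_default"):
        if gcp_conn_id not in DEFINED_CONNECTIONS:
            raise ConnectionNotFound(f"The conn_id `{gcp_conn_id}` isn't defined")
        self.gcp_conn_id = gcp_conn_id


def execute_eager(py_file):
    """Old ordering: the hook is built before checking whether it is needed."""
    gcs_hook = FakeGCSHook()
    if py_file.lower().startswith("gs://"):
        return "downloaded via " + gcs_hook.gcp_conn_id
    return "ran " + py_file


def execute_lazy(py_file):
    """New ordering: the hook (and its connection lookup) only exists for gs:// paths."""
    if py_file.lower().startswith("gs://"):
        gcs_hook = FakeGCSHook()
        return "downloaded via " + gcs_hook.gcp_conn_id
    return "ran " + py_file
```

   With a local path, `execute_eager()` raises while `execute_lazy()` runs. With a `gs://` path, both still fail until the connection is defined. The change does not make connections optional where they are actually used; it only skips the lookup when nothing on GCS is involved.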






Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-03 Thread via GitHub


eladkal commented on code in PR #38716:
URL: https://github.com/apache/airflow/pull/38716#discussion_r1550265521


##
airflow/providers/apache/beam/operators/beam.py:
##
@@ -364,11 +364,13 @@ def execute(self, context: Context):
 
     def execute_sync(self, context: Context):
         with ExitStack() as exit_stack:
-            gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
             if self.py_file.lower().startswith("gs://"):
+                gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)

Review Comment:
   Can you please elaborate on this change?
   In https://github.com/apache/airflow/issues/38713 you showed a traceback saying 
that the connection isn't found:
   ```
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/connection.py", line 514, in get_connection_from_secrets
       raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
   airflow.exceptions.AirflowNotFoundException: The conn_id `google_cloud_default` isn't defined
   ```
   
   I do not understand how this fix solves the issue.
   Why does it make a difference (for finding the connection) whether the hook is 
initialized before or after the `if`?
   
   Connections should be defined before the code runs; otherwise the conn won't 
be found regardless of which part of the code invokes it.






Re: [PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-03 Thread via GitHub


boring-cyborg[bot] commented on PR #38716:
URL: https://github.com/apache/airflow/pull/38716#issuecomment-2035003228

   Congratulations on your first Pull Request and welcome to the Apache Airflow 
community! If you have any issues or are unsure about anything, please check 
our Contributors' Guide 
(https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (ruff, mypy and type 
annotations). Our 
[pre-commits](https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst#prerequisites-for-pre-commit-hooks)
 will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in 
`docs/` directory). Adding a new operator? Check this short 
[guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst)
 Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze 
environment](https://github.com/apache/airflow/blob/main/dev/breeze/doc/README.rst)
 for testing locally, it's a heavy docker but it ships with a working Airflow 
and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get 
the final approval from Committers.
   - Please follow [ASF Code of 
Conduct](https://www.apache.org/foundation/policies/conduct) for all 
communication including (but not limited to) comments on Pull Requests, Mailing 
list and Slack.
   - Be sure to read the 
[Airflow Coding style](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#coding-style-and-best-practices).
   - Always keep your Pull Requests rebased, otherwise your build might fail 
due to changes not related to your commits.
   Apache Airflow is a community-driven project and together we are making it 
better .
   In case of doubts contact the developers at:
   Mailing List: d...@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   





[PR] Bugfix to correct GCSHook being called even when not required with BeamRunPythonPipelineOperator [airflow]

2024-04-03 Thread via GitHub


zstrathe opened a new pull request, #38716:
URL: https://github.com/apache/airflow/pull/38716

   This resolves #38713 by modifying 
BeamRunPythonPipelineOperator.execute_sync() to only call GCSHook when necessary.
   
   
   
   
   
   
   
   

