Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]

2026-03-30 Thread via GitHub


damccorm merged PR #37647:
URL: https://github.com/apache/beam/pull/37647


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]

2026-03-26 Thread via GitHub


aIbrahiim commented on PR #37647:
URL: https://github.com/apache/beam/pull/37647#issuecomment-4132993806

   @damccorm resolved





Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]

2026-03-26 Thread via GitHub


gemini-code-assist[bot] commented on code in PR #37647:
URL: https://github.com/apache/beam/pull/37647#discussion_r2993326802


##
sdks/python/apache_beam/examples/inference/table_row_inference.py:
##
@@ -0,0 +1,369 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A pipeline that uses RunInference to perform inference on table rows.
+
+This pipeline demonstrates ML Pipelines #18: handling continuous new table
+rows with RunInference using table input models. It reads structured data
+(table rows) from a streaming source, performs inference while preserving
+the table schema, and writes results to a table output.
+
+The pipeline supports both streaming and batch modes:
+- Streaming: Reads from Pub/Sub, applies windowing, writes via streaming inserts
+- Batch: Reads from file, processes all data, writes via file loads
+
+Example usage for streaming:
+  python table_row_inference.py \
+--mode=streaming \
+--input_subscription=projects/PROJECT/subscriptions/SUBSCRIPTION \
+--output_table=PROJECT:DATASET.TABLE \
+--model_path=gs://BUCKET/model.pkl \
+--feature_columns=feature1,feature2,feature3 \
+--runner=DataflowRunner \
+--project=PROJECT \
+--region=REGION \
+--temp_location=gs://BUCKET/temp
+
+Example usage for batch:
+  python table_row_inference.py \
+--mode=batch \
+--input_file=gs://BUCKET/input.jsonl \
+--output_table=PROJECT:DATASET.TABLE \
+--model_path=gs://BUCKET/model.pkl \
+--feature_columns=feature1,feature2,feature3
+
+  # Batch with file output
+  python table_row_inference.py \
+--mode=batch \
+--input_file=data.jsonl \
+--output_file=predictions.jsonl \
+--model_path=model.pkl \
+--feature_columns=feature1,feature2,feature3
+"""
+
+import argparse
+import hashlib
+import json
+import logging
+from collections.abc import Iterable
+from typing import Any
+from typing import Optional
+
+import apache_beam as beam
+import numpy as np
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.sklearn_inference import ModelFileType
+from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.runners.runner import PipelineResult
+
+
+class TableRowModelHandler(SklearnModelHandlerNumpy):
+  """ModelHandler that processes table rows (beam.Row objects) for inference.
+
+  This handler extends SklearnModelHandlerNumpy to work with structured
+  table data represented as beam.Row objects. It extracts specified feature
+  columns from the row and converts them to numpy arrays for model input.
+
+  Attributes:
+feature_columns: List of column names to extract as features from input rows

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The indentation for the `Attributes` section in the docstring is incorrect. 
Each attribute entry should be indented one level deeper than the `Attributes:` 
keyword itself, as in Google-style docstrings. PEP 257 (Docstring Conventions) 
does not itself define an `Attributes` section.
   
   ```python
     Attributes:
       feature_columns: List of column names to extract as features from input rows
   ```
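
For context on what the docstring under review describes, here is a minimal sketch of pulling named feature columns out of a row in a fixed order. It is not the PR's implementation: `SampleRow` and `extract_features` are hypothetical names, a `NamedTuple` stands in for `beam.Row`, and the sketch stops at a plain list of floats instead of the NumPy array the handler is described as producing.

```python
from typing import NamedTuple, Sequence


class SampleRow(NamedTuple):
    """Hypothetical stand-in for a beam.Row with named columns."""
    user_id: str
    feature1: float
    feature2: float
    feature3: float


def extract_features(row, feature_columns: Sequence[str]) -> list[float]:
    """Return the row's feature values in the order given by feature_columns."""
    # Fail early with a clear message if a requested column is absent.
    missing = [c for c in feature_columns if not hasattr(row, c)]
    if missing:
        raise KeyError(f'row is missing feature columns: {missing}')
    return [float(getattr(row, c)) for c in feature_columns]


row = SampleRow(user_id='u1', feature1=1.0, feature2=2.5, feature3=-3.0)
# Column order follows feature_columns, not the row's declaration order.
features = extract_features(row, ['feature3', 'feature1'])
print(features)
```

Non-feature columns such as `user_id` are left on the row, which is what lets the rest of the schema be carried through to the output.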



##
sdks/python/apache_beam/examples/inference/table_row_batch_example.py:
##
@@ -0,0 +1,194 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed u

Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]

2026-03-26 Thread via GitHub


aIbrahiim commented on PR #37647:
URL: https://github.com/apache/beam/pull/37647#issuecomment-4132678268

   /gemini review
   
   





Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]

2026-03-19 Thread via GitHub


damccorm commented on PR #37647:
URL: https://github.com/apache/beam/pull/37647#issuecomment-4093087573

   Thanks - could you address the 2 gemini comments as well? Otherwise, this 
looks good





Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]

2026-03-09 Thread via GitHub


aIbrahiim commented on code in PR #37647:
URL: https://github.com/apache/beam/pull/37647#discussion_r2904182635


##
sdks/python/apache_beam/ml/inference/base_test.py:
##
@@ -1157,10 +1155,9 @@ def test_run_inference_with_iterable_side_input(self):
   FakeModelHandler(), model_metadata_pcoll=side_input))
   test_pipeline.run()
 
-self.assertTrue(
-'PCollection of size 2 with more than one element accessed as a '
-'singleton view. First two elements encountered are' in str(
-e.exception))
+msg = str(e.exception)
+self.assertIn('singleton', msg, msg='Expected singleton view error')
+self.assertIn('more than one', msg, msg='Expected multiple-elements error')

Review Comment:
   The test was failing in CI because it asserted on the exact exception 
message from pvalue.py. When that message was reworded or wrapped (for example 
by the runner or environment), the assertion failed even though the right 
exception was still raised. I relaxed the assertion to check that the message 
contains "singleton" and "more than one", so we still verify the intended 
behavior (multiple elements accessed as a singleton) without depending on the 
exact wording.
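
The relaxed assertion described above can be sketched in isolation. This is an illustration only, not the Beam test: `as_singleton` is a hypothetical stand-in that raises an error worded like the message quoted in the diff, and the `RuntimeError` wrapper simulates a runner adding its own context.

```python
import unittest


def as_singleton(elements):
    """Hypothetical stand-in for reading a side input as a singleton."""
    if not elements:
        raise ValueError('Empty PCollection accessed as a singleton view.')
    if len(elements) > 1:
        raise ValueError(
            f'PCollection of size {len(elements)} with more than one element '
            f'accessed as a singleton view. First two elements encountered '
            f'are "{elements[0]}", "{elements[1]}".')
    return elements[0]


class SingletonErrorTest(unittest.TestCase):
    def test_fragments_survive_wrapping(self):
        with self.assertRaises(RuntimeError) as e:
            try:
                as_singleton([1, 2])
            except ValueError as err:
                # Simulate a runner that re-raises with extra context.
                raise RuntimeError(f'Error in step Foo: {err}') from err
        msg = str(e.exception)
        # Check stable fragments rather than the full, exact message.
        self.assertIn('singleton', msg)
        self.assertIn('more than one', msg)
```

The trade-off is that substring checks are weaker than an exact match, so the fragments should be specific enough that an unrelated error would not satisfy them.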






Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]

2026-03-06 Thread via GitHub


tvalentyn commented on PR #37647:
URL: https://github.com/apache/beam/pull/37647#issuecomment-4014971824

   waiting on author





Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]

2026-03-06 Thread via GitHub


github-actions[bot] commented on PR #37647:
URL: https://github.com/apache/beam/pull/37647#issuecomment-4011443068

   Reminder, please take a look at this pr: @tvalentyn @Abacn 





Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]

2026-02-23 Thread via GitHub


aIbrahiim closed pull request #37647: Implemented ML Pipeline Continuous new 
table rows RunInference
URL: https://github.com/apache/beam/pull/37647





Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]

2026-02-20 Thread via GitHub


codecov[bot] commented on PR #37647:
URL: https://github.com/apache/beam/pull/37647#issuecomment-3936452840

   ## [Codecov](https://app.codecov.io/gh/apache/beam/pull/37647?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   :x: Patch coverage is `2.83019%` with `103 lines` in your changes missing coverage. Please review.
   :white_check_mark: Project coverage is 40.06%. Comparing base ([`600bd61`](https://app.codecov.io/gh/apache/beam/commit/600bd612dc2ef495fef08675398361159f7a03df?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)) to head ([`331aa64`](https://app.codecov.io/gh/apache/beam/commit/331aa6470e01a80167427e2580cdca9b857aa017?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)).
   :warning: Report is 179 commits behind head on master.
   
   | [Files with missing lines](https://app.codecov.io/gh/apache/beam/pull/37647?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
   |---|---|---|
   | [...beam/testing/load\_tests/dataflow\_cost\_benchmark.py](https://app.codecov.io/gh/apache/beam/pull/37647?src=pr&el=tree&filepath=sdks%2Fpython%2Fapache_beam%2Ftesting%2Fload_tests%2Fdataflow_cost_benchmark.py&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9sb2FkX3Rlc3RzL2RhdGFmbG93X2Nvc3RfYmVuY2htYXJrLnB5) | 0.00% | [52 Missing :warning:](https://app.codecov.io/gh/apache/beam/pull/37647?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...chmarks/inference/table\_row\_inference\_benchmark.py](https://app.codecov.io/gh/apache/beam/pull/37647?src=pr&el=tree&filepath=sdks%2Fpython%2Fapache_beam%2Ftesting%2Fbenchmarks%2Finference%2Ftable_row_inference_benchmark.py&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL2luZmVyZW5jZS90YWJsZV9yb3dfaW5mZXJlbmNlX2JlbmNobWFyay5weQ==) | 0.00% | [46 Missing :warning:](https://app.codecov.io/gh/apache/beam/pull/37647?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [sdks/python/apache\_beam/testing/test\_pipeline.py](https://app.codecov.io/gh/apache/beam/pull/37647?src=pr&el=tree&filepath=sdks%2Fpython%2Fapache_beam%2Ftesting%2Ftest_pipeline.py&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3BpcGVsaW5lLnB5) | 37.50% | [5 Missing :warning:](https://app.codecov.io/gh/apache/beam/pull/37647?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   
   Additional details and impacted files
   
   
   
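   ```diff
   @@             Coverage Diff              @@
   ##             master   #37647       +/-   ##
   =============================================
   - Coverage     57.13%   40.06%   -17.07%
     Complexity     3515     3515
   =============================================
     Files          1228     1225        -3
     Lines        189092   188725      -367
     Branches       3656     3656
   =============================================
   - Hits         108039    75615    -32424
   - Misses        77637   109694    +32057
     Partials       3416     3416
   ```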
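
As a quick sanity check on the figures in the coverage diff above, the percentages can be recomputed from the hit and line counts. This is a sketch, not Codecov's documented method: `truncated_pct` is a hypothetical helper, and the use of truncation rather than rounding is only an observation that it reproduces the reported 57.13% and 40.06%.

```python
def truncated_pct(covered: int, total: int) -> float:
    """Coverage percentage truncated (not rounded) to two decimals."""
    # Integer arithmetic avoids floating-point surprises before truncation.
    return (covered * 10_000 // total) / 100


# Figures copied from the report's coverage diff.
base = truncated_pct(108_039, 189_092)  # master
head = truncated_pct(75_615, 188_725)   # PR #37647

# Hits, misses and partials should add up to the line count on each side.
assert 108_039 + 77_637 + 3_416 == 189_092
assert 75_615 + 109_694 + 3_416 == 188_725

print(base, head, round(head - base, 2))
```

The counts are internally consistent, so the drop from 57.13% to 40.06% reflects roughly 32,000 fewer lines being hit on the PR head and not a change in the size of the code base.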
   
   | [Flag](https://app.codecov.io/gh/apache/beam/pull/37647/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [python](https://app.codecov.io/gh/apache/beam/pull/37647/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `39.68% <2.83%> (-41.12%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/beam/pull/37647?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   
   :loudspeaker: Have feedback on the report? [Share it 
here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaig

Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]

2026-02-19 Thread via GitHub


github-actions[bot] commented on PR #37647:
URL: https://github.com/apache/beam/pull/37647#issuecomment-3929697982

   Assigning reviewers:
   
   R: @tvalentyn for label python.
   R: @Abacn for label build.
   R: @Abacn for label website.
   
   
   Note: If you would like to opt out of this review, comment `assign to next 
reviewer`.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any 
comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review 
comments).





Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]

2026-02-19 Thread via GitHub


github-actions[bot] commented on PR #37647:
URL: https://github.com/apache/beam/pull/37647#issuecomment-3928126720

   Checks are failing. Will not request review until checks are succeeding. If 
you'd like to override that behavior, comment `assign set of reviewers`





Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]

2026-02-19 Thread via GitHub


gemini-code-assist[bot] commented on PR #37647:
URL: https://github.com/apache/beam/pull/37647#issuecomment-3927562010

   ## Summary of Changes
   
   Hello @aIbrahiim, I'm Gemini Code Assist[^1]! I'm currently reviewing this 
pull request and will post my feedback shortly. In the meantime, here's a 
summary to help you and other reviewers quickly get up to speed!
   
   This pull request adds an example pipeline for continuous machine learning 
inference on structured table data. The pipeline is built around the 
`RunInference` transform and handles both batch and streaming inputs: it reads 
from files or Pub/Sub, applies scikit-learn models, and writes results to 
BigQuery while preserving the data schema. The changes also include supporting 
infrastructure (data generation utilities, unit tests, and integration into the 
performance benchmarking system), so the example is tested, documented, and 
benchmarked.
   
   ### Highlights
   
   * **New ML Pipeline for Table Row Inference**: Introduced a new 
`RunInference` pipeline example (`table_row_inference.py`) designed to process 
continuous structured table rows, supporting both batch and streaming modes 
with BigQuery output.
   * **Batch-Only Inference Example**: Added a simplified batch-only version of 
the table row inference pipeline (`table_row_inference_batch.py`) for clearer 
demonstration and use cases.
   * **Comprehensive Utilities and Testing**: Included utility functions 
(`table_row_inference_utils.py`) for generating sample models and data, 
managing Pub/Sub resources, and provided dedicated unit tests 
(`table_row_inference_test.py`) for the new pipeline components.
   * **Performance Benchmarking Integration**: Integrated the new table row 
inference pipelines into the performance benchmarking framework, adding new 
benchmark definitions and updating existing cost benchmark logic to support 
streaming throughput metrics.
   * **Documentation and Metrics Updates**: Updated documentation and Looker 
metrics configurations to reflect the new table row inference pipelines, 
including dedicated performance pages for both batch and streaming variants.
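
To make the "structured table rows" input more concrete, here is a minimal sketch of turning one JSONL record into a keyed row, the kind of step that could precede a keyed model handler. It is an assumption for illustration, not code from the PR: `parse_and_key`, the SHA-256 key derivation, and the 16-character key length are all hypothetical choices.

```python
import hashlib
import json
from typing import Any


def parse_and_key(line: str) -> tuple[str, dict[str, Any]]:
    """Parse one JSONL record and pair it with a deterministic key."""
    row = json.loads(line)
    if not isinstance(row, dict):
        raise ValueError('expected one JSON object per line')
    # Canonical serialization so equal rows always hash to the same key.
    canonical = json.dumps(row, sort_keys=True, separators=(',', ':'))
    key = hashlib.sha256(canonical.encode('utf-8')).hexdigest()[:16]
    return key, row


key_a, row_a = parse_and_key('{"feature1": 1.0, "feature2": 2.0}')
key_b, row_b = parse_and_key('{"feature2": 2.0, "feature1": 1.0}')
# Field order in the input does not affect the derived key.
print(key_a == key_b, row_a == row_b)
```

A content-derived key keeps each prediction attached to its row without requiring the input to carry an explicit identifier. Identical rows share a key, which may or may not be desirable for a given table.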
   
   
   
   Changelog
   
   * **.test-infra/tools/refresh_looker_metrics.py**
       * Updated Looker metric IDs to include new Table Row Inference Sklearn
         Batch and Streaming pipelines.
   * **sdks/python/apache_beam/examples/inference/README.md**
       * Documented the new `table_row_inference.py` example, detailing
         prerequisites, model/data setup, and execution instructions for batch
         and streaming modes.
   * **sdks/python/apache_beam/examples/inference/table_row_inference.py**
       * Added a new `RunInference` pipeline for continuous table row
         processing, supporting batch (file input) and streaming (Pub/Sub)
         modes with BigQuery output.
   * **sdks/python/apache_beam/examples/inference/table_row_inference_batch.py**
       * Added a simplified batch-only inference pipeline for table rows,
         offering file or BigQuery output.
   * **sdks/python/apache_beam/examples/inference/table_row_inference_test.py**
       * Created unit tests for the `table_row_inference.py` pipeline
         components, including JSON parsing, schema building, model handling,
         and pipeline integration.
   * **sdks/python/apache_beam/examples/inference/table_row_inference_utils.py**
       * Introduced utility functions to create sample scikit-learn models,
         generate sample JSONL data, and manage Pub/Sub resources for testing
         and deployment.
   * **sdks/python/apache_beam/ml/inference/table_row_inference_requirements.txt**
       * Added a new requirements file specifying dependencies for table row
         inference, including `scikit-learn`, `numpy`,
         `google-cloud-monitoring`, `protobuf`, and `requests`.
   * **sdks/python/apache_beam/testing/benchmarks/inference/README.md**
       * Updated the README to include documentation for the new Table Row
         Inference (Sklearn) benchmark, detailing GCS artifacts and pipeline
         options.
   * **sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py**
       * Implemented a new benchmark test for the table row inference
         pipeline, extending `DataflowCostBenchmark` to measure performance
         metrics for both batch and streaming modes.
   * **sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py**
       * Modified `DataflowCostBenchmark` to