Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]
damccorm merged PR #37647: URL: https://github.com/apache/beam/pull/37647 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]
aIbrahiim commented on PR #37647: URL: https://github.com/apache/beam/pull/37647#issuecomment-4132993806 @damccorm resolved
Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]
gemini-code-assist[bot] commented on code in PR #37647:
URL: https://github.com/apache/beam/pull/37647#discussion_r2993326802
##
sdks/python/apache_beam/examples/inference/table_row_inference.py:
##
@@ -0,0 +1,369 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A pipeline that uses RunInference to perform inference on table rows.
+
+This pipeline demonstrates ML Pipelines #18: handling continuous new table
+rows with RunInference using table input models. It reads structured data
+(table rows) from a streaming source, performs inference while preserving
+the table schema, and writes results to a table output.
+
+The pipeline supports both streaming and batch modes:
+- Streaming: Reads from Pub/Sub, applies windowing, writes via streaming inserts
+- Batch: Reads from file, processes all data, writes via file loads
+
+Example usage for streaming:
+ python table_row_inference.py \
+--mode=streaming \
+--input_subscription=projects/PROJECT/subscriptions/SUBSCRIPTION \
+--output_table=PROJECT:DATASET.TABLE \
+--model_path=gs://BUCKET/model.pkl \
+--feature_columns=feature1,feature2,feature3 \
+--runner=DataflowRunner \
+--project=PROJECT \
+--region=REGION \
+--temp_location=gs://BUCKET/temp
+
+Example usage for batch:
+ python table_row_inference.py \
+--mode=batch \
+--input_file=gs://BUCKET/input.jsonl \
+--output_table=PROJECT:DATASET.TABLE \
+--model_path=gs://BUCKET/model.pkl \
+--feature_columns=feature1,feature2,feature3
+
+ # Batch with file output
+ python table_row_inference.py \
+--mode=batch \
+--input_file=data.jsonl \
+--output_file=predictions.jsonl \
+--model_path=model.pkl \
+--feature_columns=feature1,feature2,feature3
+"""
+
+import argparse
+import hashlib
+import json
+import logging
+from collections.abc import Iterable
+from typing import Any
+from typing import Optional
+
+import apache_beam as beam
+import numpy as np
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.sklearn_inference import ModelFileType
+from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.runners.runner import PipelineResult
+
+
+class TableRowModelHandler(SklearnModelHandlerNumpy):
+ """ModelHandler that processes table rows (beam.Row objects) for inference.
+
+ This handler extends SklearnModelHandlerNumpy to work with structured
+ table data represented as beam.Row objects. It extracts specified feature
+ columns from the row and converts them to numpy arrays for model input.
+
+ Attributes:
+feature_columns: List of column names to extract as features from input rows
Review Comment:
The indentation for the `Attributes` section in the docstring is incorrect.
According to PEP 257 (Docstring Conventions), the descriptions for attributes
should be indented to align with the start of their respective descriptions,
typically one level deeper than the `Attributes` keyword itself.
```python
Attributes:
  feature_columns: List of column names to extract as features from input rows
```
##
sdks/python/apache_beam/examples/inference/table_row_batch_example.py:
##
@@ -0,0 +1,194 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed u
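To make the docstring comment on `table_row_inference.py` concrete, here is a minimal sketch of an `Attributes:` section with its entry indented one level deeper than the keyword. `FeatureColumnExtractor` is a hypothetical stand-in, not the PR's `TableRowModelHandler`, and it works on plain dicts rather than `beam.Row` objects so that it runs without Beam installed. The layout assumes the Google-style section convention; PEP 257 itself does not appear to define an `Attributes:` section.

```python
import inspect


class FeatureColumnExtractor:
  """Extracts an ordered feature vector from a dict-like table row.

  Hypothetical helper for illustration only; it is not part of the PR.

  Attributes:
    feature_columns: List of column names to extract as features from
      input rows.
  """
  def __init__(self, feature_columns):
    self.feature_columns = list(feature_columns)

  def extract(self, row):
    """Returns the values of `row` in `feature_columns` order as floats."""
    return [float(row[name]) for name in self.feature_columns]


if __name__ == '__main__':
  extractor = FeatureColumnExtractor(['feature1', 'feature2'])
  # Columns that are not listed (here 'id') are ignored.
  print(extractor.extract({'id': 'row-1', 'feature1': 1, 'feature2': 2.5}))
  # inspect.getdoc strips the common indentation, keeping the nesting.
  print(inspect.getdoc(FeatureColumnExtractor))
```

With this layout, tools that dedent docstrings still show `feature_columns` nested under `Attributes:`.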
Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]
aIbrahiim commented on PR #37647: URL: https://github.com/apache/beam/pull/37647#issuecomment-4132678268 /gemini review
Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]
damccorm commented on PR #37647: URL: https://github.com/apache/beam/pull/37647#issuecomment-4093087573 Thanks - could you address the 2 gemini comments as well? Otherwise, this looks good
Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]
aIbrahiim commented on code in PR #37647:
URL: https://github.com/apache/beam/pull/37647#discussion_r2904182635
##
sdks/python/apache_beam/ml/inference/base_test.py:
##
@@ -1157,10 +1155,9 @@ def test_run_inference_with_iterable_side_input(self):
FakeModelHandler(), model_metadata_pcoll=side_input))
test_pipeline.run()
-self.assertTrue(
-'PCollection of size 2 with more than one element accessed as a '
-'singleton view. First two elements encountered are' in str(
-e.exception))
+msg = str(e.exception)
+self.assertIn('singleton', msg, msg='Expected singleton view error')
+self.assertIn('more than one', msg, msg='Expected multiple-elements error')
Review Comment:
The test was failing in CI because it asserted on the exact exception
message from pvalue.py. When the message was reworded or wrapped (for example
by the runner or environment), the assertion failed even though the right
exception was still raised. I relaxed the assertion to check that the message
contains "singleton" and "more than one", so we still verify the intended
behavior (multiple elements accessed as a singleton) without depending on the
exact wording.
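
The relaxed check described above can be sketched in isolation as follows. This is only an illustration: `SingletonViewError` and `access_as_singleton` are hypothetical stand-ins for the real failure in pvalue.py, and the wrapped message in the second test is an assumed example of how a runner might decorate the text.

```python
import unittest


class SingletonViewError(ValueError):
  """Hypothetical stand-in for the error raised on a bad singleton view."""


def access_as_singleton(elements):
  """Returns the first element, or raises if more than one is present."""
  if len(elements) > 1:
    raise SingletonViewError(
        'PCollection of size %d with more than one element accessed as a '
        'singleton view.' % len(elements))
  return elements[0]


class RelaxedAssertionTest(unittest.TestCase):
  def test_multiple_elements_rejected(self):
    with self.assertRaises(SingletonViewError) as e:
      access_as_singleton([1, 2])
    msg = str(e.exception)
    # Check the key phrases only, not the full sentence.
    self.assertIn('singleton', msg)
    self.assertIn('more than one', msg)

  def test_wrapped_message_still_matches(self):
    with self.assertRaises(SingletonViewError) as e:
      access_as_singleton([1, 2])
    # Assumed example of a runner adding context around the message.
    wrapped = RuntimeError("%s [while running 'Step']" % e.exception)
    msg = str(wrapped)
    self.assertIn('singleton', msg)
    self.assertIn('more than one', msg)


def run_tests():
  """Runs the test case and returns the unittest result object."""
  suite = unittest.defaultTestLoader.loadTestsFromTestCase(
      RelaxedAssertionTest)
  return unittest.TextTestRunner(verbosity=0).run(suite)


if __name__ == '__main__':
  run_tests()
```

The trade-off is that a substring check can pass on an unrelated error that happens to contain both phrases, so it works best together with `assertRaises` on a specific exception type.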
Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]
tvalentyn commented on PR #37647: URL: https://github.com/apache/beam/pull/37647#issuecomment-4014971824 waiting on author
Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]
github-actions[bot] commented on PR #37647: URL: https://github.com/apache/beam/pull/37647#issuecomment-4011443068 Reminder, please take a look at this pr: @tvalentyn @Abacn
Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]
aIbrahiim closed pull request #37647: Implemented ML Pipeline Continuous new table rows RunInference URL: https://github.com/apache/beam/pull/37647
Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]
codecov[bot] commented on PR #37647:
URL: https://github.com/apache/beam/pull/37647#issuecomment-3936452840

## [Codecov](https://app.codecov.io/gh/apache/beam/pull/37647?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report

:x: Patch coverage is `2.83019%` with `103 lines` in your changes missing coverage. Please review.
:white_check_mark: Project coverage is 40.06%. Comparing base ([`600bd61`](https://app.codecov.io/gh/apache/beam/commit/600bd612dc2ef495fef08675398361159f7a03df?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)) to head ([`331aa64`](https://app.codecov.io/gh/apache/beam/commit/331aa6470e01a80167427e2580cdca9b857aa017?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)).
:warning: Report is 179 commits behind head on master.

| [Files with missing lines](https://app.codecov.io/gh/apache/beam/pull/37647?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
|---|---|---|
| [...beam/testing/load\_tests/dataflow\_cost\_benchmark.py](https://app.codecov.io/gh/apache/beam/pull/37647?src=pr&el=tree&filepath=sdks%2Fpython%2Fapache_beam%2Ftesting%2Fload_tests%2Fdataflow_cost_benchmark.py&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9sb2FkX3Rlc3RzL2RhdGFmbG93X2Nvc3RfYmVuY2htYXJrLnB5) | 0.00% | [52 Missing :warning: ](https://app.codecov.io/gh/apache/beam/pull/37647?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [...chmarks/inference/table\_row\_inference\_benchmark.py](https://app.codecov.io/gh/apache/beam/pull/37647?src=pr&el=tree&filepath=sdks%2Fpython%2Fapache_beam%2Ftesting%2Fbenchmarks%2Finference%2Ftable_row_inference_benchmark.py&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL2luZmVyZW5jZS90YWJsZV9yb3dfaW5mZXJlbmNlX2JlbmNobWFyay5weQ==) | 0.00% | [46 Missing :warning: ](https://app.codecov.io/gh/apache/beam/pull/37647?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [sdks/python/apache\_beam/testing/test\_pipeline.py](https://app.codecov.io/gh/apache/beam/pull/37647?src=pr&el=tree&filepath=sdks%2Fpython%2Fapache_beam%2Ftesting%2Ftest_pipeline.py&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3BpcGVsaW5lLnB5) | 37.50% | [5 Missing :warning: ](https://app.codecov.io/gh/apache/beam/pull/37647?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |

Additional details and impacted files

```diff
@@             Coverage Diff              @@
##             master   #37647       +/-   ##
=============================================
- Coverage     57.13%   40.06%   -17.07%
  Complexity     3515     3515
=============================================
  Files          1228     1225        -3
  Lines        189092   188725      -367
  Branches       3656     3656
=============================================
- Hits         108039    75615    -32424
- Misses        77637   109694    +32057
  Partials       3416     3416
```

| [Flag](https://app.codecov.io/gh/apache/beam/pull/37647/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [python](https://app.codecov.io/gh/apache/beam/pull/37647/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `39.68% <2.83%> (-41.12%)` | :arrow_down: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.

[:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/beam/pull/37647?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
:loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaig
Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]
github-actions[bot] commented on PR #37647:
URL: https://github.com/apache/beam/pull/37647#issuecomment-3929697982

Assigning reviewers:

R: @tvalentyn for label python.
R: @Abacn for label build.
R: @Abacn for label website.

Note: If you would like to opt out of this review, comment `assign to next reviewer`.

Available commands:
- `stop reviewer notifications` - opt out of the automated review tooling
- `remind me after tests pass` - tag the comment author after tests pass
- `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).
Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]
github-actions[bot] commented on PR #37647: URL: https://github.com/apache/beam/pull/37647#issuecomment-3928126720 Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`
Re: [PR] Implemented ML Pipeline Continuous new table rows RunInference [beam]
gemini-code-assist[bot] commented on PR #37647:
URL: https://github.com/apache/beam/pull/37647#issuecomment-3927562010

## Summary of Changes

Hello @aIbrahiim, I'm Gemini Code Assist[^1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances Apache Beam's ML capabilities by introducing a robust example pipeline for performing continuous machine learning inference on structured table data. The new pipeline, built around the `RunInference` transform, is designed to handle both batch and streaming inputs, allowing for flexible data processing from sources like files or Pub/Sub, applying scikit-learn models, and outputting results to BigQuery while maintaining data schema. The changes also include comprehensive support infrastructure, such as data generation utilities, unit tests, and integration into the performance benchmarking system, ensuring the example is well-tested, documented, and its performance characteristics are measurable.

### Highlights

* **New ML Pipeline for Table Row Inference**: Introduced a new `RunInference` pipeline example (`table_row_inference.py`) designed to process continuous structured table rows, supporting both batch and streaming modes with BigQuery output.
* **Batch-Only Inference Example**: Added a simplified batch-only version of the table row inference pipeline (`table_row_inference_batch.py`) for clearer demonstration and use cases.
* **Comprehensive Utilities and Testing**: Included utility functions (`table_row_inference_utils.py`) for generating sample models and data, managing Pub/Sub resources, and provided dedicated unit tests (`table_row_inference_test.py`) for the new pipeline components.
* **Performance Benchmarking Integration**: Integrated the new table row inference pipelines into the performance benchmarking framework, adding new benchmark definitions and updating existing cost benchmark logic to support streaming throughput metrics.
* **Documentation and Metrics Updates**: Updated documentation and Looker metrics configurations to reflect the new table row inference pipelines, including dedicated performance pages for both batch and streaming variants.

🧠 **New Feature in Public Preview:** You can now enable **Memory** to help **Gemini Code Assist** learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. **Click [here](https://codeassist.google/code-review/login) to enable Memory in your admin console.**

Changelog

* **.test-infra/tools/refresh_looker_metrics.py**
  * Updated Looker metric IDs to include new Table Row Inference Sklearn Batch and Streaming pipelines.
* **sdks/python/apache_beam/examples/inference/README.md**
  * Documented the new `table_row_inference.py` example, detailing prerequisites, model/data setup, and execution instructions for batch and streaming modes.
* **sdks/python/apache_beam/examples/inference/table_row_inference.py**
  * Added a new `RunInference` pipeline for continuous table row processing, supporting batch (file input) and streaming (Pub/Sub) modes with BigQuery output.
* **sdks/python/apache_beam/examples/inference/table_row_inference_batch.py**
  * Added a simplified batch-only inference pipeline for table rows, offering file or BigQuery output.
* **sdks/python/apache_beam/examples/inference/table_row_inference_test.py**
  * Created unit tests for the `table_row_inference.py` pipeline components, including JSON parsing, schema building, model handling, and pipeline integration.
* **sdks/python/apache_beam/examples/inference/table_row_inference_utils.py**
  * Introduced utility functions to create sample scikit-learn models, generate sample JSONL data, and manage Pub/Sub resources for testing and deployment.
* **sdks/python/apache_beam/ml/inference/table_row_inference_requirements.txt**
  * Added a new requirements file specifying dependencies for table row inference, including `scikit-learn`, `numpy`, `google-cloud-monitoring`, `protobuf`, and `requests`.
* **sdks/python/apache_beam/testing/benchmarks/inference/README.md**
  * Updated the README to include documentation for the new Table Row Inference (Sklearn) benchmark, detailing GCS artifacts and pipeline options.
* **sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py**
  * Implemented a new benchmark test for the table row inference pipeline, extending `DataflowCostBenchmark` to measure performance metrics for both batch and streaming modes.
* **sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py**
  * Modified `DataflowCostBenchmark` to
