[GitHub] [beam] George-Wu commented on a change in pull request #12331: [BEAM-10601] DICOM API Beam IO connector
George-Wu commented on a change in pull request #12331: URL: https://github.com/apache/beam/pull/12331#discussion_r463411782 ## File path: sdks/python/setup.py ## @@ -128,6 +128,7 @@ def get_version(): cythonize = lambda *args, **kwargs: [] REQUIRED_PACKAGES = [ +'google-auth>=1.18.0,<=1.20.0', Review comment: For some reason if I put it under GCP_REQUIREMENTS, there will still be 'module not found' errors caused by google.auth during the unittest. https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/14212/testReport/junit/(root)/(empty)/apache_beam_io_gcp_dicomio_test_7/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] George-Wu commented on a change in pull request #12331: [BEAM-10601] DICOM API Beam IO connector
George-Wu commented on a change in pull request #12331: URL: https://github.com/apache/beam/pull/12331#discussion_r463411782 ## File path: sdks/python/setup.py ## @@ -128,6 +128,7 @@ def get_version(): cythonize = lambda *args, **kwargs: [] REQUIRED_PACKAGES = [ +'google-auth>=1.18.0,<=1.20.0', Review comment: For some reason if I put it under GCP_REQUIREMENTS, there will be 'ModuleNotFoundError' caused by google.auth during the unittest. https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/14212/testReport/junit/(root)/(empty)/apache_beam_io_gcp_dicomio_test_7/
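[Editorial note on the thread above: the question is where the `google-auth` pin belongs in setup.py. The way Beam normally keeps GCP-only dependencies out of REQUIRED_PACKAGES is a guarded import in the IO module itself, so the module imports cleanly without the extra and only fails when the GCP feature is used. A minimal sketch of that pattern; the module layout and function names here are illustrative, not the actual dicomio code:]

```python
# Sketch of the optional-dependency guard commonly used for GCP extras:
# the module still imports when google-auth is absent, and only raises
# when the feature that needs it is actually exercised.
try:
    from google.auth import credentials as _ga_credentials  # part of the [gcp] extra
    HAS_GOOGLE_AUTH = True
except ImportError:
    _ga_credentials = None
    HAS_GOOGLE_AUTH = False


def make_session(credential=None):
    """Return a credential object for the HTTP client.

    Raises ImportError with an actionable message if google-auth is not
    installed, instead of failing at module import time.
    """
    if not HAS_GOOGLE_AUTH:
        raise ImportError(
            'google-auth is required for this feature; '
            'install apache-beam[gcp].')
    # Anonymous credentials stand in for the "default" here; a real
    # connector would fall back to google.auth.default().
    return credential or _ga_credentials.AnonymousCredentials()
```

With this shape, the pin can live under GCP_REQUIREMENTS and the unit test either skips or asserts the ImportError when the extra is missing.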
[GitHub] [beam] lostluck commented on pull request #12429: Clean ExtractOutput of mean transform
lostluck commented on pull request #12429: URL: https://github.com/apache/beam/pull/12429#issuecomment-666901638 Going to close it, largely because even though in terms of Beam semantics it's unlikely to trigger, leaving a divide by zero unguarded seems fundamentally wrong. Something could change that enables that clause to be triggered, and I'd rather not have to deal with an emergency cherry-pick over it.
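[Editorial note: the reasoning above, which is about the Go SDK's mean combiner, is easy to illustrate. A hedged Python sketch of a guarded ExtractOutput for a (sum, count) accumulator, not the PR's actual Go code:]

```python
def mean_extract_output(accum):
    """Extract the mean from a (sum, count) accumulator.

    The count == 0 case is guarded explicitly rather than relying on
    Beam semantics never producing an empty accumulator, mirroring the
    rationale in the comment above.
    """
    total, count = accum
    if count == 0:
        # An empty accumulator has no meaningful mean; return 0.0
        # rather than dividing by zero.
        return 0.0
    return total / count
```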
[GitHub] [beam] lostluck closed pull request #12429: Clean ExtractOutput of mean transform
lostluck closed pull request #12429: URL: https://github.com/apache/beam/pull/12429
[GitHub] [beam] kennknowles merged pull request #12387: Upgrade spotbugs from 3.1.12 to 4.0.6, spotbugs gradle plugin from 2.0.0 to 3.0.0
kennknowles merged pull request #12387: URL: https://github.com/apache/beam/pull/12387
[GitHub] [beam] lostluck merged pull request #12425: [BEAM-10610][BEAM-9982] Support Loopback mode with universal runners in the Go SDK.
lostluck merged pull request #12425: URL: https://github.com/apache/beam/pull/12425
[GitHub] [beam] George-Wu commented on a change in pull request #12331: [BEAM-10601] DICOM API Beam IO connector
George-Wu commented on a change in pull request #12331: URL: https://github.com/apache/beam/pull/12331#discussion_r463383155 ## File path: sdks/python/apache_beam/io/gcp/dicomio.py ## @@ -164,70 +167,109 @@ class DicomSearch(PTransform): } """ - def __init__(self, credential=None): + def __init__(self, buffer_size=8, max_workers=5, credential=None): """Initializes DicomSearch. Args: credential: # type: Google credential object, if it is specified, the Http client will use it to create sessions instead of the default. """ +self.buffer_size = buffer_size +self.max_workers = max_workers Review comment: Added, but due to some object reference issues in the unit test it has to be added in an ugly way in the WriteToDicomStore connector...
[GitHub] [beam] George-Wu commented on a change in pull request #12331: [BEAM-10601] DICOM API Beam IO connector
George-Wu commented on a change in pull request #12331: URL: https://github.com/apache/beam/pull/12331#discussion_r463382756 ## File path: sdks/python/apache_beam/io/gcp/dicomio.py ## @@ -372,52 +420,63 @@ def __init__(self, destination_dict, input_type, credential=None): credential: # type: Google credential object, if it is specified, the Http client will use it instead of the default one. """ -self.credential = credential self.destination_dict = destination_dict # input_type pre-check if input_type not in ['bytes', 'fileio']: raise ValueError("input_type could only be 'bytes' or 'fileio'") self.input_type = input_type +self.buffer_size = buffer_size +self.max_workers = max_workers +self.credential = credential def expand(self, pcoll): return pcoll | beam.ParDo( -_StoreInstance(self.destination_dict, self.input_type, self.credential)) +_StoreInstance( +self.destination_dict, +self.input_type, +self.buffer_size, +self.max_workers, +self.credential)) class _StoreInstance(beam.DoFn): """A DoFn read or fetch dicom files then push it to a dicom store.""" - def __init__(self, destination_dict, input_type, credential=None): -self.credential = credential + def __init__( + self, + destination_dict, + input_type, + buffer_size, + max_workers, + credential=None): Review comment: Add custom client support ## File path: sdks/python/apache_beam/io/gcp/dicomio.py ## @@ -164,70 +167,109 @@ class DicomSearch(PTransform): } """ - def __init__(self, credential=None): + def __init__(self, buffer_size=8, max_workers=5, credential=None): """Initializes DicomSearch. Args: credential: # type: Google credential object, if it is specified, the Http client will use it to create sessions instead of the default. 
""" +self.buffer_size = buffer_size +self.max_workers = max_workers self.credential = credential def expand(self, pcoll): -return pcoll | beam.ParDo(_QidoSource(self.credential)) +return pcoll | beam.ParDo( +_QidoSource(self.buffer_size, self.max_workers, self.credential)) class _QidoSource(beam.DoFn): """A DoFn for executing every qido query request.""" - def __init__(self, credential=None): + def __init__(self, buffer_size, max_workers, credential=None): +self.buffer_size = buffer_size +self.max_workers = max_workers self.credential = credential - def process(self, element): + def start_bundle(self): +self.buffer = [] + + def finish_bundle(self): +return self._flush() + + def validate_element(self, element): # Check if all required keys present. required_keys = [ 'project_id', 'region', 'dataset_id', 'dicom_store_id', 'search_type' ] -error_message = None - for key in required_keys: if key not in element: error_message = 'Must have %s in the dict.' % (key) -break - -if not error_message: - project_id = element['project_id'] - region = element['region'] - dataset_id = element['dataset_id'] - dicom_store_id = element['dicom_store_id'] - search_type = element['search_type'] - params = element['params'] if 'params' in element else None - - # Call qido search http client - if element['search_type'] in ['instances', "studies", "series"]: -result, status_code = DicomApiHttpClient().qido_search( - project_id, region, dataset_id, dicom_store_id, - search_type, params, self.credential -) - else: -error_message = ( -'Search type can only be "studies", ' -'"instances" or "series"') - - if not error_message: -out = {} -out['result'] = result -out['status'] = status_code -out['input'] = element -out['success'] = (status_code == 200) -return [out] - -# Return this when the input dict dose not meet the requirements +return False, error_message + +# Check if return type is correct. 
+if element['search_type'] in ['instances', "studies", "series"]: + return True, None +else: + error_message = ( + 'Search type can only be "studies", ' + '"instances" or "series"') + return False, error_message + + def process(self, element, window=beam.DoFn.WindowParam): +# Check if the element is valid +valid, error_message = self.validate_element(element) + +if valid: + self.buffer.append((element, window)) + if len(self.buffer) >= self.buffer_size: +self._flush() Review comment: Fixed and test added ## File path: sdks/python/apache_beam/io/gcp/dicomio.py ## @@ -164,70 +167,109 @@ class DicomSearch(PTransform): } """ - def __init__(self, credential=None): + def
[GitHub] [beam] George-Wu commented on a change in pull request #12331: [BEAM-10601] DICOM API Beam IO connector
George-Wu commented on a change in pull request #12331: URL: https://github.com/apache/beam/pull/12331#discussion_r463382633 ## File path: sdks/python/apache_beam/io/gcp/dicomio.py ## @@ -372,52 +420,63 @@ def __init__(self, destination_dict, input_type, credential=None): credential: # type: Google credential object, if it is specified, the Http client will use it instead of the default one. """ -self.credential = credential self.destination_dict = destination_dict # input_type pre-check if input_type not in ['bytes', 'fileio']: raise ValueError("input_type could only be 'bytes' or 'fileio'") self.input_type = input_type +self.buffer_size = buffer_size +self.max_workers = max_workers +self.credential = credential def expand(self, pcoll): return pcoll | beam.ParDo( -_StoreInstance(self.destination_dict, self.input_type, self.credential)) +_StoreInstance( +self.destination_dict, +self.input_type, +self.buffer_size, +self.max_workers, +self.credential)) class _StoreInstance(beam.DoFn): """A DoFn read or fetch dicom files then push it to a dicom store.""" - def __init__(self, destination_dict, input_type, credential=None): -self.credential = credential + def __init__( + self, + destination_dict, + input_type, + buffer_size, + max_workers, + credential=None): # pre-check destination dict required_keys = ['project_id', 'region', 'dataset_id', 'dicom_store_id'] for key in required_keys: if key not in destination_dict: raise ValueError('Must have %s in the dict.' % (key)) self.destination_dict = destination_dict self.input_type = input_type +self.buffer_size = buffer_size +self.max_workers = max_workers +self.credential = credential - def process(self, element): + def start_bundle(self): +self.buffer = [] + + def finish_bundle(self): +return self._flush() + + def process(self, element, window=beam.DoFn.WindowParam): +self.buffer.append((element, window)) +if len(self.buffer) >= self.buffer_size: + self._flush() Review comment: Good catch, fixed and tests added! 
[GitHub] [beam] George-Wu commented on a change in pull request #12331: [BEAM-10601] DICOM API Beam IO connector
George-Wu commented on a change in pull request #12331: URL: https://github.com/apache/beam/pull/12331#discussion_r463382681 ## File path: sdks/python/apache_beam/io/gcp/dicomio.py ## @@ -426,6 +485,44 @@ def process(self, element): out = {} out['status'] = status_code -out['input'] = None if status_code == 200 else element +out['input'] = None if status_code == 200 else dicom_file out['success'] = (status_code == 200) -return [out] +return out + + def read_dicom_file(self, buffer_element): +# Read the file based on different input. If the read fails ,return +# an error dict which records input and error messages. +try: + if self.input_type == 'fileio': +f = buffer_element.open() +return True, f.read() + else: +return True, buffer_element +except Exception as error_message: + error_out = {} + error_out['status'] = error_message + error_out['input'] = buffer_element + error_out['success'] = False + return False, error_out + + def process_buffer_element(self, buffer_element): +# Thread job runner - each thread stores a DICOM file +success, read_result = self.read_dicom_file(buffer_element[0]) +windows = [buffer_element[1]] +value = None +if success: + value = self.make_request(read_result) +else: + value = read_result +return beam.utils.windowed_value.WindowedValue( +value=value, timestamp=0, windows=windows) Review comment: Added timestamps
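[Editorial note: the pattern under review in these comments — buffering elements per bundle and flushing them as windowed values — can be sketched without the DICOM specifics. This is a simplified stand-in, not the PR's code: `WindowedValue` below is a plain namedtuple substituting for `beam.utils.windowed_value.WindowedValue` so the sketch runs without a Beam installation, and a real DoFn receives the window via `beam.DoFn.WindowParam`:]

```python
from collections import namedtuple

# Stand-in for beam.utils.windowed_value.WindowedValue; the real DoFn
# uses the Beam class and real timestamps.
WindowedValue = namedtuple('WindowedValue', ['value', 'timestamp', 'windows'])


class BufferingDoFn:
    """Shape of the buffered-request pattern discussed above: elements
    are collected per bundle and flushed in batches, each output
    re-wrapped with the window of its input element."""

    def __init__(self, buffer_size, make_request):
        self.buffer_size = buffer_size
        self.make_request = make_request  # e.g. a batched HTTP store call

    def start_bundle(self):
        self.buffer = []

    def process(self, element, window):
        self.buffer.append((element, window))
        if len(self.buffer) >= self.buffer_size:
            # Flush inside process() too; the review caught a version
            # that dropped the flushed results here.
            return self._flush()
        return []

    def finish_bundle(self):
        # Flush whatever is left at the end of the bundle.
        return self._flush()

    def _flush(self):
        results = [
            WindowedValue(self.make_request(elem), 0, [win])
            for elem, win in self.buffer
        ]
        self.buffer = []
        return results
```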
[GitHub] [beam] George-Wu commented on a change in pull request #12331: [BEAM-10601] DICOM API Beam IO connector
George-Wu commented on a change in pull request #12331: URL: https://github.com/apache/beam/pull/12331#discussion_r463382501 ## File path: sdks/python/apache_beam/io/gcp/dicomio.py ## @@ -114,55 +115,60 @@ class DicomSearch(PTransform): """A PTransform used for retrieving DICOM instance metadata from Google -Cloud DICOM store. It takes a Pcollection of dicts as input and return -a Pcollection of dict as results: +Cloud DICOM store. It takes a PCollection of dicts as input and return +a PCollection of dict as results: INPUT: The input dict represents DICOM web path parameters, which has the following string keys and values: { - 'project_id': str, - 'region': str, - 'dataset_id': str, - 'dicom_store_id': str, - 'search_type': str, - 'params': dict(str,str) (Optional), +'project_id': str, +'region': str, +'dataset_id': str, +'dicom_store_id': str, +'search_type': str, +'params': dict(str,str) (Optional), } + Key-value pairs: - project_id: Id of the project in which DICOM store locates. (Required) + project_id: Id of the project in which the DICOM store is + located. (Required) region: Region where the DICOM store resides. (Required) dataset_id: Id of the dataset where DICOM store belongs to. (Required) dicom_store_id: Id of the dicom store. (Required) search_type: Which type of search it is, could only be one of the three -values: 'instances', 'series', or 'studies'. (Required) + values: 'instances', 'series', or 'studies'. (Required) params: A dict of str:str pairs used to refine QIDO search. (Optional) -Supported tags in three categories: - 1. Studies: -StudyInstanceUID -PatientName -PatientID -AccessionNumber -ReferringPhysicianName -StudyDate - 2. Series: all study level search terms and -SeriesInstanceUID -Modality - 3. Instances: all study/series level search terms and -SOPInstanceUID -e.g. 
{"StudyInstanceUID":"1","SeriesInstanceUID":"2"} + Supported tags in three categories: + 1.Studies: + StudyInstanceUID, + PatientName, + PatientID, + AccessionNumber, + ReferringPhysicianName, + StudyDate, + 2.Series: all study level search terms and + SeriesInstanceUID, + Modality, + 3.Instances: all study/series level search terms and + SOPInstanceUID, Review comment: Fixed ## File path: sdks/python/apache_beam/io/gcp/dicomio.py ## @@ -324,41 +332,45 @@ def process(self, element): return [out] -class DicomStoreInstance(PTransform): +class WriteToDicomStore(PTransform): """A PTransform for storing instances to a DICOM store. -Takes Pcollection of byte[] as input and return a Pcollection of dict as +Takes PCollection of byte[] as input and return a PCollection of dict as results. The inputs are normally DICOM file in bytes or str filename. INPUT: - This PTransform supports two types of input: -1. Byte[]: representing dicom file. -2. Fileio object: stream file object. +This PTransform supports two types of input: +1. Byte[]: representing dicom file. +2. Fileio object: stream file object. + OUTPUT: The output dict encodes status as well as error messages: { - 'success': boolean value telling whether the store is successful - 'input': undeliverable data. Exactly the same as the input, -only set if the operation is failed. - 'status': status code from the server, used as error messages. +'success': boolean value telling whether the store is successful. +'input': undeliverable data. Exactly the same as the input, +only set if the operation is failed. +'status': status code from the server, used as error messages. } + """ def __init__(self, destination_dict, input_type, credential=None): -"""Initializes DicomStoreInstance. +"""Initializes WriteToDicomStore. 
Args: destination_dict: # type: python dict, encodes DICOM endpoint information: -{ - 'project_id': str, - 'region': str, - 'dataset_id': str, - 'dicom_store_id': str, -} -Key-value pairs: - project_id: Id of the project in which DICOM store locates. (Required) - region: Region where the DICOM store resides. (Required) - dataset_id: Id of the dataset where DICOM store belongs to. (Required) - dicom_store_id: Id of the dicom store. (Required) + { + 'project_id': str, + 'region': str, + 'dataset_id': str, + 'dicom_store_id': str, + } + + Key-value pairs: + project_id: Id of the project in which DICOM store locates. (Required) + region: Region where the
[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL
ZijieSong946 commented on a change in pull request #12348: URL: https://github.com/apache/beam/pull/12348#discussion_r463375064 ## File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java ## @@ -184,6 +188,19 @@ private static Value beamLogicalObjectToZetaSqlValue(Object object, String ident } else { // input type return Value.createTimeValue(CivilTimeEncoder.encodePacked64TimeNanos((LocalTime) object)); } +} else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) { + // DateTime value + LocalDateTime datetime; + if (object instanceof Row) { // base type +datetime = +LocalDateTime.of( +LocalDate.ofEpochDay(((Row) object).getValue("Date")), +LocalTime.ofNanoOfDay(((Row) object).getValue("Time"))); + } else { // input type +datetime = (LocalDateTime) object; + } + return Value.createDatetimeValue( Review comment: Ack.
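[Editorial note: for context on the conversion being reviewed, the Java code rebuilds a `LocalDateTime` from a base-type `Row` holding an epoch-day `Date` field and a nano-of-day `Time` field. A rough Python equivalent of that composition, assuming the same epoch-day/nano-of-day encoding; note Python datetimes truncate precision below microseconds:]

```python
from datetime import date, datetime, timedelta

EPOCH = date(1970, 1, 1)


def row_to_datetime(epoch_day, nano_of_day):
    """Combine a DATE field (days since the Unix epoch) and a TIME field
    (nanoseconds into the day) into one datetime, mirroring the Java
    LocalDate.ofEpochDay / LocalTime.ofNanoOfDay composition above.

    Sub-microsecond nanos are truncated, since Python's datetime has
    only microsecond resolution.
    """
    d = EPOCH + timedelta(days=epoch_day)
    micros = nano_of_day // 1000
    t = (datetime.min + timedelta(microseconds=micros)).time()
    return datetime.combine(d, t)
```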
[GitHub] [beam] lostluck commented on a change in pull request #12350: [BEAM-10289] Dynamic splitting implementation.
lostluck commented on a change in pull request #12350: URL: https://github.com/apache/beam/pull/12350#discussion_r462445798 ## File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go ## @@ -302,31 +308,76 @@ func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (int64, } n.mu.Lock() + defer n.mu.Unlock() + var currProg float64 // Current element progress. - if n.index < 0 { // Progress is at the end of the non-existant -1st element. + var su SplittableUnit + if n.index < 0 { // Progress is at the end of the non-existant -1st element. currProg = 1.0 - } else if n.rt == nil { // If this isn't sub-element splittable, estimate some progress. + } else if n.su == nil { // If this isn't sub-element splittable, estimate some progress. currProg = 0.5 } else { // If this is sub-element splittable, get progress of the current element. - rt := <-n.rt - d, r := rt.GetProgress() - currProg = d / (d + r) - n.rt <- rt + // If splittable, hold this tracker for the rest of the function so the element + // doesn't finish processing during a split. + su = <-n.su + if su == nil { + return SplitResult{}, fmt.Errorf("failed to split: splittable unit was nil") + } + defer func() { + n.su <- su + }() + currProg = su.GetProgress() } // Size to split within is the minimum of bufSize or splitIdx so we avoid // including elements we already know won't be processed. if bufSize <= 0 || n.splitIdx < bufSize { bufSize = n.splitIdx } - s, _, err := splitHelper(n.index, bufSize, currProg, splits, frac, false) + s, f, err := splitHelper(n.index, bufSize, currProg, splits, frac, su != nil) if err != nil { - n.mu.Unlock() - return 0, err + return SplitResult{}, err + } + + // No fraction returned, perform channel split. + if f < 0 { + n.splitIdx = s + return SplitResult{PI: s - 1, RI: s}, nil + } + // Otherwise, perform a sub-element split. 
+ fr := f / (1.0 - currProg) + p, r, err := su.Split(fr) + if err != nil { + return SplitResult{}, err + } + + if p != nil && r != nil { // Successful split. Review comment: Consider that reversal technique I mentioned, so the short case is what's indented and returns early, and the long case is unindented. ``` if p == nil || r == nil { // Fallback to channel split, so split at next elm, not current. n.splitIdx = s + 1 return SplitResult{PI: s, RI: s + 1}, nil } // no need for an else. // .. the original contents of the if block ... ``` ## File path: sdks/go/pkg/beam/core/runtime/exec/datasource_test.go ## @@ -412,17 +423,120 @@ func TestDataSource_Split(t *testing.T) { // SDK never splits on 0, so check that every test. sp := SplitPoints{Splits: test.splitPts, Frac: test.frac, BufSize: test.bufSize} - if splitIdx, err := p.Split(sp); err != nil { + if splitRes, err := p.Split(sp); err != nil { t.Fatalf("error in Split: %v", err) - } else if got, want := splitIdx, test.splitIdx; got != want { - t.Fatalf("error in Split: got splitIdx = %v, want %v ", got, want) + } else { Review comment: And here. ## File path: sdks/go/pkg/beam/core/runtime/exec/sdf.go ## @@ -215,14 +215,25 @@ func (n *SplitAndSizeRestrictions) String() string { // changes to support the SDF's method signatures and the expected structure // of the FullValue being received. type ProcessSizedElementsAndRestrictions struct { - PDo *ParDo - - inv *ctInvoker - - // Rt allows this unit to send out restriction trackers being processed. - // Receivers of the tracker do not own it, and must send it back through the - // same channel once finished with it. - Rt chan sdf.RTracker + PDo *ParDo + TfId string // Transform ID. Needed for splitting. + ctInv *ctInvoker + sizeInv *rsInvoker + + // SU is a buffered channel for indicating when this unit is splittable. + // When this unit is processing an element, it sends a SplittableUnit + // interface through the channel.
That interface can be received on other + threads and used to perform splitting or other related operation. + // + // Receiving the SplittableUnit prevents the current element from finishing + // processing, so the element does not unexpectedly
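[Editorial note: the `fr := f / (1.0 - currProg)` line in the snippet above rescales the split fraction before handing it to the SplittableUnit: the caller's fraction `f` is measured against the whole element, while the tracker's Split() takes a fraction of the work still remaining, so `f` is divided by `(1 - currProg)`. A hedged Python rendering of just that arithmetic:]

```python
def rescale_split_fraction(f, curr_prog):
    """Rescale a split fraction of the whole element into a fraction of
    the element's remaining work, given how far the element has already
    progressed. Mirrors `fr := f / (1.0 - currProg)` in the Go code.

    curr_prog must be in [0, 1): an element at 100% progress has no
    remaining work to split.
    """
    if not 0.0 <= curr_prog < 1.0:
        raise ValueError('current progress must be in [0, 1)')
    return f / (1.0 - curr_prog)
```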
[GitHub] [beam] pabloem commented on pull request #12203: [BEAM-6928] Make Python SDK custom Sink the default Sink for BigQuery
pabloem commented on pull request #12203: URL: https://github.com/apache/beam/pull/12203#issuecomment-666858821 Run Python2_PVR_Flink PreCommit
[GitHub] [beam] pabloem commented on pull request #12203: [BEAM-6928] Make Python SDK custom Sink the default Sink for BigQuery
pabloem commented on pull request #12203: URL: https://github.com/apache/beam/pull/12203#issuecomment-666851408 only PullLicenses is failing @chamikaramj PTAL
[GitHub] [beam] ibzib commented on pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.
ibzib commented on pull request #12385: URL: https://github.com/apache/beam/pull/12385#issuecomment-666850930 > I have already spent a long time trying to fix quotes, so I can't help but wonder: why do we need flinkCompatibilityMatrixPROCESS in the first place, when it is not being run anywhere? If it's important, shouldn't we add it to some postcommit? Another solution I had in mind was reworking the `--environment_config` option. JSON blobs are unwieldy, and overloading the `--environment_config` option is confusing to the user. We could make each field in the PROCESS `--environment_config` blob its own argument, and then reject these arguments when `environment_type != PROCESS`.
[GitHub] [beam] pabloem merged pull request #12082: Standardizing BigQuery job names in Beam Python and Java SDKs
pabloem merged pull request #12082: URL: https://github.com/apache/beam/pull/12082
[GitHub] [beam] ibzib commented on pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.
ibzib commented on pull request #12385: URL: https://github.com/apache/beam/pull/12385#issuecomment-666849394 I got `flinkCompatibilityMatrixPROCESS` to pass on my machine by escaping the arguments via `${1@Q}`. Apparently whatever shell Jenkins is using does not support this. I will have to find a better solution. I have already spent a long time trying to fix quotes, so I can't help but wonder: why do we need `flinkCompatibilityMatrixPROCESS` in the first place, when it is not being run anywhere? If it's important, shouldn't we add it to some postcommit?
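[Editorial note: `${1@Q}` is a bash 4.4+ parameter transformation, which would explain the Jenkins failure if its shell is older or is not bash. One shell-version-independent alternative is to build the quoted command line with Python's `shlex.quote`; a sketch of the idea, not the Gradle/Jenkins script in question:]

```python
import shlex


def quote_args(args):
    """POSIX-shell-safe quoting for each argument, equivalent in spirit
    to bash's ${parameter@Q} expansion but independent of which shell
    (or shell version) eventually runs the command."""
    return ' '.join(shlex.quote(a) for a in args)
```

For example, `quote_args(['--environment_config', '{"command": "a b"}'])` wraps the JSON blob in single quotes so it survives one round of shell parsing.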
[GitHub] [beam] ibzib commented on a change in pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.
ibzib commented on a change in pull request #12385: URL: https://github.com/apache/beam/pull/12385#discussion_r463348975 ## File path: sdks/python/apache_beam/runners/portability/flink_runner_test.py ## @@ -53,361 +53,380 @@ from apache_beam.transforms import userstate from apache_beam.transforms.sql import SqlTransform +# Run as +# +# pytest flink_runner_test.py \ +# [--test_pipeline_options "--flink_job_server_jar=/path/to/job_server.jar \ +# --environment_type=DOCKER"] \ +# [FlinkRunnerTest.test_method, ...] + _LOGGER = logging.getLogger(__name__) Row = typing.NamedTuple("Row", [("col1", int), ("col2", unicode)]) beam.coders.registry.register_coder(Row, beam.coders.RowCoder) -if __name__ == '__main__': - # Run as - # - # python -m apache_beam.runners.portability.flink_runner_test \ - # --flink_job_server_jar=/path/to/job_server.jar \ - # --environment_type=docker \ - # --extra_experiments=beam_experiments \ - # [FlinkRunnerTest.test_method, ...] - - parser = argparse.ArgumentParser(add_help=True) - parser.add_argument( - '--flink_job_server_jar', help='Job server jar to submit jobs.') - parser.add_argument( - '--streaming', - default=False, - action='store_true', - help='Job type. batch or streaming') - parser.add_argument( - '--environment_type', - default='loopback', - help='Environment type. 
docker, process, or loopback.') - parser.add_argument('--environment_config', help='Environment config.') - parser.add_argument( - '--extra_experiments', - default=[], - action='append', - help='Beam experiments config.') - known_args, args = parser.parse_known_args(sys.argv) - sys.argv = args - - flink_job_server_jar = ( - known_args.flink_job_server_jar or - job_server.JavaJarJobServer.path_to_beam_jar( - 'runners:flink:%s:job-server:shadowJar' % - FlinkRunnerOptions.PUBLISHED_FLINK_VERSIONS[-1])) - streaming = known_args.streaming - environment_type = known_args.environment_type.lower() - environment_config = ( - known_args.environment_config if known_args.environment_config else None) - extra_experiments = known_args.extra_experiments - - # This is defined here to only be run when we invoke this file explicitly. - class FlinkRunnerTest(portable_runner_test.PortableRunnerTest): -_use_grpc = True -_use_subprocesses = True - -conf_dir = None -expansion_port = None - -@classmethod -def tearDownClass(cls): - if cls.conf_dir and exists(cls.conf_dir): -_LOGGER.info("removing conf dir: %s" % cls.conf_dir) -rmtree(cls.conf_dir) - super(FlinkRunnerTest, cls).tearDownClass() - -@classmethod -def _create_conf_dir(cls): - """Create (and save a static reference to) a "conf dir", used to provide - metrics configs and verify metrics output - - It gets cleaned up when the suite is done executing""" - - if hasattr(cls, 'conf_dir'): -cls.conf_dir = mkdtemp(prefix='flinktest-conf') - -# path for a FileReporter to write metrics to -cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt') - -# path to write Flink configuration to -conf_path = path.join(cls.conf_dir, 'flink-conf.yaml') -file_reporter = 'org.apache.beam.runners.flink.metrics.FileReporter' -with open(conf_path, 'w') as f: - f.write( - linesep.join([ - 'metrics.reporters: file', - 'metrics.reporter.file.class: %s' % file_reporter, - 'metrics.reporter.file.path: %s' % cls.test_metrics_path, - 
'metrics.scope.operator: ', - ])) - -@classmethod -def _subprocess_command(cls, job_port, expansion_port): - # will be cleaned up at the end of this method, and recreated and used by - # the job server - tmp_dir = mkdtemp(prefix='flinktest') - - cls._create_conf_dir() - cls.expansion_port = expansion_port - - try: -return [ -'java', -'-Dorg.slf4j.simpleLogger.defaultLogLevel=warn', -'-jar', -flink_job_server_jar, -'--flink-master', -'[local]', -'--flink-conf-dir', -cls.conf_dir, -'--artifacts-dir', -tmp_dir, -'--job-port', -str(job_port), -'--artifact-port', -'0', -'--expansion-port', -str(expansion_port), -] - finally: -rmtree(tmp_dir) - -@classmethod -def get_runner(cls): - return portable_runner.PortableRunner() - -@classmethod -def get_expansion_service(cls): - # TODO Move expansion address resides into PipelineOptions - return 'localhost:%s' % cls.expansion_port - -def create_options(self): - options = super(FlinkRunnerTest,
[GitHub] [beam] ibzib commented on pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.
ibzib commented on pull request #12385: URL: https://github.com/apache/beam/pull/12385#issuecomment-666846231 > I couldn't find the published test results in Jenkins. Do we have to add this separately? https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Commit/6126/ This is because publishing the test results relies on a change to the Jenkins job config, and I didn't want to `run seed job` because Flink PVR is a precommit and if I made a mistake, `run seed job` could break it for others. I tried testing locally on dockerized Jenkins, but I couldn't get it to work. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] ibzib commented on a change in pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.
ibzib commented on a change in pull request #12385: URL: https://github.com/apache/beam/pull/12385#discussion_r463346402 ## File path: sdks/python/apache_beam/runners/portability/flink_runner_test.py
[GitHub] [beam] ibzib commented on a change in pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.
ibzib commented on a change in pull request #12385: URL: https://github.com/apache/beam/pull/12385#discussion_r463345888 ## File path: sdks/python/apache_beam/runners/portability/flink_runner_test.py
[GitHub] [beam] pabloem commented on pull request #12203: [BEAM-6928] Make Python SDK custom Sink the default Sink for BigQuery
pabloem commented on pull request #12203: URL: https://github.com/apache/beam/pull/12203#issuecomment-666830620 Run Python2_PVR_Flink PreCommit
[GitHub] [beam] chamikaramj commented on pull request #12198: Widen ranges for GCP libraries
chamikaramj commented on pull request #12198: URL: https://github.com/apache/beam/pull/12198#issuecomment-666825962 Thanks for clarifying. LGTM.
[GitHub] [beam] chamikaramj commented on a change in pull request #12149: [BEAM-9897] Add cross-language support to SnowflakeIO.Read
chamikaramj commented on a change in pull request #12149: URL: https://github.com/apache/beam/pull/12149#discussion_r463341503 ## File path: sdks/python/apache_beam/io/external/snowflake.py ## @@ -0,0 +1,185 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +from __future__ import absolute_import + +import typing + +from past.builtins import unicode + +import apache_beam as beam +from apache_beam.transforms.external import BeamJarExpansionService +from apache_beam.transforms.external import ExternalTransform +from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder + +"""Snowflake transforms tested against Flink portable runner. + **Setup** + Transforms provided in this module are cross-language transforms + implemented in the Beam Java SDK. During the pipeline construction, Python SDK + will connect to a Java expansion service to expand these transforms. + To facilitate this, a small amount of setup is needed before using these + transforms in a Beam Python pipeline. + There are several ways to setup cross-language Snowflake transforms. + * Option 1: use the default expansion service + * Option 2: specify a custom expansion service + See below for details regarding each of these options. 
+ *Option 1: Use the default expansion service* + This is the recommended and easiest setup option for using Python Kafka + transforms. This option is only available for Beam 2.22.0 and later. Review comment: Pls drop "This option is only available for Beam 2.22.0 and later". This was just for Kafka. ## File path: sdks/python/apache_beam/io/external/snowflake.py ## @@ -0,0 +1,144 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# pytype: skip-file + +from __future__ import absolute_import + +import typing + +from past.builtins import unicode + +import apache_beam as beam +from apache_beam.transforms.external import BeamJarExpansionService +from apache_beam.transforms.external import ExternalTransform +from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder + +__all__ = ['ReadFromSnowflake'] + + +def default_io_expansion_service(): + return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar') + + +ReadFromSnowflakeSchema = typing.NamedTuple( +'WriteToSnowflakeSchema', +[ +('server_name', unicode), +('schema', unicode), +('database', unicode), +('staging_bucket_name', unicode), +('storage_integration_name', unicode), +('username', typing.Optional[unicode]), +('password', typing.Optional[unicode]), +('private_key_path', typing.Optional[unicode]), +('private_key_passphrase', typing.Optional[unicode]), +('o_auth_token', typing.Optional[unicode]), +('table', typing.Optional[unicode]), +('query', typing.Optional[unicode]), +]) + + +class ReadFromSnowflake(beam.PTransform): + """An external PTransform which reads from Snowflake.""" + + URN = 'beam:external:java:snowflake:read:v1' + + def __init__( + self, + server_name, + schema, + database, + staging_bucket_name, + storage_integration_name, + csv_mapper, + username=None, + password=None, + private_key_path=None, + private_key_passphrase=None, + o_auth_token=None, + table=None, + query=None, + expansion_service=None): +""" +Initializes a read operation from Snowflake. + +
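The schema tuple in the diff above illustrates the general pattern for cross-language transforms: the Python-side configuration is declared as a `typing.NamedTuple` (with `typing.Optional` for fields that may be absent) and handed to a `NamedTupleBasedPayloadBuilder` targeting the Java expansion service. Below is a stdlib-only sketch of that shape with a trimmed field list and illustrative values; note that the diff's functional `NamedTuple` call appears to name the read schema `'WriteToSnowflakeSchema'`, presumably a copy-paste slip, so the sketch uses the read-side name.

```python
# Stand-alone illustration of the payload-schema pattern: a typing.NamedTuple
# whose fields (some Optional) describe the configuration handed to the Java
# expansion service. No Beam import is needed to show the shape; in the
# connector the tuple instance is fed to a NamedTupleBasedPayloadBuilder.
import typing

ReadFromSnowflakeSchema = typing.NamedTuple(
    'ReadFromSnowflakeSchema',
    [
        ('server_name', str),
        ('schema', str),
        ('database', str),
        ('staging_bucket_name', str),
        ('storage_integration_name', str),
        ('username', typing.Optional[str]),
        ('password', typing.Optional[str]),
        ('table', typing.Optional[str]),
        ('query', typing.Optional[str]),
    ])

# Optional fields are simply passed as None when unused, e.g. when
# authenticating with username/password and reading a whole table
# (all values here are illustrative, not real connection settings):
payload = ReadFromSnowflakeSchema(
    server_name='account.snowflakecomputing.com',
    schema='PUBLIC',
    database='TEST_DB',
    staging_bucket_name='gs://my-bucket',
    storage_integration_name='my_integration',
    username='user',
    password='secret',
    table='MY_TABLE',
    query=None)
```

Keeping `table` and `query` both Optional mirrors the connector's either/or contract: exactly one of them is expected to be set for a read.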
[GitHub] [beam] tysonjh commented on pull request #12416: Update google-api-services versions.
tysonjh commented on pull request #12416: URL: https://github.com/apache/beam/pull/12416#issuecomment-666812322
[GitHub] [beam] tysonjh commented on pull request #12416: Update google-api-services versions.
tysonjh commented on pull request #12416: URL: https://github.com/apache/beam/pull/12416#issuecomment-666812699 No, my mistake. It fails locally too; I was running it incorrectly.
[GitHub] [beam] tysonjh commented on pull request #12416: Update google-api-services versions.
tysonjh commented on pull request #12416: URL: https://github.com/apache/beam/pull/12416#issuecomment-666808017 More test failures: NPEs in what seems like unrelated code, but notably the pubsub one again, which passes locally when I run it for some reason.
[GitHub] [beam] TheNeuralBit merged pull request #12399: [BEAM-10559] Add apache_beam.examples.sql_taxi
TheNeuralBit merged pull request #12399: URL: https://github.com/apache/beam/pull/12399
[GitHub] [beam] pabloem commented on pull request #12082: Standardizing BigQuery job names in Beam Python and Java SDKs
pabloem commented on pull request #12082: URL: https://github.com/apache/beam/pull/12082#issuecomment-666774552 Run Python PreCommit
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack
TheNeuralBit commented on a change in pull request #12422: URL: https://github.com/apache/beam/pull/12422#discussion_r463325571 ## File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java ## @@ -34,34 +35,65 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.testcontainers.containers.localstack.LocalStackContainer; /** * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link * KinesisTestOptions} in order to run this. */ @RunWith(JUnit4.class) public class KinesisIOIT implements Serializable { - private static int numberOfShards; - private static int numberOfRows; + private static final String STREAM_NAME = "beam_kinesis"; + private static final int NUM_RECORDS = 1000; @Rule public TestPipeline pipelineWrite = TestPipeline.create(); @Rule public TestPipeline pipelineRead = TestPipeline.create(); - private static KinesisTestOptions options; - private static final Instant now = Instant.now(); + static { + System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true"); + System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true"); + } + + private final LocalStackContainer localstackContainer = + new LocalStackContainer("0.11.3") + .withServices(LocalStackContainer.Service.KINESIS) + .withEnv("USE_SSL", "true") + .withStartupAttempts(3); + + private String endpoint; + private String region; + private String accessKey; + private String secretKey; + + @Before + public void setup() throws Exception { +localstackContainer.start(); +endpoint = +localstackContainer +.getEndpointConfiguration(LocalStackContainer.Service.KINESIS) 
+.getServiceEndpoint() +.replace("http", "https"); +region = +localstackContainer +.getEndpointConfiguration(LocalStackContainer.Service.KINESIS) +.getSigningRegion(); +accessKey = + localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSAccessKeyId(); +secretKey = + localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSSecretKey(); + +createStream(); + } - @BeforeClass - public static void setup() { -PipelineOptionsFactory.register(KinesisTestOptions.class); -options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class); -numberOfShards = options.getNumberOfShards(); -numberOfRows = options.getNumberOfRecords(); Review comment: I think we should maintain the ability to test against production AWS. Someday maybe we'll get some AWS credits to run this continuously against prod, and it could still be useful for local testing. Could you make it so we only start up a localstack container if nothing in `KinesisTestOptions` is modified? When starting the localstack you could just set the relevant fields in the PipelineOptions and let the rest of the test read them as it does now.
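The conditional setup TheNeuralBit asks for reduces to simple selection logic: fall back to a localstack container only when every Kinesis test option is still at its default, and otherwise honor the caller's real-AWS settings. The sketch below uses Python for brevity (the test under review is Java/JUnit), and the option names, defaults, and `start_container` hook are illustrative, not the actual `KinesisTestOptions` contract.

```python
# Illustrative defaults; the real KinesisTestOptions fields differ.
DEFAULTS = {
    'aws_service_endpoint': None,
    'aws_kinesis_stream': 'beam_kinesis',
    'aws_access_key': None,
    'aws_secret_key': None,
}

def use_localstack(options):
    """Use a localstack container only when the caller left every test
    option at its default, i.e. did not point the test at real AWS."""
    return all(options.get(k, DEFAULTS[k]) == DEFAULTS[k] for k in DEFAULTS)

def resolve_config(options, start_container):
    if use_localstack(options):
        # Start the container and write its endpoint/credentials back into
        # the options, so the rest of the test reads them uniformly whether
        # they came from localstack or from the caller.
        endpoint, key, secret = start_container()
        resolved = dict(options)
        resolved.update(aws_service_endpoint=endpoint,
                        aws_access_key=key,
                        aws_secret_key=secret)
        return resolved
    # Caller supplied real AWS settings; use them as-is.
    return dict(options)
```

The point of writing the container's settings back into the options object is that the body of the test never needs to know which backend it is talking to.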
[GitHub] [beam] boyuanzz opened a new pull request #12430: [WIP][BEAM-10303] Scale progress with respect to windows observation.
boyuanzz opened a new pull request #12430: URL: https://github.com/apache/beam/pull/12430 **Please** add a meaningful description for your change here Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch)
[GitHub] [beam] jiyongjung0 commented on pull request #12404: [BEAM-10598] Bumps google cloud bigquery to 1.26.1
jiyongjung0 commented on pull request #12404: URL: https://github.com/apache/beam/pull/12404#issuecomment-666770943 Thanks!
[GitHub] [beam] simonepri commented on pull request #12429: Clean ExtractOutput of mean transform
simonepri commented on pull request #12429: URL: https://github.com/apache/beam/pull/12429#issuecomment-666770785 R: @youngoli, @lostluck Feel free to close it without merging if you don't feel it is needed.
[GitHub] [beam] amaliujia merged pull request #12417: Remove redundant pre-commits.
amaliujia merged pull request #12417: URL: https://github.com/apache/beam/pull/12417
[GitHub] [beam] youngoli commented on pull request #12350: [BEAM-10289] Dynamic splitting implementation.
youngoli commented on pull request #12350: URL: https://github.com/apache/beam/pull/12350#issuecomment-666769902 Good idea.
[GitHub] [beam] aaltay commented on pull request #12304: [BEAM-10499] Adds a descriptive toString to SamzaRunner KeyedTimerData
aaltay commented on pull request #12304: URL: https://github.com/apache/beam/pull/12304#issuecomment-666768573 Change LGTM. Could you rebase?
[GitHub] [beam] aaltay commented on pull request #11967: [BEAM-9992] | use Sets transform in BeamSQL
aaltay commented on pull request #11967: URL: https://github.com/apache/beam/pull/11967#issuecomment-666767908 What is the next step for this PR?
[GitHub] [beam] aaltay commented on pull request #12042: [BEAM-9962] Jenkins Plugin
aaltay commented on pull request #12042: URL: https://github.com/apache/beam/pull/12042#issuecomment-666767694 What are the next steps for this PR?
[GitHub] [beam] pabloem commented on a change in pull request #12331: [BEAM-10601] DICOM API Beam IO connector
pabloem commented on a change in pull request #12331: URL: https://github.com/apache/beam/pull/12331#discussion_r463282351 ## File path: sdks/python/apache_beam/io/gcp/dicomio.py ## @@ -372,52 +420,63 @@ def __init__(self, destination_dict, input_type, credential=None): credential: # type: Google credential object, if it is specified, the Http client will use it instead of the default one. """ -self.credential = credential self.destination_dict = destination_dict # input_type pre-check if input_type not in ['bytes', 'fileio']: raise ValueError("input_type could only be 'bytes' or 'fileio'") self.input_type = input_type +self.buffer_size = buffer_size +self.max_workers = max_workers +self.credential = credential def expand(self, pcoll): return pcoll | beam.ParDo( -_StoreInstance(self.destination_dict, self.input_type, self.credential)) +_StoreInstance( +self.destination_dict, +self.input_type, +self.buffer_size, +self.max_workers, +self.credential)) class _StoreInstance(beam.DoFn): """A DoFn read or fetch dicom files then push it to a dicom store.""" - def __init__(self, destination_dict, input_type, credential=None): -self.credential = credential + def __init__( + self, + destination_dict, + input_type, + buffer_size, + max_workers, + credential=None): # pre-check destination dict required_keys = ['project_id', 'region', 'dataset_id', 'dicom_store_id'] for key in required_keys: if key not in destination_dict: raise ValueError('Must have %s in the dict.' % (key)) self.destination_dict = destination_dict self.input_type = input_type +self.buffer_size = buffer_size +self.max_workers = max_workers +self.credential = credential - def process(self, element): + def start_bundle(self): +self.buffer = [] + + def finish_bundle(self): +return self._flush() + + def process(self, element, window=beam.DoFn.WindowParam): +self.buffer.append((element, window)) +if len(self.buffer) >= self.buffer_size: + self._flush() Review comment: you may need to `return self._flush()`, right? 
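The bundle-buffering pattern under review, and why the missing `return` that pabloem points out matters, can be sketched without the Beam runtime. This is an illustrative plain-Python model of the DoFn lifecycle, not the connector's actual code:

```python
class BufferingDoFn:
    """Illustrative model of a DoFn that buffers elements per bundle."""

    def __init__(self, buffer_size):
        self.buffer_size = buffer_size

    def start_bundle(self):
        self.buffer = []

    def process(self, element):
        self.buffer.append(element)
        if len(self.buffer) >= self.buffer_size:
            # The runner only emits what process() returns or yields, so
            # calling self._flush() without `return` would drop the results.
            return self._flush()

    def finish_bundle(self):
        return self._flush()

    def _flush(self):
        # Stand-in for the real work (e.g. one store request per element).
        results, self.buffer = [('stored', e) for e in self.buffer], []
        return results


# Simulate one bundle of three elements with buffer_size=2.
fn = BufferingDoFn(buffer_size=2)
fn.start_bundle()
emitted = []
for e in ['a', 'b', 'c']:
    emitted.extend(fn.process(e) or [])
emitted.extend(fn.finish_bundle() or [])
```

With the `return` in place, the mid-bundle flush of `'a'` and `'b'` reaches the output alongside the final-flush `'c'`; without it, two of the three results would vanish.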
## File path: sdks/python/apache_beam/io/gcp/dicomio.py ## @@ -164,70 +167,109 @@ class DicomSearch(PTransform): } """ - def __init__(self, credential=None): + def __init__(self, buffer_size=8, max_workers=5, credential=None): """Initializes DicomSearch. Args: credential: # type: Google credential object, if it is specified, the Http client will use it to create sessions instead of the default. """ +self.buffer_size = buffer_size +self.max_workers = max_workers self.credential = credential def expand(self, pcoll): -return pcoll | beam.ParDo(_QidoSource(self.credential)) +return pcoll | beam.ParDo( +_QidoSource(self.buffer_size, self.max_workers, self.credential)) class _QidoSource(beam.DoFn): """A DoFn for executing every qido query request.""" - def __init__(self, credential=None): + def __init__(self, buffer_size, max_workers, credential=None): +self.buffer_size = buffer_size +self.max_workers = max_workers self.credential = credential - def process(self, element): + def start_bundle(self): +self.buffer = [] + + def finish_bundle(self): +return self._flush() + + def validate_element(self, element): # Check if all required keys present. required_keys = [ 'project_id', 'region', 'dataset_id', 'dicom_store_id', 'search_type' ] -error_message = None - for key in required_keys: if key not in element: error_message = 'Must have %s in the dict.' 
% (key) -break - -if not error_message: - project_id = element['project_id'] - region = element['region'] - dataset_id = element['dataset_id'] - dicom_store_id = element['dicom_store_id'] - search_type = element['search_type'] - params = element['params'] if 'params' in element else None - - # Call qido search http client - if element['search_type'] in ['instances', "studies", "series"]: -result, status_code = DicomApiHttpClient().qido_search( - project_id, region, dataset_id, dicom_store_id, - search_type, params, self.credential -) - else: -error_message = ( -'Search type can only be "studies", ' -'"instances" or "series"') - - if not error_message: -out = {} -out['result'] = result -out['status'] = status_code -out['input'] = element -out['success'] = (status_code == 200) -return [out] - -# Return this when the input dict dose not meet the requirements +return False, error_message + +# Check if return type is correct. +if
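The validation refactor in this hunk (required-key checks plus a search-type check, returning a success flag and an error message) can be sketched roughly as below. The key names and messages follow the diff, but this is a simplified standalone version, not the connector's code:

```python
REQUIRED_KEYS = [
    'project_id', 'region', 'dataset_id', 'dicom_store_id', 'search_type'
]

def validate_element(element):
    """Return (ok, error_message) for a qido search request dict."""
    for key in REQUIRED_KEYS:
        if key not in element:
            return False, 'Must have %s in the dict.' % key
    if element['search_type'] not in ('studies', 'instances', 'series'):
        return False, ('Search type can only be "studies", '
                       '"instances" or "series"')
    return True, None
```

Pulling the checks out of `process` this way keeps the per-element path to a single `validate_element` call before the HTTP request is issued.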
[GitHub] [beam] aaltay commented on pull request #12320: [BEAM-10512] adds parsedData member variable to HL7v2Message and a Builder for the class
aaltay commented on pull request #12320: URL: https://github.com/apache/beam/pull/12320#issuecomment-666767097 @dranderson1117 - Could someone from the healthcare team do a first-pass review?
[GitHub] [beam] simonepri opened a new pull request #12429: Clean ExtractOutput of mean transform
simonepri opened a new pull request #12429: URL: https://github.com/apache/beam/pull/12429 ExtractOutput is not called when the input PCollection is empty, so the if statement that this PR removes is redundant.
[GitHub] [beam] aaltay commented on pull request #12404: [BEAM-10598] Bumps google cloud bigquery to 1.26.1
aaltay commented on pull request #12404: URL: https://github.com/apache/beam/pull/12404#issuecomment-666766550 I think we can merge this as is, and let #12198 extend the range a bit more. Based on the PR description, this sounds like a blocker for using google-cloud-storage.
[GitHub] [beam] TheNeuralBit closed pull request #12298: [BEAM-10137][BEAM-10138] Python wrapper KinesisIO integration tests
TheNeuralBit closed pull request #12298: URL: https://github.com/apache/beam/pull/12298
[GitHub] [beam] TheNeuralBit commented on pull request #12298: [BEAM-10137][BEAM-10138] Python wrapper KinesisIO integration tests
TheNeuralBit commented on pull request #12298: URL: https://github.com/apache/beam/pull/12298#issuecomment-666766488 Closing in favor of https://github.com/apache/beam/pull/12297
[GitHub] [beam] TheNeuralBit commented on pull request #12234: [BEAM-10138] Add Cross-language KinesisWrite external transform
TheNeuralBit commented on pull request #12234: URL: https://github.com/apache/beam/pull/12234#issuecomment-666766596 Closing in favor of #12297
[GitHub] [beam] aaltay merged pull request #12404: [BEAM-10598] Bumps google cloud bigquery to 1.26.1
aaltay merged pull request #12404: URL: https://github.com/apache/beam/pull/12404
[GitHub] [beam] TheNeuralBit commented on pull request #12235: [BEAM-10138] Python wrapper for Cross-language Java's KinesisIO
TheNeuralBit commented on pull request #12235: URL: https://github.com/apache/beam/pull/12235#issuecomment-666766546 Closing in favor of #12297
[GitHub] [beam] TheNeuralBit closed pull request #12235: [BEAM-10138] Python wrapper for Cross-language Java's KinesisIO
TheNeuralBit closed pull request #12235: URL: https://github.com/apache/beam/pull/12235
[GitHub] [beam] TheNeuralBit closed pull request #12234: [BEAM-10138] Add Cross-language KinesisWrite external transform
TheNeuralBit closed pull request #12234: URL: https://github.com/apache/beam/pull/12234
[GitHub] [beam] jiyongjung0 commented on pull request #12404: [BEAM-10598] Bumps google cloud bigquery to 1.26.1
jiyongjung0 commented on pull request #12404: URL: https://github.com/apache/beam/pull/12404#issuecomment-666765978 Thank you for the reviews. I'll wait for #12198, which has a wider effect.
[GitHub] [beam] aaltay merged pull request #12372: [BEAM-10545] KernelModel and jest tests
aaltay merged pull request #12372: URL: https://github.com/apache/beam/pull/12372
[GitHub] [beam] simonepri commented on pull request #12421: Fix go count on an empty pcollection
simonepri commented on pull request #12421: URL: https://github.com/apache/beam/pull/12421#issuecomment-666763106 > In practice, having no elements at all is so uncommon, it's not reasonable to include that in basic utility transforms, since they never apply, and no data = no compute is a better condition to maintain. I agree it is uncommon, but it is still probably good to have a sane default for common operations. For instance (when in doubt), it might be a good idea (or maybe not) to be consistent with what happens in SQL. In SQL, the sum of an empty set is null (i.e. in our case an empty PCollection), while the count of an empty set is 0.
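The SQL behavior simonepri cites is easy to check; with an in-memory SQLite table, the aggregate over an empty set differs between SUM and COUNT:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE t (x INTEGER)')

# Over an empty table, SUM yields NULL (None in Python) but COUNT yields 0.
total, count = conn.execute('SELECT SUM(x), COUNT(x) FROM t').fetchone()
```

Translated to the PR's terms: a SQL-consistent `Count` on an empty PCollection would emit a single 0, while a SQL-consistent `Sum` would emit nothing.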
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder
TheNeuralBit commented on a change in pull request #12426: URL: https://github.com/apache/beam/pull/12426#discussion_r463311197 ## File path: model/pipeline/src/main/proto/beam_runner_api.proto ## @@ -855,10 +855,21 @@ message StandardCoders { // BOOLEAN: beam:coder:bool:v1 // BYTES: beam:coder:bytes:v1 // ArrayType: beam:coder:iterable:v1 (always has a known length) -// MapType: not yet a standard coder (BEAM-7996) +// MapType: not a standard coder, specification defined below. // RowType: beam:coder:row:v1 // LogicalType: Uses the coder for its representation. // +// The MapType is encoded by: +// - An INT32 representing the size of the map (N) +// - Followed by N interleaved keys and values, encoded with their +// corresponding coder. +// +// Nullable types in container types (ArrayType, MapType) are encoded by: +// - A one byte null indicator, 0x00 for null values, or 0x01 for present +// values. +// - For present values the null indicator is followed by the value +// encoded with it's corresponding coder. +// Review comment: This is specifically for nullable types that are elements of an Array or keys/values of a Map. For rows we encode nulls in a separate bitmask: https://github.com/apache/beam/blob/587dde57cbb2b0095a1fa04b59798d1b62c66f18/model/pipeline/src/main/proto/beam_runner_api.proto#L837-L843 Java schemas will already let you use nullable types for keys and values in Maps. I doubt anyone is relying on it, but there's a risk if we disallow it now.
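The encoding described in the proto comment (a big-endian INT32 size, N interleaved key/value encodings, and a one-byte null indicator for nullable container elements) can be sketched as follows. The identity "coders" here are placeholders; a real coder would encode keys and values in their nested form:

```python
import struct

def encode_nullable(value, encode_fn):
    # 0x00 marks a null value; 0x01 marks a present value,
    # followed by that value's own encoding.
    if value is None:
        return b'\x00'
    return b'\x01' + encode_fn(value)

def encode_map(mapping, encode_key, encode_value):
    # Big-endian INT32 size (N), then N interleaved key/value encodings,
    # with values wrapped in the null indicator above.
    parts = [struct.pack('>i', len(mapping))]
    for key, value in mapping.items():
        parts.append(encode_key(key))
        parts.append(encode_nullable(value, encode_value))
    return b''.join(parts)

encoded = encode_map({b'a': b'x', b'b': None}, lambda k: k, lambda v: v)
```

For the two-entry map above this yields the 4-byte size, then `a`, present marker, `x`, then `b`, null marker.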
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder
TheNeuralBit commented on a change in pull request #12426: URL: https://github.com/apache/beam/pull/12426#discussion_r463308917 ## File path: sdks/python/apache_beam/coders/coder_impl.py ## @@ -530,6 +530,88 @@ def estimate_size(self, unused_value, nested=False): return 1 +class MapCoderImpl(StreamCoderImpl): + """For internal use only; no backwards-compatibility guarantees. + + A coder for typing.Mapping objects.""" + def __init__( + self, + key_coder, # type: CoderImpl + value_coder # type: CoderImpl + ): +self._key_coder = key_coder +self._value_coder = value_coder + + def encode_to_stream(self, value, out, nested): +size = len(value) +out.write_bigendian_int32(size) +for i, kv in enumerate(value.items()): + key, value = kv + last = i == size - 1 Review comment: Same comment as above; this was just an attempt to replicate the Java logic. I could change them both if you think we should.
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder
TheNeuralBit commented on a change in pull request #12426: URL: https://github.com/apache/beam/pull/12426#discussion_r463308693 ## File path: sdks/python/apache_beam/coders/coder_impl.py ## @@ -530,6 +530,88 @@ def estimate_size(self, unused_value, nested=False): return 1 +class MapCoderImpl(StreamCoderImpl): + """For internal use only; no backwards-compatibility guarantees. + + A coder for typing.Mapping objects.""" + def __init__( + self, + key_coder, # type: CoderImpl + value_coder # type: CoderImpl + ): +self._key_coder = key_coder +self._value_coder = value_coder + + def encode_to_stream(self, value, out, nested): +size = len(value) +out.write_bigendian_int32(size) Review comment: Yeah, I thought about that too. I was just trying to exactly replicate what we do in [MapCoder.java](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java) for now though. Do you think I should switch this and MapCoder.java over to using varint here?
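The trade-off behind the varint question can be shown concretely. Below is a from-scratch unsigned varint (LEB128-style, the scheme Beam's iterable coder uses for lengths), not Beam's actual VarIntCoder: small sizes take one byte instead of the four that a fixed big-endian INT32 always costs:

```python
import struct

def varint_encode(n):
    """Encode a non-negative int as an unsigned varint (7 bits per byte)."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)  # continuation bit set
        else:
            out.append(bits)
            return bytes(out)

fixed = struct.pack('>i', 5)   # fixed-width INT32: always 4 bytes
compact = varint_encode(5)     # 1 byte for small sizes
bigger = varint_encode(300)    # 2 bytes
```

Since most maps are small, a varint size prefix would usually save three bytes per map, at the cost of diverging from what MapCoder.java currently writes.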
[GitHub] [beam] pabloem commented on pull request #12203: [BEAM-6928] Make Python SDK custom Sink the default Sink for BigQuery
pabloem commented on pull request #12203: URL: https://github.com/apache/beam/pull/12203#issuecomment-666747008 Run Python2_PVR_Flink PreCommit
[GitHub] [beam] ibzib opened a new pull request #12428: Update Flink Version Compatibility table
ibzib opened a new pull request #12428: URL: https://github.com/apache/beam/pull/12428 R: @mxm
[GitHub] [beam] lostluck commented on pull request #12350: [BEAM-10289] Dynamic splitting implementation.
lostluck commented on pull request #12350: URL: https://github.com/apache/beam/pull/12350#issuecomment-666735372 Consider updating the PR description to match the current state :)
[GitHub] [beam] pabloem commented on pull request #12203: [BEAM-6928] Make Python SDK custom Sink the default Sink for BigQuery
pabloem commented on pull request #12203: URL: https://github.com/apache/beam/pull/12203#issuecomment-666735046 Run Python2_PVR_Flink PreCommit
[GitHub] [beam] lostluck merged pull request #12402: [BEAM-9679] Add Composite transforms to Go SDK katas
lostluck merged pull request #12402: URL: https://github.com/apache/beam/pull/12402
[GitHub] [beam] pabloem commented on a change in pull request #12082: Standardizing BigQuery job names in Beam Python and Java SDKs
pabloem commented on a change in pull request #12082: URL: https://github.com/apache/beam/pull/12082#discussion_r463295587 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ## @@ -457,6 +456,22 @@ * for security and permission related information specific to BigQuery. */ public class BigQueryIO { + + /** + * Template for BigQuery jobs created by BigQueryIO. This template is: {@code + * "beam_bq_job_{TYPE}_{JOB_ID}_{STEP}_{RANDOM}"}, where: + * + * + * {@code TYPE} represents the BigQuery job type (e.g. extract / copy / load / query) + * {@code JOB_ID} is the Dataflow job name. Review comment: Ah, the job name comes from Beam, and is autogenerated if one is not provided. Changed the documentation to express that.
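The documented template can be illustrated with a small hypothetical helper; `bq_job_name` and its 8-character random suffix are illustrative assumptions, not the SDK's actual implementation:

```python
import random
import string

def bq_job_name(job_type, job_id, step):
    """Render beam_bq_job_{TYPE}_{JOB_ID}_{STEP}_{RANDOM} (illustrative)."""
    suffix = ''.join(
        random.choice(string.ascii_lowercase + string.digits)
        for _ in range(8))
    return 'beam_bq_job_%s_%s_%s_%s' % (job_type, job_id, step, suffix)

name = bq_job_name('load', 'myjob', 'step1')
```

The random suffix is what lets retries of the same step submit distinct BigQuery jobs without colliding.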
[GitHub] [beam] pabloem commented on pull request #12082: Standardizing BigQuery job names in Beam Python and Java SDKs
pabloem commented on pull request #12082: URL: https://github.com/apache/beam/pull/12082#issuecomment-666732227 Run Java PostCommit
[GitHub] [beam] robertwb commented on a change in pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder
robertwb commented on a change in pull request #12426: URL: https://github.com/apache/beam/pull/12426#discussion_r463291968 ## File path: model/pipeline/src/main/proto/beam_runner_api.proto ## @@ -855,10 +855,21 @@ message StandardCoders { // BOOLEAN: beam:coder:bool:v1 // BYTES: beam:coder:bytes:v1 // ArrayType: beam:coder:iterable:v1 (always has a known length) -// MapType: not yet a standard coder (BEAM-7996) +// MapType: not a standard coder, specification defined below. // RowType: beam:coder:row:v1 // LogicalType: Uses the coder for its representation. // +// The MapType is encoded by: +// - An INT32 representing the size of the map (N) +// - Followed by N interleaved keys and values, encoded with their +// corresponding coder. +// +// Nullable types in container types (ArrayType, MapType) are encoded by: +// - A one byte null indicator, 0x00 for null values, or 0x01 for present +// values. +// - For present values the null indicator is followed by the value +// encoded with it's corresponding coder. +// Review comment: For maps specifically, do we want to allow null keys? Is it valuable to have null values (as distinct from just not present)? I might lean towards disallowing nulls and then possibly allowing it in the future if we have good reason to, which will be forward compatible. ## File path: sdks/python/apache_beam/coders/coder_impl.py ## @@ -530,6 +530,88 @@ def estimate_size(self, unused_value, nested=False): return 1 +class MapCoderImpl(StreamCoderImpl): + """For internal use only; no backwards-compatibility guarantees. + + A coder for typing.Mapping objects.""" + def __init__( + self, + key_coder, # type: CoderImpl + value_coder # type: CoderImpl + ): +self._key_coder = key_coder +self._value_coder = value_coder + + def encode_to_stream(self, value, out, nested): +size = len(value) +out.write_bigendian_int32(size) Review comment: We don't care about larger maps? Also, why not varint? 
(If that's what's conventionally used elsewhere, I'm fine with that, but iterable coder uses varint64.) ## File path: model/pipeline/src/main/proto/beam_runner_api.proto ## @@ -855,10 +855,21 @@ message StandardCoders { // BOOLEAN: beam:coder:bool:v1 // BYTES: beam:coder:bytes:v1 // ArrayType: beam:coder:iterable:v1 (always has a known length) -// MapType: not yet a standard coder (BEAM-7996) +// MapType: not a standard coder, specification defined below. // RowType: beam:coder:row:v1 // LogicalType: Uses the coder for its representation. // +// The MapType is encoded by: +// - An INT32 representing the size of the map (N) +// - Followed by N interleaved keys and values, encoded with their +// corresponding coder. +// +// Nullable types in container types (ArrayType, MapType) are encoded by: +// - A one byte null indicator, 0x00 for null values, or 0x01 for present +// values. +// - For present values the null indicator is followed by the value +// encoded with it's corresponding coder. +// Review comment: Is this just the encoding of a nullable type? (Why does it have to be called out specially?) ## File path: sdks/python/apache_beam/coders/coder_impl.py ## @@ -530,6 +530,88 @@ def estimate_size(self, unused_value, nested=False): return 1 +class MapCoderImpl(StreamCoderImpl): + """For internal use only; no backwards-compatibility guarantees. + + A coder for typing.Mapping objects.""" + def __init__( + self, + key_coder, # type: CoderImpl + value_coder # type: CoderImpl + ): +self._key_coder = key_coder +self._value_coder = value_coder + + def encode_to_stream(self, value, out, nested): +size = len(value) +out.write_bigendian_int32(size) +for i, kv in enumerate(value.items()): + key, value = kv + last = i == size - 1 Review comment: Just nest everything. That'll simplify the logic and the definition of the coder. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
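The map encoding debated above (a big-endian INT32 count N followed by N interleaved key/value encodings) can be sketched in pure Python. This is an illustrative stand-in, not Beam's actual `MapCoderImpl`: length-prefixed UTF-8 strings substitute for arbitrary nested component coders, and it uses the INT32 size prefix from the proto comment rather than the varint64 robertwb suggests.

```python
import struct

def encode_str(s, out):
    # Length-prefixed UTF-8; a stand-in for a component coder's nested encoding.
    data = s.encode("utf-8")
    out.append(struct.pack(">i", len(data)))
    out.append(data)

def decode_str(buf, pos):
    (n,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    return buf[pos:pos + n].decode("utf-8"), pos + n

def encode_map(m):
    # An INT32 size N, then N interleaved key/value encodings.
    out = [struct.pack(">i", len(m))]
    for k, v in m.items():
        encode_str(k, out)
        encode_str(v, out)
    return b"".join(out)

def decode_map(buf):
    (n,) = struct.unpack_from(">i", buf, 0)
    pos = 4
    result = {}
    for _ in range(n):
        k, pos = decode_str(buf, pos)
        v, pos = decode_str(buf, pos)
        result[k] = v
    return result
```

A fixed 4-byte size caps maps at 2^31-1 entries, which is the "larger maps" concern raised in the review; a varint would lift that cap and shrink small encodings.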
[GitHub] [beam] mf2199 commented on a change in pull request #11295: [BEAM-3342] CBT `Read` IO Connector Wrapper
mf2199 commented on a change in pull request #11295: URL: https://github.com/apache/beam/pull/11295#discussion_r463294587 ## File path: sdks/python/apache_beam/io/gcp/bigtableio.py ## @@ -1,151 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""BigTable connector - -This module implements writing to BigTable tables. Review comment: If I understand correctly, we are to restore this file in the original version while keeping the altered one in the `experimental` folder. If so, then it's done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pabloem commented on pull request #12203: [BEAM-6928] Make Python SDK custom Sink the default Sink for BigQuery
pabloem commented on pull request #12203: URL: https://github.com/apache/beam/pull/12203#issuecomment-666729271 Run Python2_PVR_Flink PreCommit
[GitHub] [beam] robertwb merged pull request #12362: Convert katas to use with syntax rather than explicit run call.
robertwb merged pull request #12362: URL: https://github.com/apache/beam/pull/12362
[GitHub] [beam] robinyqiu commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL
robinyqiu commented on a change in pull request #12348: URL: https://github.com/apache/beam/pull/12348#discussion_r463286427 ## File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java ## @@ -184,6 +188,19 @@ private static Value beamLogicalObjectToZetaSqlValue(Object object, String ident } else { // input type return Value.createTimeValue(CivilTimeEncoder.encodePacked64TimeNanos((LocalTime) object)); } +} else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) { + // DateTime value + LocalDateTime datetime; + if (object instanceof Row) { // base type +datetime = +LocalDateTime.of( +LocalDate.ofEpochDay(((Row) object).getValue("Date")), +LocalTime.ofNanoOfDay(((Row) object).getValue("Time"))); + } else { // input type +datetime = (LocalDateTime) object; + } + return Value.createDatetimeValue( Review comment: I created a JIRA for this: https://issues.apache.org/jira/browse/BEAM-10611 Please add a TODO here such that we can make improvement on this in the future. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
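The snippet under review recombines a Beam Row's "Date" field (days since the epoch) and "Time" field (nanosecond of day) into one datetime value. A hedged Python sketch of that recombination logic (the reviewed code is Java's `LocalDate.ofEpochDay`/`LocalTime.ofNanoOfDay`; Python's `time` only carries microsecond precision, so sub-microsecond nanos are truncated here):

```python
from datetime import date, datetime, time, timedelta

def row_to_datetime(epoch_day: int, nano_of_day: int) -> datetime:
    # LocalDate.ofEpochDay equivalent: days since 1970-01-01.
    d = date(1970, 1, 1) + timedelta(days=epoch_day)
    # LocalTime.ofNanoOfDay equivalent, truncated to microseconds.
    seconds, nanos = divmod(nano_of_day, 1_000_000_000)
    hours, rem = divmod(seconds, 3600)
    minutes, secs = divmod(rem, 60)
    t = time(hours, minutes, secs, nanos // 1000)
    return datetime.combine(d, t)
```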
[GitHub] [beam] y1chi commented on pull request #12427: nexmark python queries 0, 1, 2 and 9
y1chi commented on pull request #12427: URL: https://github.com/apache/beam/pull/12427#issuecomment-666716230 Could you rename the title to be more meaningful and add `[BEAM-2855]` in the front so that it will be hooked with the right JIRA ticket https://issues.apache.org/jira/browse/BEAM-2855.
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12399: [BEAM-10559] Add apache_beam.examples.sql_taxi
TheNeuralBit commented on a change in pull request #12399: URL: https://github.com/apache/beam/pull/12399#discussion_r463279232 ## File path: sdks/python/apache_beam/examples/sql_taxi.py ## @@ -0,0 +1,97 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""An example that processes streaming NYC Taxi data with SqlTransform. + + This example reads from the PubSub NYC Taxi stream described in Review comment: Done, thanks This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pabloem commented on pull request #12151: [BEAM-9896] Add streaming for SnowflakeIO.Write to Java SDK
pabloem commented on pull request #12151: URL: https://github.com/apache/beam/pull/12151#issuecomment-666703403 One more thing: Please add a description of the change to `CHANGES.md`, so we can have this on the release notes. Then I can merge.
[GitHub] [beam] pabloem commented on a change in pull request #12151: [BEAM-9896] Add streaming for SnowflakeIO.Write to Java SDK
pabloem commented on a change in pull request #12151: URL: https://github.com/apache/beam/pull/12151#discussion_r463273363 ## File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceConfig.java ## @@ -20,26 +20,51 @@ import java.util.List; import net.snowflake.ingest.SimpleIngestManager; +/** Class for preparing configuration for streaming write. */ public class SnowflakeStreamingServiceConfig extends ServiceConfig { - SimpleIngestManager ingestManager; - List filesList; - String stagingBucketDir; + private SimpleIngestManager ingestManager; + private List filesList; + private String stagingBucketDir; Review comment: should these attributes also be final? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pabloem commented on a change in pull request #12151: [BEAM-9896] Add streaming for SnowflakeIO.Write to Java SDK
pabloem commented on a change in pull request #12151: URL: https://github.com/apache/beam/pull/12151#discussion_r463219932 ## File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java ## @@ -719,7 +906,9 @@ private void checkArguments() { return getUserDataMapper().mapRow(element); } })) - .apply("Map Objects array to CSV lines", ParDo.of(new MapObjectsArrayToCsvFn())) + .apply( + "Map Objects array to CSV lines", + ParDo.of(new MapObjectsArrayToCsvFn(getQuotationMark( Review comment: that makes sense to me. Thanks Kasia! ## File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java ## @@ -684,14 +819,61 @@ private void checkArguments() { (getDataSourceProviderFn() != null), "withDataSourceConfiguration() or withDataSourceProviderFn() is required"); - checkArgument(getTable() != null, "withTable() is required"); + if (input.isBounded() == PCollection.IsBounded.UNBOUNDED) { +checkArgument(getSnowPipe() != null, "withSnowPipe() is required"); + } else { +checkArgument(getTable() != null, "to() is required"); + } } -private PCollection write(PCollection input, String stagingBucketDir) { +private PCollection writeStream(PCollection input, String stagingBucketDir) { SnowflakeService snowflakeService = - getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl(); + getSnowflakeService() != null + ? getSnowflakeService() + : new SnowflakeStreamingServiceImpl(); + + /* Ensure that files will be created after specific record count or duration specified */ + PCollection inputInGlobalWindow = + input.apply( + "rewindowIntoGlobal", + Window.into(new GlobalWindows()) + .triggering( + Repeatedly.forever( + AfterFirst.of( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(getFlushTimeLimit()), + AfterPane.elementCountAtLeast(getFlushRowLimit() + .discardingFiredPanes()); + + int shards = (getShardsNumber() > 0) ? 
getShardsNumber() : DEFAULT_STREAMING_SHARDS_NUMBER; + PCollection files = writeFiles(inputInGlobalWindow, stagingBucketDir, shards); + + /* Ensuring that files will be ingested after flush time */ + files = + (PCollection) + files.apply( + "applyUserTrigger", + Window.into(new GlobalWindows()) + .triggering( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(getFlushTimeLimit( + .discardingFiredPanes()); Review comment: I think we can leave this as it is. I see that the trigger is not the exact same. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
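The trigger being reviewed fires when either a row-count limit or a processing-time delay since the first element in the pane is reached, then discards the fired pane. A small stdlib sketch of that flush-policy decision logic — not Beam trigger API code, and the limits are illustrative:

```python
class FlushPolicy:
    """Fire when the buffered row count reaches row_limit OR time_limit_s
    seconds have elapsed since the first buffered element, mirroring
    AfterFirst.of(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(...),
    AfterPane.elementCountAtLeast(...)) with discardingFiredPanes()."""

    def __init__(self, row_limit, time_limit_s):
        self.row_limit = row_limit
        self.time_limit_s = time_limit_s
        self.count = 0
        self.first_seen_at = None

    def observe(self, now_s):
        # Buffer one element at processing time now_s; report whether to fire.
        if self.count == 0:
            self.first_seen_at = now_s
        self.count += 1
        return (self.count >= self.row_limit
                or now_s - self.first_seen_at >= self.time_limit_s)

    def fire(self):
        # discardingFiredPanes(): reset the pane after a flush.
        flushed = self.count
        self.count = 0
        self.first_seen_at = None
        return flushed
```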
[GitHub] [beam] robinyqiu commented on pull request #12403: [BEAM-10597] Propagate BigQuery streaming insert throttled time to Dataflow worker
robinyqiu commented on pull request #12403: URL: https://github.com/apache/beam/pull/12403#issuecomment-96191 Run Python PreCommit
[GitHub] [beam] lostluck merged pull request #12421: Fix go count on an empty pcollection
lostluck merged pull request #12421: URL: https://github.com/apache/beam/pull/12421
[GitHub] [beam] danielxjd commented on a change in pull request #12223: [Beam-4379] Make ParquetIO read splittable
danielxjd commented on a change in pull request #12223: URL: https://github.com/apache/beam/pull/12223#discussion_r463259859 ## File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java ## @@ -235,12 +277,195 @@ public ReadFiles withAvroDataModel(GenericData model) { return toBuilder().setAvroDataModel(model).build(); } +public ReadFiles withSplit() { + return toBuilder().setSplit(true).build(); +} + @Override public PCollection expand(PCollection input) { checkNotNull(getSchema(), "Schema can not be null"); - return input - .apply(ParDo.of(new ReadFn(getAvroDataModel( - .setCoder(AvroCoder.of(getSchema())); + if (!getSplit()) { +return input +.apply(ParDo.of(new SplitReadFn(getAvroDataModel( +.setCoder(AvroCoder.of(getSchema())); + } else { +return input +.apply(ParDo.of(new ReadFn(getAvroDataModel( +.setCoder(AvroCoder.of(getSchema())); + } +} + +@DoFn.BoundedPerElement +static class SplitReadFn extends DoFn { + private Class modelClass; + private static final Logger LOG = LoggerFactory.getLogger(SplitReadFn.class); + ReadSupport readSupport; + + SplitReadFn(GenericData model) { +this.modelClass = model != null ? 
model.getClass() : null; + } + + private static Map> toSetMultiMap(Map map) { +Map> setMultiMap = new HashMap>(); +for (Map.Entry entry : map.entrySet()) { + Set set = new HashSet(); + set.add(entry.getValue()); + setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set)); +} +return Collections.unmodifiableMap(setMultiMap); + } + + private InputFile getInputFile(FileIO.ReadableFile file) throws IOException { +if (!file.getMetadata().isReadSeekEfficient()) { + throw new RuntimeException( + String.format("File has to be seekable: %s", file.getMetadata().resourceId())); +} +return new BeamParquetInputFile(file.openSeekable()); + } + + @ProcessElement + public void processElement( + @Element FileIO.ReadableFile file, + RestrictionTracker tracker, + OutputReceiver outputReceiver) + throws Exception { +ReadSupport readSupport; +InputFile inputFile = getInputFile(file); +Configuration conf = setConf(); +GenericData model = null; +if (modelClass != null) { + model = (GenericData) modelClass.getMethod("get").invoke(null); +} +readSupport = new AvroReadSupport(model); +ParquetReadOptions options = HadoopReadOptions.builder(conf).build(); +ParquetFileReader reader = ParquetFileReader.open(inputFile, options); +Filter filter = checkNotNull(options.getRecordFilter(), "filter"); +conf = ((HadoopReadOptions) options).getConf(); +for (String property : options.getPropertyNames()) { + conf.set(property, options.getProperty(property)); +} +FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData(); +MessageType fileSchema = parquetFileMetadata.getSchema(); +Map fileMetadata = parquetFileMetadata.getKeyValueMetaData(); + +ReadSupport.ReadContext readContext = +readSupport.init(new InitContext(conf, toSetMultiMap(fileMetadata), fileSchema)); +ColumnIOFactory columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy()); +MessageType requestedSchema = readContext.getRequestedSchema(); +RecordMaterializer recordConverter = 
+readSupport.prepareForRead(conf, fileMetadata, fileSchema, readContext); +boolean strictTypeChecking = options.isEnabled(STRICT_TYPE_CHECKING, true); +boolean filterRecords = options.useRecordFilter(); +reader.setRequestedSchema(requestedSchema); +MessageColumnIO columnIO = +columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking); +long currentBlock = tracker.currentRestriction().getFrom(); +for (int i = 0; i < currentBlock; i++) { + reader.skipNextRowGroup(); +} +while (tracker.tryClaim(currentBlock)) { + + LOG.info("reading block" + currentBlock); + PageReadStore pages = reader.readNextRowGroup(); + currentBlock += 1; + RecordReader recordReader = + columnIO.getRecordReader( + pages, recordConverter, filterRecords ? filter : FilterCompat.NOOP); + long currentRow = 0; + long totalRows = pages.getRowCount(); + while (currentRow < totalRows) { +try { + GenericRecord record; + currentRow += 1; + try { +record = recordReader.read(); + } catch
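The SplitReadFn above claims one Parquet row group at a time via `tracker.tryClaim`, skipping groups before its restriction, so the runner can split off the unclaimed remainder. A minimal Python sketch of that claim loop with a toy offset-range tracker (an illustrative stand-in for Beam's `RestrictionTracker`, not its real API):

```python
class OffsetRangeTracker:
    """Toy restriction tracker over [start, stop): try_claim(i) succeeds
    only while i is inside the restriction."""
    def __init__(self, start, stop):
        self.start, self.stop = start, stop
        self.last_claimed = None

    def try_claim(self, position):
        if position >= self.stop:
            return False
        self.last_claimed = position
        return True

def read_row_groups(row_groups, tracker):
    # Skip groups before the restriction, then claim-and-read until the
    # tracker refuses (mirrors skipNextRowGroup + tryClaim in SplitReadFn).
    out = []
    current = tracker.start
    while tracker.try_claim(current):
        out.extend(row_groups[current])
        current += 1
    return out
```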
[GitHub] [beam] leiyiz commented on pull request #12427: nexmark python queries 0, 1, 2 and 9
leiyiz commented on pull request #12427: URL: https://github.com/apache/beam/pull/12427#issuecomment-40310 R: @pabloem R: @y1chi
[GitHub] [beam] leiyiz opened a new pull request #12427: nexmark python queries 0, 1, 2 and 9
leiyiz opened a new pull request #12427: URL: https://github.com/apache/beam/pull/12427 changed queries 0-2, implemented query9, coders for all models, new models for query results, changed launcher to count and print results to file Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch): [matrix of Go/Java/Python post-commit build-status badges for Dataflow, Flink, Samza, Spark, and Twister2 omitted]

[GitHub] [beam] udim commented on pull request #12009: [BEAM-10258] Support type hint annotations on PTransform's expand()
udim commented on pull request #12009: URL: https://github.com/apache/beam/pull/12009#issuecomment-31741 Run Python2_PVR_Flink PreCommit
[GitHub] [beam] leiyiz closed pull request #12365: [BEAM-8258] python nexmark query 0-2 improvements
leiyiz closed pull request #12365: URL: https://github.com/apache/beam/pull/12365
[GitHub] [beam] lostluck commented on a change in pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder
lostluck commented on a change in pull request #12426: URL: https://github.com/apache/beam/pull/12426#discussion_r463208404 ## File path: model/pipeline/src/main/proto/beam_runner_api.proto ## @@ -855,10 +855,21 @@ message StandardCoders { // BOOLEAN: beam:coder:bool:v1 // BYTES: beam:coder:bytes:v1 // ArrayType: beam:coder:iterable:v1 (always has a known length) -// MapType: not yet a standard coder (BEAM-7996) +// MapType: not a standard coder, specification defined below. // RowType: beam:coder:row:v1 // LogicalType: Uses the coder for its representation. // +// The MapType is encoded by: +// - An INT32 representing the size of the map (N) +// - Followed by N interleaved keys and values, encoded with their +// corresponding coder. +// +// Nullable types in container types (ArrayType, MapType) are encoded by: +// - A one byte null indicator, 0x00 for null values, or 0x01 for present +// values. +// - For present values the null indicator is followed by the value +// encoded with it's corresponding coder. +// Review comment: Ack! Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder
TheNeuralBit commented on pull request #12426: URL: https://github.com/apache/beam/pull/12426#issuecomment-666590444 This is relevant for BEAM-10571. Some existing payloads use `List>` fields, which we could replace with Map when using schemas to encode the payload.
[GitHub] [beam] TheNeuralBit edited a comment on pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder
TheNeuralBit edited a comment on pull request #12426: URL: https://github.com/apache/beam/pull/12426#issuecomment-666590444 This is relevant for BEAM-10571. Some existing payloads use `List>` fields, which we could replace with `Map` when using schemas to encode the payload.
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder
TheNeuralBit commented on a change in pull request #12426: URL: https://github.com/apache/beam/pull/12426#discussion_r463194957 ## File path: model/pipeline/src/main/proto/beam_runner_api.proto ## @@ -855,10 +855,21 @@ message StandardCoders { // BOOLEAN: beam:coder:bool:v1 // BYTES: beam:coder:bytes:v1 // ArrayType: beam:coder:iterable:v1 (always has a known length) -// MapType: not yet a standard coder (BEAM-7996) +// MapType: not a standard coder, specification defined below. // RowType: beam:coder:row:v1 // LogicalType: Uses the coder for its representation. // +// The MapType is encoded by: +// - An INT32 representing the size of the map (N) +// - Followed by N interleaved keys and values, encoded with their +// corresponding coder. +// +// Nullable types in container types (ArrayType, MapType) are encoded by: +// - A one byte null indicator, 0x00 for null values, or 0x01 for present +// values. +// - For present values the null indicator is followed by the value +// encoded with it's corresponding coder. +// Review comment: FYI @lostluck - previously the encoding for MapType and for nullable types within containers was not documented. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
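The nullable-in-container encoding documented above — a one-byte null indicator, 0x00 for null, 0x01 followed by the value's encoding — can be sketched as follows. The length-prefixed UTF-8 string is again an illustrative stand-in for an arbitrary component coder, not Beam's actual implementation:

```python
import struct

def encode_nullable_str(value, out):
    # 0x00 marks null; 0x01 is followed by the component encoding.
    if value is None:
        out.append(b"\x00")
    else:
        data = value.encode("utf-8")
        out.append(b"\x01" + struct.pack(">i", len(data)) + data)

def decode_nullable_str(buf, pos):
    indicator = buf[pos]
    pos += 1
    if indicator == 0x00:
        return None, pos
    (n,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    return buf[pos:pos + n].decode("utf-8"), pos + n
```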
[GitHub] [beam] KevinGG commented on a change in pull request #12372: [BEAM-10545] KernelModel and jest tests
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder
TheNeuralBit commented on a change in pull request #12426: URL: https://github.com/apache/beam/pull/12426#discussion_r463194527 ## File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java ## @@ -366,12 +368,15 @@ private static Object parseField(Object value, Schema.FieldType fieldType) {
         .map((element) -> parseField(element, fieldType.getCollectionElementType()))
         .collect(toImmutableList());
       case MAP:
-        Map kvMap = (Map) value;
-        return kvMap.entrySet().stream()
-            .collect(
-                toImmutableMap(
-                    (pair) -> parseField(pair.getKey(), fieldType.getMapKeyType()),
-                    (pair) -> parseField(pair.getValue(), fieldType.getMapValueType())));
+        Map kvMap = new HashMap<>();
+        ((Map) value)
+            .entrySet().stream()
+            .forEach(
+                (entry) ->
+                    kvMap.put(
+                        parseField(entry.getKey(), fieldType.getMapKeyType()),
+                        parseField(entry.getValue(), fieldType.getMapValueType())));
+        return kvMap;
Review comment: This change is necessary because `ImmutableMap` (as well as `Collectors.toMap`) does not allow null values, so it errors on the new test cases. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit opened a new pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder
TheNeuralBit opened a new pull request #12426: URL: https://github.com/apache/beam/pull/12426 This PR adds support for MapType in the Python RowCoder, as well as support for encoding nulls inside of container types (NullableCoder).
[GitHub] [beam] KevinGG commented on pull request #12372: [BEAM-10545] KernelModel and jest tests
KevinGG commented on pull request #12372: URL: https://github.com/apache/beam/pull/12372#issuecomment-666586747 > LGTM. I do not believe I am a good typescript reviewer. It would be good if you can ask on dev@ and see if there could be better reviewers for future PRs. Thanks! For typescript review, Jason has agreed to take a look at my future PRs. He is the TL of CAIP notebooks and works on other extensions: https://github.com/GoogleCloudPlatform/jupyter-extensions. I'll still need your and other committers' feedback on how the code fits into the Beam repo and how others can contribute to it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] KevinGG commented on pull request #12372: [BEAM-10545] KernelModel and jest tests
KevinGG commented on pull request #12372: URL: https://github.com/apache/beam/pull/12372#issuecomment-666584845 > Retracting my LGTM, I have some questions. > > And a high level question, what is the purpose of this module? (Could you add more details to what you wrote as part of the PR description?) The purpose of this module is to allow the extension to execute code in the kernel silently to retrieve information about the current interactive environment, handle the IOPub messages sent back from the kernel, and let other components use this module as a data model. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
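The pattern described above — submit code to the kernel, then route IOPub replies by message type into model state that other components can read — can be sketched independently of JupyterLab. The class and message shapes below are simplified stand-ins, not the actual `@jupyterlab/services` interfaces:

```typescript
// Simplified stand-in for an IOPub message; real messages come from
// @jupyterlab/services and carry many more fields.
interface FakeIOPubMessage {
  header: { msg_type: string };
  content: any;
}

// A minimal data model in the spirit of KernelModel: it only records what
// the kernel sends back, and leaves all rendering to UI components.
class MiniKernelModel {
  executeResult: object | null = null;
  displayData: any[] = [];

  // Route each IOPub message into model state by its msg_type.
  onIOPub(msg: FakeIOPubMessage): void {
    switch (msg.header.msg_type) {
      case 'execute_result':
        this.executeResult = msg.content;
        break;
      case 'display_data':
        this.displayData.push(msg.content);
        break;
      default:
        // Ignore message types the model doesn't track (status, stream, ...).
        break;
    }
  }
}

const model = new MiniKernelModel();
model.onIOPub({ header: { msg_type: 'execute_result' }, content: { data: {} } });
model.onIOPub({ header: { msg_type: 'display_data' }, content: { id: 1 } });
console.log(model.executeResult !== null, model.displayData.length); // true 1
```

A UI component would then subscribe to a change signal on the model rather than talking to the kernel itself.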
[GitHub] [beam] KevinGG commented on a change in pull request #12372: [BEAM-10545] KernelModel and jest tests
KevinGG commented on a change in pull request #12372: URL: https://github.com/apache/beam/pull/12372#discussion_r463182344 ## File path: sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/__tests__/kernel/KernelModel.test.ts ## @@ -0,0 +1,124 @@ +// Licensed under the Apache License, Version 2.0 (the 'License'); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +/** + * Tests for KernelModel module. + * + * Non camelcase fields are nbformat fields used in notebooks. Lint is ignored + * for them. + */ + +import { KernelModel } from '../../kernel/KernelModel'; + +const fakeSessionContext = { + session: { +kernel: { + requestExecute: function(): object { +return { + onIOPub: function(): void { +// do nothing + } +}; + } +} + } +}; + +it('creates new future with IOPub callbacks when executing new code in kernel', () => { + const kernelModel = new KernelModel(fakeSessionContext as any); + kernelModel.execute('new code'); + expect(kernelModel.future).not.toBe(null); + expect(kernelModel.future.onIOPub).not.toBe(null); +}); + +it('handles execute result from IOPub channel', () => { + const kernelModel = new KernelModel(fakeSessionContext as any); + kernelModel.execute('any code'); + kernelModel.future.onIOPub({ +header: { + // eslint-disable-next-line @typescript-eslint/camelcase + msg_type: 'execute_result' +}, +content: { + data: { +'text/plain': + '\'{"pipelineId": {"metadata": {"name": "pipeline", "inMemoryId": 1, "type": "pipeline"}, "pcolls": {"pcollId": {"name": "pcoll", "inMemoryId": 2, 
"type": "pcollection"\'' + }, + channel: 'iopub' +} + } as any); + expect(kernelModel.executeResult).toEqual({ +pipelineId: { + metadata: { +name: 'pipeline', +inMemoryId: 1, +type: 'pipeline' + }, + pcolls: { +pcollId: { + name: 'pcoll', + inMemoryId: 2, + type: 'pcollection' +} + } +} + }); +}); + +it('handles display data from IOPub channel', () => { + const kernelModel = new KernelModel(fakeSessionContext as any); + kernelModel.execute('any code'); + const displayData = { +// eslint-disable-next-line @typescript-eslint/camelcase +output_type: 'display_data', +data: { + 'text/html': '', + 'application/javascript': 'console.log(1);' Review comment: It's a valid JavaScript statement that logs to the console; if it were executed, the result would show up in the `terminal`. This unit test doesn't do the execution, though, because KernelModel is just a data model, not a UI component. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] KevinGG commented on a change in pull request #12372: [BEAM-10545] KernelModel and jest tests
KevinGG commented on a change in pull request #12372: URL: https://github.com/apache/beam/pull/12372#discussion_r463188165 ## File path: sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/__tests__/kernel/KernelModel.test.ts ## @@ -0,0 +1,124 @@ +// Licensed under the Apache License, Version 2.0 (the 'License'); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +/** + * Tests for KernelModel module. + * + * Non camelcase fields are nbformat fields used in notebooks. Lint is ignored + * for them. + */ + +import { KernelModel } from '../../kernel/KernelModel'; + +const fakeSessionContext = { + session: { +kernel: { + requestExecute: function(): object { +return { + onIOPub: function(): void { +// do nothing + } +}; + } +} + } +}; + +it('creates new future with IOPub callbacks when executing new code in kernel', () => { + const kernelModel = new KernelModel(fakeSessionContext as any); + kernelModel.execute('new code'); + expect(kernelModel.future).not.toBe(null); + expect(kernelModel.future.onIOPub).not.toBe(null); +}); + +it('handles execute result from IOPub channel', () => { + const kernelModel = new KernelModel(fakeSessionContext as any); + kernelModel.execute('any code'); + kernelModel.future.onIOPub({ +header: { + // eslint-disable-next-line @typescript-eslint/camelcase + msg_type: 'execute_result' +}, +content: { + data: { +'text/plain': + '\'{"pipelineId": {"metadata": {"name": "pipeline", "inMemoryId": 1, "type": "pipeline"}, "pcolls": {"pcollId": {"name": "pcoll", "inMemoryId": 2, 
"type": "pcollection"\'' + }, + channel: 'iopub' +} + } as any); + expect(kernelModel.executeResult).toEqual({ +pipelineId: { + metadata: { +name: 'pipeline', +inMemoryId: 1, +type: 'pipeline' + }, + pcolls: { +pcollId: { + name: 'pcoll', + inMemoryId: 2, + type: 'pcollection' +} + } +} + }); +}); + +it('handles display data from IOPub channel', () => { + const kernelModel = new KernelModel(fakeSessionContext as any); + kernelModel.execute('any code'); + const displayData = { +// eslint-disable-next-line @typescript-eslint/camelcase +output_type: 'display_data', +data: { + 'text/html': '', + 'application/javascript': 'console.log(1);' +}, +metadata: { + some: 'data' +} + }; + + kernelModel.future.onIOPub({ +header: { + // eslint-disable-next-line @typescript-eslint/camelcase + msg_type: 'display_data' +}, +content: displayData + } as any); + expect(kernelModel.displayData).toEqual([displayData]); +}); + +it('handles display update from IOPub channel', () => { Review comment: Think of `display data` as incrementally displaying contents, while `display update` modifies contents already displayed. For example: if display data is [1, 2, 3], you'll see 1 2 3 being displayed; if display update is [1, 2, 3] for the same display_id, you'll see 1, then it becomes 2, then it becomes 3. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
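The distinction drawn in the comment above can be sketched with a toy display store (the message shape below is a simplified assumption, not the real nbformat type): each `display_data` adds a new output, while an update replaces the existing output that carries the same display id in place:

```typescript
// Simplified stand-in for a display message; the real messages carry the
// display id under transient.display_id.
interface FakeDisplayMessage {
  displayId: string;
  value: number;
}

// Ordered outputs, as a notebook frontend would render them.
const outputs: FakeDisplayMessage[] = [];

function onDisplayData(msg: FakeDisplayMessage): void {
  outputs.push(msg); // display_data: append a new output area
}

function onUpdateDisplayData(msg: FakeDisplayMessage): void {
  // update_display_data: replace the existing output with the same displayId
  const i = outputs.findIndex(o => o.displayId === msg.displayId);
  if (i >= 0) {
    outputs[i] = msg;
  }
}

// display_data 1, 2, 3 -> three outputs are shown.
onDisplayData({ displayId: 'a', value: 1 });
onDisplayData({ displayId: 'b', value: 2 });
onDisplayData({ displayId: 'c', value: 3 });
console.log(outputs.map(o => o.value)); // [ 1, 2, 3 ]

// update 9 for 'b' -> the second output changes in place.
onUpdateDisplayData({ displayId: 'b', value: 9 });
console.log(outputs.map(o => o.value)); // [ 1, 9, 3 ]
```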
[GitHub] [beam] KevinGG commented on a change in pull request #12372: [BEAM-10545] KernelModel and jest tests
KevinGG commented on a change in pull request #12372: URL: https://github.com/apache/beam/pull/12372#discussion_r463185961 ## File path: sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/kernel/KernelModel.ts ## @@ -0,0 +1,161 @@ +// Licensed under the Apache License, Version 2.0 (the 'License'); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +/** + * The module holds the model that handles messaging between the frontend and + * the connected kernel. + */ + +import { ISessionContext } from '@jupyterlab/apputils'; + +import { + IDisplayData, + IDisplayUpdate, + IExecuteResult +} from '@jupyterlab/nbformat'; + +import { Kernel, KernelMessage } from '@jupyterlab/services'; + +import { ISignal, Signal } from '@lumino/signaling'; + +import { KernelCode } from '../kernel/KernelCode'; + +export class KernelModel { + constructor(sessionContext: ISessionContext, enableConsoleLog = false) { +this._sessionContext = sessionContext; +this._enableConsoleLog = enableConsoleLog; + } + + get future(): Kernel.IFuture< +KernelMessage.IExecuteRequestMsg, +KernelMessage.IExecuteReplyMsg + > | null { +return this._future; + } + + set future( +value: Kernel.IFuture< + KernelMessage.IExecuteRequestMsg, + KernelMessage.IExecuteReplyMsg +> | null + ) { +if (this._future === value) { + return; +} + +if (this._future) { + this._future.dispose(); +} + +this._future = value; + +if (!value) { + return; +} + +value.onIOPub = this._onIOPub.bind(this); + } + + get executeResult(): object { +if (this._executeResult) { 
+ const dataInPlainText = this._executeResult.data['text/plain'] as string; + if (dataInPlainText) { +try { + // The slice removes trailing single quotes from the nbformat output. + // The replace removes literal backslashes from the nbformat output. + const dataInJsonString = dataInPlainText +.slice(1, -1) +.replace(/\\'/g, "'"); + return JSON.parse(dataInJsonString); +} catch (e) { + console.error(e); + return {}; +} + } +} +return {}; + } + + get displayData(): Array {
+return this._displayData; + } + + get displayUpdate(): Array {
+return this._displayUpdate; + } + + get stateChanged(): ISignal {
+return this._stateChanged; + } + + execute(code: string, expectReply = true): void {
+// Dispose the kernel future so that no more IOPub will be handled. +if (this.future) { + this.future.dispose(); + this.future = null; +} +// Clear the outputs from previous kernel executions. +this._executeResult = null; +this._displayData.length = 0; +this._displayUpdate.length = 0; +if (!this._sessionContext || !this._sessionContext.session?.kernel) { + return; +} +this.future = this._sessionContext.session?.kernel?.requestExecute({ + code: KernelCode.COMMON_KERNEL_IMPORTS + code, + silent: !expectReply, Review comment: The problem with `silent` is that it's confusing given the existence of `store_history`. Intuitively, if `silent`, the kernel execution should not store history. However, that's not the case: `silent` only determines whether the execution returns a result. The history is still stored, and the side effects still happen, so you will see execution_count increase even when you set `silent` to true. Wrapping it in an `expectReply` facade makes the model clear to use. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
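The renaming discussed above can be sketched as a thin facade (the `buildRequest` helper and its return shape below are hypothetical, not part of the actual KernelModel or `@jupyterlab/services` API): `expectReply` is simply negated into the protocol-level `silent` flag, so callers never have to reason about the double negative:

```typescript
// Shape of the fields an execute_request cares about here; the real
// KernelMessage.IExecuteRequestMsg content has more options.
interface ExecuteRequestContent {
  code: string;
  silent: boolean;
}

// Hypothetical facade: callers state what they want (a reply or not),
// and the confusing protocol-level flag is derived internally.
function buildRequest(code: string, expectReply = true): ExecuteRequestContent {
  return { code, silent: !expectReply };
}

console.log(buildRequest('1 + 1').silent);          // false: we expect a result
console.log(buildRequest('setup()', false).silent); // true: fire-and-forget
```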
[GitHub] [beam] KevinGG commented on a change in pull request #12372: [BEAM-10545] KernelModel and jest tests
KevinGG commented on a change in pull request #12372: URL: https://github.com/apache/beam/pull/12372#discussion_r463183798 ## File path: sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/kernel/KernelModel.ts ## @@ -0,0 +1,161 @@ +// Licensed under the Apache License, Version 2.0 (the 'License'); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +/** + * The module holds the model that handles messaging between the frontend and + * the connected kernel. + */ + +import { ISessionContext } from '@jupyterlab/apputils'; + +import { + IDisplayData, + IDisplayUpdate, + IExecuteResult +} from '@jupyterlab/nbformat'; + +import { Kernel, KernelMessage } from '@jupyterlab/services'; + +import { ISignal, Signal } from '@lumino/signaling'; + +import { KernelCode } from '../kernel/KernelCode'; + +export class KernelModel { + constructor(sessionContext: ISessionContext, enableConsoleLog = false) { +this._sessionContext = sessionContext; +this._enableConsoleLog = enableConsoleLog; + } + + get future(): Kernel.IFuture< +KernelMessage.IExecuteRequestMsg, +KernelMessage.IExecuteReplyMsg + > | null { +return this._future; + } + + set future( +value: Kernel.IFuture< + KernelMessage.IExecuteRequestMsg, + KernelMessage.IExecuteReplyMsg +> | null + ) { +if (this._future === value) { + return; +} + +if (this._future) { + this._future.dispose(); +} + +this._future = value; + +if (!value) { + return; +} + +value.onIOPub = this._onIOPub.bind(this); + } + + get executeResult(): object { +if (this._executeResult) { 
+ const dataInPlainText = this._executeResult.data['text/plain'] as string; + if (dataInPlainText) { +try { + // The slice removes trailing single quotes from the nbformat output. + // The replace removes literal backslashes from the nbformat output. Review comment: It's removing characters from the nbformat text. The kernel generates JSON; when it is messaged to the frontend by Jupyter in nbformat, the text becomes 'json_with_backslashes'. The slice removes the "'" from both ends of the string, and the replace then removes the backslashes. The resulting text is valid, parsable JSON. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
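The cleanup described above can be sketched on concrete values (the sample strings below are made-up stand-ins for real kernel output): the Python kernel's `text/plain` output is a repr, i.e. the JSON wrapped in single quotes with any inner single quotes escaped, so stripping the outer quotes and unescaping yields parsable JSON:

```typescript
// What the frontend receives in data['text/plain']: a Python repr of a
// JSON string, e.g. '{"name": "pcoll"}' including the outer single quotes.
const plain = `'{"name": "pcoll", "inMemoryId": 2}'`;

// Slice off the surrounding quotes, then unescape \' back to '.
const json = plain.slice(1, -1).replace(/\\'/g, "'");
const parsed = JSON.parse(json);
console.log(parsed.name, parsed.inMemoryId); // pcoll 2

// With an escaped inner single quote in the repr (String.raw keeps the
// literal backslash, as the wire text would):
const plain2 = String.raw`'{"note": "it\'s ok"}'`;
console.log(JSON.parse(plain2.slice(1, -1).replace(/\\'/g, "'")).note); // it's ok
```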
[GitHub] [beam] davidcavazos commented on pull request #12118: [BEAM-7705] Add BigQuery Java samples
davidcavazos commented on pull request #12118: URL: https://github.com/apache/beam/pull/12118#issuecomment-666576290 All tests are passing, @pabloem can you take a look whenever you have a chance? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] lostluck commented on pull request #12425: [BEAM-10610][BEAM-9982] Support Loopback mode with universal runners in the Go SDK.
lostluck commented on pull request #12425: URL: https://github.com/apache/beam/pull/12425#issuecomment-666575819 R: @youngoli cc: @robertwb @damondouglas @codeBehindMe The others are cc'd as this has value for setting up Python Portable Runner usage, for local testing of changes against a correct runner. Set up usage of Beam Python per https://cwiki.apache.org/confluence/display/BEAM/Python+Tips, and in a virtualenv navigate to your repo's `beam/sdks/python`. You start the portable runner with `python -m apache_beam.runners.portability.local_job_service_main --port=8099`, which allows connections from the Go pipeline at `--endpoint=localhost:8099`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] kennknowles commented on pull request #12416: Update google-api-services versions.
kennknowles commented on pull request #12416: URL: https://github.com/apache/beam/pull/12416#issuecomment-666574292 But it also probably isn't a flake, since it is a pretty simple unit test. Most likely the assumptions of the mock were overconstrained so changes in the Pubsub client library can break it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
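The failure mode described here — a mock that encodes exact call sequences rather than observable behavior, so a client-library update breaks the test even though the code under test is still correct — can be sketched generically (the names below are illustrative, not from the Beam Pubsub test):

```typescript
// A fake transport that records the client-library calls made through it.
class RecordingTransport {
  calls: string[] = [];
  request(method: string): void {
    this.calls.push(method);
  }
  // Overconstrained check: asserts the exact call sequence. An extra
  // internal call added by a library update breaks this assertion.
  verifyExactly(expected: string[]): boolean {
    return JSON.stringify(this.calls) === JSON.stringify(expected);
  }
  // Looser check: only requires that the calls we care about happened.
  verifyContains(required: string[]): boolean {
    return required.every(m => this.calls.includes(m));
  }
}

const t = new RecordingTransport();
t.request('getCredentials'); // imagined new internal call after a library update
t.request('publish');

console.log(t.verifyExactly(['publish']));  // false: overconstrained expectation
console.log(t.verifyContains(['publish'])); // true: observable behavior still holds
```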