[GitHub] [beam] George-Wu commented on a change in pull request #12331: [BEAM-10601] DICOM API Beam IO connector

2020-07-30 Thread GitBox


George-Wu commented on a change in pull request #12331:
URL: https://github.com/apache/beam/pull/12331#discussion_r463411782



##
File path: sdks/python/setup.py
##
@@ -128,6 +128,7 @@ def get_version():
   cythonize = lambda *args, **kwargs: []
 
 REQUIRED_PACKAGES = [
+'google-auth>=1.18.0,<=1.20.0',

Review comment:
   For some reason, if I put it under GCP_REQUIREMENTS, there will still be a
'ModuleNotFoundError' caused by google.auth during the unit tests.
https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/14212/testReport/junit/(root)/(empty)/apache_beam_io_gcp_dicomio_test_7/
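
   For context, Beam's GCP-dependent test modules usually guard optional
imports so the suite can still load when the [gcp] extra is absent. A minimal
sketch of that pattern (module and test names illustrative, not the actual
dicomio test code):

   ```python
import unittest

try:
  import google.auth  # provided by the optional google-auth package
except ImportError:
  google = None  # the tests below are skipped when the import fails


@unittest.skipIf(google is None, 'google-auth is not installed')
class DicomIOGuardedTest(unittest.TestCase):
  def test_default_credentials_helper_exists(self):
    self.assertTrue(callable(google.auth.default))
   ```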






This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on pull request #12429: Clean ExtractOutput of mean transform

2020-07-30 Thread GitBox


lostluck commented on pull request #12429:
URL: https://github.com/apache/beam/pull/12429#issuecomment-666901638


   Going to close it, largely because, while in terms of Beam semantics it's
unlikely to trigger, not guarding against a divide by zero seems fundamentally
wrong. Something could change that enables that clause to be triggered, and I'd
rather not deal with an emergency cherry-pick.
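
   For readers following along, the guard in question, sketched in Python for
brevity (the actual transform lives in the Go SDK; names are illustrative):
extracting a mean divides an accumulated sum by a count, and an empty
accumulator must never reach that division.

   ```python
def extract_mean(total, count):
  """Minimal sketch of a guarded ExtractOutput-style mean extraction."""
  if count == 0:
    # Guard: an empty accumulator would otherwise divide by zero.
    return 0.0
  return total / count


assert extract_mean(10.0, 4) == 2.5
assert extract_mean(0.0, 0) == 0.0
   ```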



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck closed pull request #12429: Clean ExtractOutput of mean transform

2020-07-30 Thread GitBox


lostluck closed pull request #12429:
URL: https://github.com/apache/beam/pull/12429


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] kennknowles merged pull request #12387: Upgrade spotbugs from 3.1.12 to 4.0.6, spotbugs gradle plugin from 2.0.0 to 3.0.0

2020-07-30 Thread GitBox


kennknowles merged pull request #12387:
URL: https://github.com/apache/beam/pull/12387


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck merged pull request #12425: [BEAM-10610][BEAM-9982] Support Loopback mode with universal runners in the Go SDK.

2020-07-30 Thread GitBox


lostluck merged pull request #12425:
URL: https://github.com/apache/beam/pull/12425


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] George-Wu commented on a change in pull request #12331: [BEAM-10601] DICOM API Beam IO connector

2020-07-30 Thread GitBox


George-Wu commented on a change in pull request #12331:
URL: https://github.com/apache/beam/pull/12331#discussion_r463383155



##
File path: sdks/python/apache_beam/io/gcp/dicomio.py
##
@@ -164,70 +167,109 @@ class DicomSearch(PTransform):
 }
 
   """
-  def __init__(self, credential=None):
+  def __init__(self, buffer_size=8, max_workers=5, credential=None):
 """Initializes DicomSearch.
 Args:
   credential: # type: Google credential object, if it is specified, the
   Http client will use it to create sessions instead of the default.
 """
+self.buffer_size = buffer_size
+self.max_workers = max_workers

Review comment:
   Added, but due to some object-reference issues in the unit test it has to
be added in a rather ugly way in the WriteToDicomStore connector...





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] George-Wu commented on a change in pull request #12331: [BEAM-10601] DICOM API Beam IO connector

2020-07-30 Thread GitBox


George-Wu commented on a change in pull request #12331:
URL: https://github.com/apache/beam/pull/12331#discussion_r463382756



##
File path: sdks/python/apache_beam/io/gcp/dicomio.py
##
@@ -372,52 +420,63 @@ def __init__(self, destination_dict, input_type, 
credential=None):
   credential: # type: Google credential object, if it is specified, the
   Http client will use it instead of the default one.
 """
-self.credential = credential
 self.destination_dict = destination_dict
 # input_type pre-check
 if input_type not in ['bytes', 'fileio']:
   raise ValueError("input_type could only be 'bytes' or 'fileio'")
 self.input_type = input_type
+self.buffer_size = buffer_size
+self.max_workers = max_workers
+self.credential = credential
 
   def expand(self, pcoll):
 return pcoll | beam.ParDo(
-_StoreInstance(self.destination_dict, self.input_type, 
self.credential))
+_StoreInstance(
+self.destination_dict,
+self.input_type,
+self.buffer_size,
+self.max_workers,
+self.credential))
 
 
 class _StoreInstance(beam.DoFn):
   """A DoFn read or fetch dicom files then push it to a dicom store."""
-  def __init__(self, destination_dict, input_type, credential=None):
-self.credential = credential
+  def __init__(
+  self,
+  destination_dict,
+  input_type,
+  buffer_size,
+  max_workers,
+  credential=None):

Review comment:
   Added custom client support.
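
   For reference, a hedged sketch of what supplying a custom credential could
look like from user code (the key path is illustrative; the DicomSearch
signature follows the diff above):

   ```python
from google.oauth2 import service_account

from apache_beam.io.gcp.dicomio import DicomSearch

# Illustrative: build an explicit credential instead of relying on
# application-default credentials.
credential = service_account.Credentials.from_service_account_file(
    '/path/to/service-account-key.json')

search = DicomSearch(buffer_size=8, max_workers=5, credential=credential)
   ```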

##
File path: sdks/python/apache_beam/io/gcp/dicomio.py
##
@@ -164,70 +167,109 @@ class DicomSearch(PTransform):
 }
 
   """
-  def __init__(self, credential=None):
+  def __init__(self, buffer_size=8, max_workers=5, credential=None):
 """Initializes DicomSearch.
 Args:
   credential: # type: Google credential object, if it is specified, the
   Http client will use it to create sessions instead of the default.
 """
+self.buffer_size = buffer_size
+self.max_workers = max_workers
 self.credential = credential
 
   def expand(self, pcoll):
-return pcoll | beam.ParDo(_QidoSource(self.credential))
+return pcoll | beam.ParDo(
+_QidoSource(self.buffer_size, self.max_workers, self.credential))
 
 
 class _QidoSource(beam.DoFn):
   """A DoFn for executing every qido query request."""
-  def __init__(self, credential=None):
+  def __init__(self, buffer_size, max_workers, credential=None):
+self.buffer_size = buffer_size
+self.max_workers = max_workers
 self.credential = credential
 
-  def process(self, element):
+  def start_bundle(self):
+self.buffer = []
+
+  def finish_bundle(self):
+return self._flush()
+
+  def validate_element(self, element):
 # Check if all required keys present.
 required_keys = [
 'project_id', 'region', 'dataset_id', 'dicom_store_id', 'search_type'
 ]
 
-error_message = None
-
 for key in required_keys:
   if key not in element:
 error_message = 'Must have %s in the dict.' % (key)
-break
-
-if not error_message:
-  project_id = element['project_id']
-  region = element['region']
-  dataset_id = element['dataset_id']
-  dicom_store_id = element['dicom_store_id']
-  search_type = element['search_type']
-  params = element['params'] if 'params' in element else None
-
-  # Call qido search http client
-  if element['search_type'] in ['instances', "studies", "series"]:
-result, status_code = DicomApiHttpClient().qido_search(
-  project_id, region, dataset_id, dicom_store_id,
-  search_type, params, self.credential
-)
-  else:
-error_message = (
-'Search type can only be "studies", '
-'"instances" or "series"')
-
-  if not error_message:
-out = {}
-out['result'] = result
-out['status'] = status_code
-out['input'] = element
-out['success'] = (status_code == 200)
-return [out]
-
-# Return this when the input dict dose not meet the requirements
+return False, error_message
+
+# Check if return type is correct.
+if element['search_type'] in ['instances', "studies", "series"]:
+  return True, None
+else:
+  error_message = (
+  'Search type can only be "studies", '
+  '"instances" or "series"')
+  return False, error_message
+
+  def process(self, element, window=beam.DoFn.WindowParam):
+# Check if the element is valid
+valid, error_message = self.validate_element(element)
+
+if valid:
+  self.buffer.append((element, window))
+  if len(self.buffer) >= self.buffer_size:
+self._flush()

Review comment:
   Fixed and test added

##
File path: sdks/python/apache_beam/io/gcp/dicomio.py
##
@@ -164,70 +167,109 @@ class DicomSearch(PTransform):
 }
 
   """
-  def __init__(self, credential=None):
+  def 

[GitHub] [beam] George-Wu commented on a change in pull request #12331: [BEAM-10601] DICOM API Beam IO connector

2020-07-30 Thread GitBox


George-Wu commented on a change in pull request #12331:
URL: https://github.com/apache/beam/pull/12331#discussion_r463382633



##
File path: sdks/python/apache_beam/io/gcp/dicomio.py
##
@@ -372,52 +420,63 @@ def __init__(self, destination_dict, input_type, 
credential=None):
   credential: # type: Google credential object, if it is specified, the
   Http client will use it instead of the default one.
 """
-self.credential = credential
 self.destination_dict = destination_dict
 # input_type pre-check
 if input_type not in ['bytes', 'fileio']:
   raise ValueError("input_type could only be 'bytes' or 'fileio'")
 self.input_type = input_type
+self.buffer_size = buffer_size
+self.max_workers = max_workers
+self.credential = credential
 
   def expand(self, pcoll):
 return pcoll | beam.ParDo(
-_StoreInstance(self.destination_dict, self.input_type, 
self.credential))
+_StoreInstance(
+self.destination_dict,
+self.input_type,
+self.buffer_size,
+self.max_workers,
+self.credential))
 
 
 class _StoreInstance(beam.DoFn):
   """A DoFn read or fetch dicom files then push it to a dicom store."""
-  def __init__(self, destination_dict, input_type, credential=None):
-self.credential = credential
+  def __init__(
+  self,
+  destination_dict,
+  input_type,
+  buffer_size,
+  max_workers,
+  credential=None):
 # pre-check destination dict
 required_keys = ['project_id', 'region', 'dataset_id', 'dicom_store_id']
 for key in required_keys:
   if key not in destination_dict:
 raise ValueError('Must have %s in the dict.' % (key))
 self.destination_dict = destination_dict
 self.input_type = input_type
+self.buffer_size = buffer_size
+self.max_workers = max_workers
+self.credential = credential
 
-  def process(self, element):
+  def start_bundle(self):
+self.buffer = []
+
+  def finish_bundle(self):
+return self._flush()
+
+  def process(self, element, window=beam.DoFn.WindowParam):
+self.buffer.append((element, window))
+if len(self.buffer) >= self.buffer_size:
+  self._flush()

Review comment:
   Good catch, fixed and tests added!
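
   Restating the caught bug as a minimal sketch (names illustrative, timestamp
handling elided; see the timestamp discussion further down): if process()
calls _flush() without emitting its return value, every element flushed
mid-bundle is silently dropped and only the finish_bundle() flush survives.

   ```python
import apache_beam as beam
from apache_beam.utils.windowed_value import WindowedValue


class _BufferingSketch(beam.DoFn):
  def __init__(self, buffer_size=8):
    self._buffer_size = buffer_size

  def start_bundle(self):
    self._buffer = []

  def process(self, element, window=beam.DoFn.WindowParam):
    self._buffer.append((element, window))
    if len(self._buffer) >= self._buffer_size:
      # The fix: emit the flushed results instead of discarding them.
      for windowed in self._flush():
        yield windowed

  def finish_bundle(self):
    return self._flush()

  def _flush(self):
    out = [WindowedValue(elem, 0, [win]) for elem, win in self._buffer]
    self._buffer = []
    return out
   ```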





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] George-Wu commented on a change in pull request #12331: [BEAM-10601] DICOM API Beam IO connector

2020-07-30 Thread GitBox


George-Wu commented on a change in pull request #12331:
URL: https://github.com/apache/beam/pull/12331#discussion_r463382681



##
File path: sdks/python/apache_beam/io/gcp/dicomio.py
##
@@ -426,6 +485,44 @@ def process(self, element):
 
 out = {}
 out['status'] = status_code
-out['input'] = None if status_code == 200 else element
+out['input'] = None if status_code == 200 else dicom_file
 out['success'] = (status_code == 200)
-return [out]
+return out
+
+  def read_dicom_file(self, buffer_element):
+# Read the file based on different input. If the read fails ,return
+# an error dict which records input and error messages.
+try:
+  if self.input_type == 'fileio':
+f = buffer_element.open()
+return True, f.read()
+  else:
+return True, buffer_element
+except Exception as error_message:
+  error_out = {}
+  error_out['status'] = error_message
+  error_out['input'] = buffer_element
+  error_out['success'] = False
+  return False, error_out
+
+  def process_buffer_element(self, buffer_element):
+# Thread job runner - each thread stores a DICOM file
+success, read_result = self.read_dicom_file(buffer_element[0])
+windows = [buffer_element[1]]
+value = None
+if success:
+  value = self.make_request(read_result)
+else:
+  value = read_result
+return beam.utils.windowed_value.WindowedValue(
+value=value, timestamp=0, windows=windows)

Review comment:
   Added timestamps
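
   A hedged sketch of the timestamp fix (not the connector's final code):
capture each element's own timestamp via DoFn.TimestampParam and restore it
when re-wrapping, instead of hard-coding timestamp=0.

   ```python
import apache_beam as beam
from apache_beam.utils.windowed_value import WindowedValue


class _TimestampSketch(beam.DoFn):
  def start_bundle(self):
    self._buffer = []

  def process(self, element,
              timestamp=beam.DoFn.TimestampParam,
              window=beam.DoFn.WindowParam):
    # Keep the element's own timestamp alongside its window.
    self._buffer.append((element, timestamp, window))

  def finish_bundle(self):
    for element, timestamp, window in self._buffer:
      # Re-wrap with the preserved timestamp rather than timestamp=0.
      yield WindowedValue(element, timestamp, [window])
    self._buffer = []
   ```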





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] George-Wu commented on a change in pull request #12331: [BEAM-10601] DICOM API Beam IO connector

2020-07-30 Thread GitBox


George-Wu commented on a change in pull request #12331:
URL: https://github.com/apache/beam/pull/12331#discussion_r463382501



##
File path: sdks/python/apache_beam/io/gcp/dicomio.py
##
@@ -114,55 +115,60 @@
 
 class DicomSearch(PTransform):
   """A PTransform used for retrieving DICOM instance metadata from Google
-Cloud DICOM store. It takes a Pcollection of dicts as input and return
-a Pcollection of dict as results:
+Cloud DICOM store. It takes a PCollection of dicts as input and return
+a PCollection of dict as results:
 INPUT:
 The input dict represents DICOM web path parameters, which has the 
following
 string keys and values:
 {
-  'project_id': str,
-  'region': str,
-  'dataset_id': str,
-  'dicom_store_id': str,
-  'search_type': str,
-  'params': dict(str,str) (Optional),
+'project_id': str,
+'region': str,
+'dataset_id': str,
+'dicom_store_id': str,
+'search_type': str,
+'params': dict(str,str) (Optional),
 }
+
 Key-value pairs:
-  project_id: Id of the project in which DICOM store locates. (Required)
+  project_id: Id of the project in which the DICOM store is
+  located. (Required)
   region: Region where the DICOM store resides. (Required)
   dataset_id: Id of the dataset where DICOM store belongs to. (Required)
   dicom_store_id: Id of the dicom store. (Required)
   search_type: Which type of search it is, could only be one of the three
-values: 'instances', 'series', or 'studies'. (Required)
+  values: 'instances', 'series', or 'studies'. (Required)
   params: A dict of str:str pairs used to refine QIDO search. (Optional)
-Supported tags in three categories:
-  1. Studies:
-StudyInstanceUID
-PatientName
-PatientID
-AccessionNumber
-ReferringPhysicianName
-StudyDate
-  2. Series: all study level search terms and
-SeriesInstanceUID
-Modality
-  3. Instances: all study/series level search terms and
-SOPInstanceUID
-e.g. {"StudyInstanceUID":"1","SeriesInstanceUID":"2"}
+  Supported tags in three categories:
+  1.Studies:
+  StudyInstanceUID,
+  PatientName,
+  PatientID,
+  AccessionNumber,
+  ReferringPhysicianName,
+  StudyDate,
+  2.Series: all study level search terms and
+  SeriesInstanceUID,
+  Modality,
+  3.Instances: all study/series level search terms and
+  SOPInstanceUID,

Review comment:
   Fixed
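
   A hedged usage sketch of the search transform documented above (project,
region, and store IDs illustrative; the input-dict keys follow the docstring
in the diff):

   ```python
import apache_beam as beam
from apache_beam.io.gcp.dicomio import DicomSearch

query = {
    'project_id': 'my-project',
    'region': 'us-central1',
    'dataset_id': 'my-dataset',
    'dicom_store_id': 'my-store',
    'search_type': 'instances',
    'params': {'StudyInstanceUID': '1', 'SeriesInstanceUID': '2'},
}

with beam.Pipeline() as p:
  # Each output dict carries 'result', 'status', 'input', and 'success'.
  results = p | beam.Create([query]) | DicomSearch()
   ```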

##
File path: sdks/python/apache_beam/io/gcp/dicomio.py
##
@@ -324,41 +332,45 @@ def process(self, element):
 return [out]
 
 
-class DicomStoreInstance(PTransform):
+class WriteToDicomStore(PTransform):
   """A PTransform for storing instances to a DICOM store.
-Takes Pcollection of byte[] as input and return a Pcollection of dict as
+Takes PCollection of byte[] as input and return a PCollection of dict as
 results. The inputs are normally DICOM file in bytes or str filename.
 INPUT:
-  This PTransform supports two types of input:
-1. Byte[]: representing dicom file.
-2. Fileio object: stream file object.
+This PTransform supports two types of input:
+1. Byte[]: representing dicom file.
+2. Fileio object: stream file object.
+
 OUTPUT:
 The output dict encodes status as well as error messages:
 {
-  'success': boolean value telling whether the store is successful
-  'input': undeliverable data. Exactly the same as the input,
-only set if the operation is failed.
-  'status': status code from the server, used as error messages.
+'success': boolean value telling whether the store is successful.
+'input': undeliverable data. Exactly the same as the input,
+only set if the operation is failed.
+'status': status code from the server, used as error messages.
 }
+
   """
   def __init__(self, destination_dict, input_type, credential=None):
-"""Initializes DicomStoreInstance.
+"""Initializes WriteToDicomStore.
 Args:
   destination_dict: # type: python dict, encodes DICOM endpoint 
information:
-{
-  'project_id': str,
-  'region': str,
-  'dataset_id': str,
-  'dicom_store_id': str,
-}
-Key-value pairs:
-  project_id: Id of the project in which DICOM store locates. 
(Required)
-  region: Region where the DICOM store resides. (Required)
-  dataset_id: Id of the dataset where DICOM store belongs to. 
(Required)
-  dicom_store_id: Id of the dicom store. (Required)
+  {
+  'project_id': str,
+  'region': str,
+  'dataset_id': str,
+  'dicom_store_id': str,
+  }
+
+  Key-value pairs:
+  project_id: Id of the project in which DICOM store locates. (Required)
+  region: Region where the 

[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-07-30 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r463375064



##
File path: 
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
##
@@ -184,6 +188,19 @@ private static Value 
beamLogicalObjectToZetaSqlValue(Object object, String ident
   } else { // input type
 return 
Value.createTimeValue(CivilTimeEncoder.encodePacked64TimeNanos((LocalTime) 
object));
   }
+} else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+  // DateTime value
+  LocalDateTime datetime;
+  if (object instanceof Row) { // base type
+datetime =
+LocalDateTime.of(
+LocalDate.ofEpochDay(((Row) object).getValue("Date")),
+LocalTime.ofNanoOfDay(((Row) object).getValue("Time")));
+  } else { // input type
+datetime = (LocalDateTime) object;
+  }
+  return Value.createDatetimeValue(

Review comment:
   Ack.
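
   The encoding above, restated as a hedged Python sketch for readers less
familiar with the Java time API: the logical DATETIME row carries the date as
days since the epoch and the time as nanoseconds within the day.

   ```python
import datetime


def row_to_datetime(epoch_day, nano_of_day):
  # Mirrors LocalDate.ofEpochDay(...) combined with LocalTime.ofNanoOfDay(...)
  # (truncated to microseconds, the finest resolution Python offers).
  date = datetime.date(1970, 1, 1) + datetime.timedelta(days=epoch_day)
  time = (datetime.datetime.min
          + datetime.timedelta(microseconds=nano_of_day // 1000)).time()
  return datetime.datetime.combine(date, time)


assert row_to_datetime(0, 0) == datetime.datetime(1970, 1, 1, 0, 0)
   ```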





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #12350: [BEAM-10289] Dynamic splitting implementation.

2020-07-30 Thread GitBox


lostluck commented on a change in pull request #12350:
URL: https://github.com/apache/beam/pull/12350#discussion_r462445798



##
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##
@@ -302,31 +308,76 @@ func (n *DataSource) Split(splits []int64, frac float64, 
bufSize int64) (int64,
}
 
n.mu.Lock()
+   defer n.mu.Unlock()
+
var currProg float64 // Current element progress.
-   if n.index < 0 { // Progress is at the end of the non-existant -1st 
element.
+   var su SplittableUnit
+   if n.index < 0 { // Progress is at the end of the non-existant -1st 
element.
currProg = 1.0
-   } else if n.rt == nil { // If this isn't sub-element splittable, 
estimate some progress.
+   } else if n.su == nil { // If this isn't sub-element splittable, 
estimate some progress.
currProg = 0.5
} else { // If this is sub-element splittable, get progress of the 
current element.
-   rt := <-n.rt
-   d, r := rt.GetProgress()
-   currProg = d / (d + r)
-   n.rt <- rt
+   // If splittable, hold this tracker for the rest of the 
function so the element
+   // doesn't finish processing during a split.
+   su = <-n.su
+   if su == nil {
+   return SplitResult{}, fmt.Errorf("failed to split: 
splittable unit was nil")
+   }
+   defer func() {
+   n.su <- su
+   }()
+   currProg = su.GetProgress()
}
// Size to split within is the minimum of bufSize or splitIdx so we 
avoid
// including elements we already know won't be processed.
if bufSize <= 0 || n.splitIdx < bufSize {
bufSize = n.splitIdx
}
-   s, _, err := splitHelper(n.index, bufSize, currProg, splits, frac, 
false)
+   s, f, err := splitHelper(n.index, bufSize, currProg, splits, frac, su 
!= nil)
if err != nil {
-   n.mu.Unlock()
-   return 0, err
+   return SplitResult{}, err
+   }
+
+   // No fraction returned, perform channel split.
+   if f < 0 {
+   n.splitIdx = s
+   return SplitResult{PI: s - 1, RI: s}, nil
+   }
+   // Otherwise, perform a sub-element split.
+   fr := f / (1.0 - currProg)
+   p, r, err := su.Split(fr)
+   if err != nil {
+   return SplitResult{}, err
+   }
+
+   if p != nil && r != nil { // Successful split.

Review comment:
   Consider that reversal technique I mentioned, so the short case is 
what's indented and returns early, and the long case is unindented.
   ```
   if p == nil || r == nil {
       // Fallback to channel split, so split at next elm, not current.
       n.splitIdx = s + 1
       return SplitResult{PI: s, RI: s + 1}, nil
   } // no need for an else.
   // ... the original contents of the if block ...
   ```

##
File path: sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
##
@@ -412,17 +423,120 @@ func TestDataSource_Split(t *testing.T) {
 
// SDK never splits on 0, so check that every test.
sp := SplitPoints{Splits: test.splitPts, Frac: test.frac, 
BufSize: test.bufSize}
-   if splitIdx, err := p.Split(sp); err != nil {
+   if splitRes, err := p.Split(sp); err != nil {
t.Fatalf("error in Split: %v", err)
-   } else if got, want := splitIdx, test.splitIdx; got != want {
-   t.Fatalf("error in Split: got splitIdx = %v, want %v ", 
got, want)
+   } else {

Review comment:
   And here.

##
File path: sdks/go/pkg/beam/core/runtime/exec/sdf.go
##
@@ -215,14 +215,25 @@ func (n *SplitAndSizeRestrictions) String() string {
 // changes to support the SDF's method signatures and the expected structure
 // of the FullValue being received.
 type ProcessSizedElementsAndRestrictions struct {
-   PDo *ParDo
-
-   inv *ctInvoker
-
-   // Rt allows this unit to send out restriction trackers being processed.
-   // Receivers of the tracker do not own it, and must send it back 
through the
-   // same channel once finished with it.
-   Rt chan sdf.RTracker
+   PDo *ParDo
+   TfIdstring // Transform ID. Needed for splitting.
+   ctInv   *ctInvoker
+   sizeInv *rsInvoker
+
+   // SU is a buffered channel for indicating when this unit is splittable.
+   // When this unit is processing an element, it sends a SplittableUnit
+   // interface through the channel. That interface can be received on 
other
+   // threads and used to perform splitting or other related operation.
+   //
+   // Receiving the SplittableUnit prevents the current element from 
finishing
+   // processing, so the element does not unexpectedly 

[GitHub] [beam] pabloem commented on pull request #12203: [BEAM-6928] Make Python SDK custom Sink the default Sink for BigQuery

2020-07-30 Thread GitBox


pabloem commented on pull request #12203:
URL: https://github.com/apache/beam/pull/12203#issuecomment-666858821


   Run Python2_PVR_Flink PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #12203: [BEAM-6928] Make Python SDK custom Sink the default Sink for BigQuery

2020-07-30 Thread GitBox


pabloem commented on pull request #12203:
URL: https://github.com/apache/beam/pull/12203#issuecomment-666851408


   only PullLicenses is failing. @chamikaramj PTAL



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ibzib commented on pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.

2020-07-30 Thread GitBox


ibzib commented on pull request #12385:
URL: https://github.com/apache/beam/pull/12385#issuecomment-666850930


   > I have already spent a long time trying to fix quotes, so I can't help but 
wondering: why do we need flinkCompatibilityMatrixPROCESS in the first place, 
when it is not being run anywhere? If it's important, shouldn't we add it to 
some postcommit?
   
   Another solution I had in mind was reworking the `--environment_config` 
option. JSON blobs are unwieldy, and overloading the `--environment_config` 
option is confusing to the user. We could make each field in the PROCESS 
`--environment_config` blob its own argument, and then reject these arguments 
when `environment_type != PROCESS`.
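
   A sketch of that alternative (flag names hypothetical, not existing Beam
options): promote the PROCESS blob's fields to first-class flags and validate
them against environment_type.

   ```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--environment_type', default='DOCKER')
# Hypothetical flags replacing the JSON --environment_config blob:
parser.add_argument('--process_command')
parser.add_argument('--process_variables')

args = parser.parse_args(
    ['--environment_type=PROCESS', '--process_command=/bin/worker'])

if args.environment_type != 'PROCESS' and (
    args.process_command or args.process_variables):
  # Reject PROCESS-only flags for any other environment type.
  parser.error('--process_* flags require --environment_type=PROCESS')
   ```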



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem merged pull request #12082: Standardizing BigQuery job names in Beam Python and Java SDKs

2020-07-30 Thread GitBox


pabloem merged pull request #12082:
URL: https://github.com/apache/beam/pull/12082


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ibzib commented on pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.

2020-07-30 Thread GitBox


ibzib commented on pull request #12385:
URL: https://github.com/apache/beam/pull/12385#issuecomment-666849394


   I got `flinkCompatibilityMatrixPROCESS` to pass on my machine by escaping 
the arguments via `${1@Q}`. Apparently whatever shell Jenkins is using does not 
support this. I will have to find a better solution.
   
   I have already spent a long time trying to fix quotes, so I can't help but 
wondering: why do we need `flinkCompatibilityMatrixPROCESS` in the first place, 
when it is not being run anywhere? If it's important, shouldn't we add it to 
some postcommit?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ibzib commented on a change in pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.

2020-07-30 Thread GitBox


ibzib commented on a change in pull request #12385:
URL: https://github.com/apache/beam/pull/12385#discussion_r463348975



##
File path: sdks/python/apache_beam/runners/portability/flink_runner_test.py
##
@@ -53,361 +53,380 @@
 from apache_beam.transforms import userstate
 from apache_beam.transforms.sql import SqlTransform
 
+# Run as
+#
+# pytest flink_runner_test.py \
+# [--test_pipeline_options "--flink_job_server_jar=/path/to/job_server.jar 
\
+#   --environment_type=DOCKER"] \
+# [FlinkRunnerTest.test_method, ...]
+
 _LOGGER = logging.getLogger(__name__)
 
 Row = typing.NamedTuple("Row", [("col1", int), ("col2", unicode)])
 beam.coders.registry.register_coder(Row, beam.coders.RowCoder)
 
-if __name__ == '__main__':
-  # Run as
-  #
-  # python -m apache_beam.runners.portability.flink_runner_test \
-  # --flink_job_server_jar=/path/to/job_server.jar \
-  # --environment_type=docker \
-  # --extra_experiments=beam_experiments \
-  # [FlinkRunnerTest.test_method, ...]
-
-  parser = argparse.ArgumentParser(add_help=True)
-  parser.add_argument(
-  '--flink_job_server_jar', help='Job server jar to submit jobs.')
-  parser.add_argument(
-  '--streaming',
-  default=False,
-  action='store_true',
-  help='Job type. batch or streaming')
-  parser.add_argument(
-  '--environment_type',
-  default='loopback',
-  help='Environment type. docker, process, or loopback.')
-  parser.add_argument('--environment_config', help='Environment config.')
-  parser.add_argument(
-  '--extra_experiments',
-  default=[],
-  action='append',
-  help='Beam experiments config.')
-  known_args, args = parser.parse_known_args(sys.argv)
-  sys.argv = args
-
-  flink_job_server_jar = (
-  known_args.flink_job_server_jar or
-  job_server.JavaJarJobServer.path_to_beam_jar(
-  'runners:flink:%s:job-server:shadowJar' %
-  FlinkRunnerOptions.PUBLISHED_FLINK_VERSIONS[-1]))
-  streaming = known_args.streaming
-  environment_type = known_args.environment_type.lower()
-  environment_config = (
-  known_args.environment_config if known_args.environment_config else None)
-  extra_experiments = known_args.extra_experiments
-
-  # This is defined here to only be run when we invoke this file explicitly.
-  class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
-_use_grpc = True
-_use_subprocesses = True
-
-conf_dir = None
-expansion_port = None
-
-@classmethod
-def tearDownClass(cls):
-  if cls.conf_dir and exists(cls.conf_dir):
-_LOGGER.info("removing conf dir: %s" % cls.conf_dir)
-rmtree(cls.conf_dir)
-  super(FlinkRunnerTest, cls).tearDownClass()
-
-@classmethod
-def _create_conf_dir(cls):
-  """Create (and save a static reference to) a "conf dir", used to provide
-   metrics configs and verify metrics output
-
-   It gets cleaned up when the suite is done executing"""
-
-  if hasattr(cls, 'conf_dir'):
-cls.conf_dir = mkdtemp(prefix='flinktest-conf')
-
-# path for a FileReporter to write metrics to
-cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')
-
-# path to write Flink configuration to
-conf_path = path.join(cls.conf_dir, 'flink-conf.yaml')
-file_reporter = 'org.apache.beam.runners.flink.metrics.FileReporter'
-with open(conf_path, 'w') as f:
-  f.write(
-  linesep.join([
-  'metrics.reporters: file',
-  'metrics.reporter.file.class: %s' % file_reporter,
-  'metrics.reporter.file.path: %s' % cls.test_metrics_path,
-  'metrics.scope.operator: ',
-  ]))
-
-@classmethod
-def _subprocess_command(cls, job_port, expansion_port):
-  # will be cleaned up at the end of this method, and recreated and used by
-  # the job server
-  tmp_dir = mkdtemp(prefix='flinktest')
-
-  cls._create_conf_dir()
-  cls.expansion_port = expansion_port
-
-  try:
-return [
-'java',
-'-Dorg.slf4j.simpleLogger.defaultLogLevel=warn',
-'-jar',
-flink_job_server_jar,
-'--flink-master',
-'[local]',
-'--flink-conf-dir',
-cls.conf_dir,
-'--artifacts-dir',
-tmp_dir,
-'--job-port',
-str(job_port),
-'--artifact-port',
-'0',
-'--expansion-port',
-str(expansion_port),
-]
-  finally:
-rmtree(tmp_dir)
-
-@classmethod
-def get_runner(cls):
-  return portable_runner.PortableRunner()
-
-@classmethod
-def get_expansion_service(cls):
-  # TODO Move expansion address resides into PipelineOptions
-  return 'localhost:%s' % cls.expansion_port
-
-def create_options(self):
-  options = super(FlinkRunnerTest, 

[GitHub] [beam] ibzib commented on pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.

2020-07-30 Thread GitBox


ibzib commented on pull request #12385:
URL: https://github.com/apache/beam/pull/12385#issuecomment-666846231


   > I couldn't find the published test results in Jenkins. Do we have to add 
this separately? 
https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Commit/6126/
   
   This is because publishing the test results relies on a change to the 
Jenkins job config, and I didn't want to `run seed job` because Flink PVR is a 
precommit and if I made a mistake, `run seed job` could break it for others. I 
tried testing locally on dockerized Jenkins, but I couldn't get it to work.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ibzib commented on a change in pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.

2020-07-30 Thread GitBox


ibzib commented on a change in pull request #12385:
URL: https://github.com/apache/beam/pull/12385#discussion_r463346402



##
File path: sdks/python/apache_beam/runners/portability/flink_runner_test.py
##
@@ -53,361 +53,380 @@
 from apache_beam.transforms import userstate
 from apache_beam.transforms.sql import SqlTransform
 
+# Run as
+#
+# pytest flink_runner_test.py \
+# [--test_pipeline_options "--flink_job_server_jar=/path/to/job_server.jar 
\
+#   --environment_type=DOCKER"] \
+# [FlinkRunnerTest.test_method, ...]
+
 _LOGGER = logging.getLogger(__name__)
 
 Row = typing.NamedTuple("Row", [("col1", int), ("col2", unicode)])
 beam.coders.registry.register_coder(Row, beam.coders.RowCoder)
 
-if __name__ == '__main__':
-  # Run as
-  #
-  # python -m apache_beam.runners.portability.flink_runner_test \
-  # --flink_job_server_jar=/path/to/job_server.jar \
-  # --environment_type=docker \
-  # --extra_experiments=beam_experiments \
-  # [FlinkRunnerTest.test_method, ...]
-
-  parser = argparse.ArgumentParser(add_help=True)
-  parser.add_argument(
-  '--flink_job_server_jar', help='Job server jar to submit jobs.')
-  parser.add_argument(
-  '--streaming',
-  default=False,
-  action='store_true',
-  help='Job type. batch or streaming')
-  parser.add_argument(
-  '--environment_type',
-  default='loopback',
-  help='Environment type. docker, process, or loopback.')
-  parser.add_argument('--environment_config', help='Environment config.')
-  parser.add_argument(
-  '--extra_experiments',
-  default=[],
-  action='append',
-  help='Beam experiments config.')
-  known_args, args = parser.parse_known_args(sys.argv)
-  sys.argv = args
-
-  flink_job_server_jar = (
-  known_args.flink_job_server_jar or
-  job_server.JavaJarJobServer.path_to_beam_jar(
-  'runners:flink:%s:job-server:shadowJar' %
-  FlinkRunnerOptions.PUBLISHED_FLINK_VERSIONS[-1]))
-  streaming = known_args.streaming
-  environment_type = known_args.environment_type.lower()
-  environment_config = (
-  known_args.environment_config if known_args.environment_config else None)
-  extra_experiments = known_args.extra_experiments
-
-  # This is defined here to only be run when we invoke this file explicitly.
-  class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
-_use_grpc = True
-_use_subprocesses = True
-
-conf_dir = None
-expansion_port = None
-
-@classmethod
-def tearDownClass(cls):
-  if cls.conf_dir and exists(cls.conf_dir):
-_LOGGER.info("removing conf dir: %s" % cls.conf_dir)
-rmtree(cls.conf_dir)
-  super(FlinkRunnerTest, cls).tearDownClass()
-
-@classmethod
-def _create_conf_dir(cls):
-  """Create (and save a static reference to) a "conf dir", used to provide
-   metrics configs and verify metrics output
-
-   It gets cleaned up when the suite is done executing"""
-
-  if hasattr(cls, 'conf_dir'):
-cls.conf_dir = mkdtemp(prefix='flinktest-conf')
-
-# path for a FileReporter to write metrics to
-cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')
-
-# path to write Flink configuration to
-conf_path = path.join(cls.conf_dir, 'flink-conf.yaml')
-file_reporter = 'org.apache.beam.runners.flink.metrics.FileReporter'
-with open(conf_path, 'w') as f:
-  f.write(
-  linesep.join([
-  'metrics.reporters: file',
-  'metrics.reporter.file.class: %s' % file_reporter,
-  'metrics.reporter.file.path: %s' % cls.test_metrics_path,
-  'metrics.scope.operator: ',
-  ]))
-
-@classmethod
-def _subprocess_command(cls, job_port, expansion_port):
-  # will be cleaned up at the end of this method, and recreated and used by
-  # the job server
-  tmp_dir = mkdtemp(prefix='flinktest')
-
-  cls._create_conf_dir()
-  cls.expansion_port = expansion_port
-
-  try:
-return [
-'java',
-'-Dorg.slf4j.simpleLogger.defaultLogLevel=warn',
-'-jar',
-flink_job_server_jar,
-'--flink-master',
-'[local]',
-'--flink-conf-dir',
-cls.conf_dir,
-'--artifacts-dir',
-tmp_dir,
-'--job-port',
-str(job_port),
-'--artifact-port',
-'0',
-'--expansion-port',
-str(expansion_port),
-]
-  finally:
-rmtree(tmp_dir)
-
-@classmethod
-def get_runner(cls):
-  return portable_runner.PortableRunner()
-
-@classmethod
-def get_expansion_service(cls):
-  # TODO Move expansion address resides into PipelineOptions
-  return 'localhost:%s' % cls.expansion_port
-
-def create_options(self):
-  options = super(FlinkRunnerTest, 

[GitHub] [beam] ibzib commented on a change in pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.

2020-07-30 Thread GitBox


ibzib commented on a change in pull request #12385:
URL: https://github.com/apache/beam/pull/12385#discussion_r463345888



##
File path: sdks/python/apache_beam/runners/portability/flink_runner_test.py
##
@@ -53,361 +53,380 @@
 from apache_beam.transforms import userstate
 from apache_beam.transforms.sql import SqlTransform
 
+# Run as
+#
+# pytest flink_runner_test.py \
+# [--test_pipeline_options "--flink_job_server_jar=/path/to/job_server.jar 
\
+#   --environment_type=DOCKER"] \
+# [FlinkRunnerTest.test_method, ...]
+
 _LOGGER = logging.getLogger(__name__)
 
 Row = typing.NamedTuple("Row", [("col1", int), ("col2", unicode)])
 beam.coders.registry.register_coder(Row, beam.coders.RowCoder)
 
-if __name__ == '__main__':
-  # Run as
-  #
-  # python -m apache_beam.runners.portability.flink_runner_test \
-  # --flink_job_server_jar=/path/to/job_server.jar \
-  # --environment_type=docker \
-  # --extra_experiments=beam_experiments \
-  # [FlinkRunnerTest.test_method, ...]
-
-  parser = argparse.ArgumentParser(add_help=True)
-  parser.add_argument(
-  '--flink_job_server_jar', help='Job server jar to submit jobs.')
-  parser.add_argument(
-  '--streaming',
-  default=False,
-  action='store_true',
-  help='Job type. batch or streaming')
-  parser.add_argument(
-  '--environment_type',
-  default='loopback',
-  help='Environment type. docker, process, or loopback.')
-  parser.add_argument('--environment_config', help='Environment config.')
-  parser.add_argument(
-  '--extra_experiments',
-  default=[],
-  action='append',
-  help='Beam experiments config.')
-  known_args, args = parser.parse_known_args(sys.argv)
-  sys.argv = args
-
-  flink_job_server_jar = (
-  known_args.flink_job_server_jar or
-  job_server.JavaJarJobServer.path_to_beam_jar(
-  'runners:flink:%s:job-server:shadowJar' %
-  FlinkRunnerOptions.PUBLISHED_FLINK_VERSIONS[-1]))
-  streaming = known_args.streaming
-  environment_type = known_args.environment_type.lower()
-  environment_config = (
-  known_args.environment_config if known_args.environment_config else None)
-  extra_experiments = known_args.extra_experiments
-
-  # This is defined here to only be run when we invoke this file explicitly.
-  class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
-_use_grpc = True
-_use_subprocesses = True
-
-conf_dir = None
-expansion_port = None
-
-@classmethod
-def tearDownClass(cls):
-  if cls.conf_dir and exists(cls.conf_dir):
-_LOGGER.info("removing conf dir: %s" % cls.conf_dir)
-rmtree(cls.conf_dir)
-  super(FlinkRunnerTest, cls).tearDownClass()
-
-@classmethod
-def _create_conf_dir(cls):
-  """Create (and save a static reference to) a "conf dir", used to provide
-   metrics configs and verify metrics output
-
-   It gets cleaned up when the suite is done executing"""
-
-  if hasattr(cls, 'conf_dir'):
-cls.conf_dir = mkdtemp(prefix='flinktest-conf')
-
-# path for a FileReporter to write metrics to
-cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')
-
-# path to write Flink configuration to
-conf_path = path.join(cls.conf_dir, 'flink-conf.yaml')
-file_reporter = 'org.apache.beam.runners.flink.metrics.FileReporter'
-with open(conf_path, 'w') as f:
-  f.write(
-  linesep.join([
-  'metrics.reporters: file',
-  'metrics.reporter.file.class: %s' % file_reporter,
-  'metrics.reporter.file.path: %s' % cls.test_metrics_path,
-  'metrics.scope.operator: ',
-  ]))
-
-@classmethod
-def _subprocess_command(cls, job_port, expansion_port):
-  # will be cleaned up at the end of this method, and recreated and used by
-  # the job server
-  tmp_dir = mkdtemp(prefix='flinktest')
-
-  cls._create_conf_dir()
-  cls.expansion_port = expansion_port
-
-  try:
-return [
-'java',
-'-Dorg.slf4j.simpleLogger.defaultLogLevel=warn',
-'-jar',
-flink_job_server_jar,
-'--flink-master',
-'[local]',
-'--flink-conf-dir',
-cls.conf_dir,
-'--artifacts-dir',
-tmp_dir,
-'--job-port',
-str(job_port),
-'--artifact-port',
-'0',
-'--expansion-port',
-str(expansion_port),
-]
-  finally:
-rmtree(tmp_dir)
-
-@classmethod
-def get_runner(cls):
-  return portable_runner.PortableRunner()
-
-@classmethod
-def get_expansion_service(cls):
-  # TODO Move expansion address resides into PipelineOptions
-  return 'localhost:%s' % cls.expansion_port
-
-def create_options(self):
-  options = super(FlinkRunnerTest, 

[GitHub] [beam] pabloem commented on pull request #12203: [BEAM-6928] Make Python SDK custom Sink the default Sink for BigQuery

2020-07-30 Thread GitBox


pabloem commented on pull request #12203:
URL: https://github.com/apache/beam/pull/12203#issuecomment-666830620


   Run Python2_PVR_Flink PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] chamikaramj commented on pull request #12198: Widen ranges for GCP libraries

2020-07-30 Thread GitBox


chamikaramj commented on pull request #12198:
URL: https://github.com/apache/beam/pull/12198#issuecomment-666825962


   Thanks for clarifying. LGTM. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] chamikaramj commented on a change in pull request #12149: [BEAM-9897] Add cross-language support to SnowflakeIO.Read

2020-07-30 Thread GitBox


chamikaramj commented on a change in pull request #12149:
URL: https://github.com/apache/beam/pull/12149#discussion_r463341503



##
File path: sdks/python/apache_beam/io/external/snowflake.py
##
@@ -0,0 +1,185 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import typing
+
+from past.builtins import unicode
+
+import apache_beam as beam
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+
+"""Snowflake transforms tested against Flink portable runner.
+  **Setup**
+  Transforms provided in this module are cross-language transforms
+  implemented in the Beam Java SDK. During the pipeline construction, Python 
SDK
+  will connect to a Java expansion service to expand these transforms.
+  To facilitate this, a small amount of setup is needed before using these
+  transforms in a Beam Python pipeline.
+  There are several ways to setup cross-language Snowflake transforms.
+  * Option 1: use the default expansion service
+  * Option 2: specify a custom expansion service
+  See below for details regarding each of these options.
+  *Option 1: Use the default expansion service*
+  This is the recommended and easiest setup option for using Python Kafka
+  transforms. This option is only available for Beam 2.22.0 and later.

Review comment:
   Pls drop "This option is only available for Beam 2.22.0 and later". This 
was just for Kafka.
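
   For orientation, a hedged usage sketch of ReadFromSnowflake (parameter
names follow the constructor quoted in the next hunk; server, bucket, and
credential values illustrative):

   ```python
import apache_beam as beam
from apache_beam.io.external.snowflake import ReadFromSnowflake


def csv_mapper(strings_array):
  # Maps one Snowflake CSV record (a list of strings) to a user type.
  return {'id': strings_array[0], 'name': strings_array[1]}


with beam.Pipeline() as p:
  rows = p | ReadFromSnowflake(
      server_name='my-account.snowflakecomputing.com',
      schema='PUBLIC',
      database='MY_DB',
      staging_bucket_name='my-staging-bucket',
      storage_integration_name='my_integration',
      csv_mapper=csv_mapper,
      username='my-user',
      password='my-password',
      table='MY_TABLE')
   ```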

##
File path: sdks/python/apache_beam/io/external/snowflake.py
##
@@ -0,0 +1,144 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import typing
+
+from past.builtins import unicode
+
+import apache_beam as beam
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+
+__all__ = ['ReadFromSnowflake']
+
+
+def default_io_expansion_service():
+  return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')
+
+
+ReadFromSnowflakeSchema = typing.NamedTuple(
+'WriteToSnowflakeSchema',
+[
+('server_name', unicode),
+('schema', unicode),
+('database', unicode),
+('staging_bucket_name', unicode),
+('storage_integration_name', unicode),
+('username', typing.Optional[unicode]),
+('password', typing.Optional[unicode]),
+('private_key_path', typing.Optional[unicode]),
+('private_key_passphrase', typing.Optional[unicode]),
+('o_auth_token', typing.Optional[unicode]),
+('table', typing.Optional[unicode]),
+('query', typing.Optional[unicode]),
+])
+
+
+class ReadFromSnowflake(beam.PTransform):
+  """An external PTransform which reads from Snowflake."""
+
+  URN = 'beam:external:java:snowflake:read:v1'
+
+  def __init__(
+  self,
+  server_name,
+  schema,
+  database,
+  staging_bucket_name,
+  storage_integration_name,
+  csv_mapper,
+  username=None,
+  password=None,
+  private_key_path=None,
+  private_key_passphrase=None,
+  o_auth_token=None,
+  table=None,
+  query=None,
+  expansion_service=None):
+"""
+Initializes a read operation from Snowflake.
+
+   

[GitHub] [beam] tysonjh commented on pull request #12416: Update google-api-services versions.

2020-07-30 Thread GitBox


tysonjh commented on pull request #12416:
URL: https://github.com/apache/beam/pull/12416#issuecomment-666812322







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tysonjh commented on pull request #12416: Update google-api-services versions.

2020-07-30 Thread GitBox


tysonjh commented on pull request #12416:
URL: https://github.com/apache/beam/pull/12416#issuecomment-666812699


   No, my mistake. It fails locally too; I was running it incorrectly.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tysonjh commented on pull request #12416: Update google-api-services versions.

2020-07-30 Thread GitBox


tysonjh commented on pull request #12416:
URL: https://github.com/apache/beam/pull/12416#issuecomment-666808017


   More test failures: NPEs in what seems like unrelated code, but notably the
pubsub one again, which passes locally when I run it, for some reason.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit merged pull request #12399: [BEAM-10559] Add apache_beam.examples.sql_taxi

2020-07-30 Thread GitBox


TheNeuralBit merged pull request #12399:
URL: https://github.com/apache/beam/pull/12399


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #12082: Standardizing BigQuery job names in Beam Python and Java SDKs

2020-07-30 Thread GitBox


pabloem commented on pull request #12082:
URL: https://github.com/apache/beam/pull/12082#issuecomment-666774552


   Run Python PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

2020-07-30 Thread GitBox


TheNeuralBit commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r463325571



##
File path: 
sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##
@@ -34,34 +35,65 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You 
need to provide {@link
  * KinesisTestOptions} in order to run this.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
+  private static final String STREAM_NAME = "beam_kinesis";
+  private static final int NUM_RECORDS = 1000;
 
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
-  private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();
+  static {
+
System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY,
 "true");
+
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, 
"true");
+  }
+
+  private final LocalStackContainer localstackContainer =
+  new LocalStackContainer("0.11.3")
+  .withServices(LocalStackContainer.Service.KINESIS)
+  .withEnv("USE_SSL", "true")
+  .withStartupAttempts(3);
+
+  private String endpoint;
+  private String region;
+  private String accessKey;
+  private String secretKey;
+
+  @Before
+  public void setup() throws Exception {
+localstackContainer.start();
+endpoint =
+localstackContainer
+.getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+.getServiceEndpoint()
+.replace("http", "https");
+region =
+localstackContainer
+.getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+.getSigningRegion();
+accessKey =
+
localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSAccessKeyId();
+secretKey =
+
localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSSecretKey();
+
+createStream();
+  }
 
-  @BeforeClass
-  public static void setup() {
-PipelineOptionsFactory.register(KinesisTestOptions.class);
-options = 
TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
-numberOfShards = options.getNumberOfShards();
-numberOfRows = options.getNumberOfRecords();

Review comment:
   I think we should maintain the ability to test against production AWS. 
Someday maybe we'll get some AWS credits to run this continuously against prod, 
and it could still be useful for local testing. Could you make it so we only 
start up a localstack container if nothing in `KinesisTestOptions` is modified?
   
   When starting the localstack you could just set the relevant fields in the 
PipelineOptions and let the rest of the test read them as it does now.
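
   A minimal sketch of that "only start localstack when nothing is customized" 
check, with invented option names rather than the real `KinesisTestOptions` 
interface:

```python
from dataclasses import dataclass, fields

@dataclass
class KinesisTestOptions:
  # Invented field names for illustration; the real options differ.
  aws_service_endpoint: str = ''
  aws_kinesis_region: str = ''
  aws_access_key: str = ''
  aws_secret_key: str = ''

def should_start_localstack(opts):
  # Start localstack only when every option still holds its default,
  # i.e. the user did not point the test at real AWS.
  return all(getattr(opts, f.name) == f.default for f in fields(opts))
```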





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz opened a new pull request #12430: [WIP][BEAM-10303] Scale progress with respect to windows observation.

2020-07-30 Thread GitBox


boyuanzz opened a new pull request #12430:
URL: https://github.com/apache/beam/pull/12430


   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

[GitHub] [beam] jiyongjung0 commented on pull request #12404: [BEAM-10598] Bumps google cloud bigquery to 1.26.1

2020-07-30 Thread GitBox


jiyongjung0 commented on pull request #12404:
URL: https://github.com/apache/beam/pull/12404#issuecomment-666770943


   Thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] simonepri commented on pull request #12429: Clean ExtractOutput of mean transform

2020-07-30 Thread GitBox


simonepri commented on pull request #12429:
URL: https://github.com/apache/beam/pull/12429#issuecomment-666770785


   R: @youngoli, @lostluck
   
   Feel free to close it without merging if you don't feel it is needed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] amaliujia merged pull request #12417: Remove redundant pre-commits.

2020-07-30 Thread GitBox


amaliujia merged pull request #12417:
URL: https://github.com/apache/beam/pull/12417


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] youngoli commented on pull request #12350: [BEAM-10289] Dynamic splitting implementation.

2020-07-30 Thread GitBox


youngoli commented on pull request #12350:
URL: https://github.com/apache/beam/pull/12350#issuecomment-666769902


   Good idea.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] aaltay commented on pull request #12304: [BEAM-10499] Adds a descriptive toString to SamzaRunner KeyedTimerData

2020-07-30 Thread GitBox


aaltay commented on pull request #12304:
URL: https://github.com/apache/beam/pull/12304#issuecomment-666768573


   Change LGTM. Could you rebase?
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] aaltay commented on pull request #11967: [BEAM-9992] | use Sets transform in BeamSQL

2020-07-30 Thread GitBox


aaltay commented on pull request #11967:
URL: https://github.com/apache/beam/pull/11967#issuecomment-666767908


   What is the next step for this PR?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] aaltay commented on pull request #12042: [BEAM-9962] Jenkins Plugin

2020-07-30 Thread GitBox


aaltay commented on pull request #12042:
URL: https://github.com/apache/beam/pull/12042#issuecomment-666767694


   What are the next steps for this PR?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on a change in pull request #12331: [BEAM-10601] DICOM API Beam IO connector

2020-07-30 Thread GitBox


pabloem commented on a change in pull request #12331:
URL: https://github.com/apache/beam/pull/12331#discussion_r463282351



##
File path: sdks/python/apache_beam/io/gcp/dicomio.py
##
@@ -372,52 +420,63 @@ def __init__(self, destination_dict, input_type, 
credential=None):
   credential: # type: Google credential object, if it is specified, the
   Http client will use it instead of the default one.
 """
-self.credential = credential
 self.destination_dict = destination_dict
 # input_type pre-check
 if input_type not in ['bytes', 'fileio']:
   raise ValueError("input_type could only be 'bytes' or 'fileio'")
 self.input_type = input_type
+self.buffer_size = buffer_size
+self.max_workers = max_workers
+self.credential = credential
 
   def expand(self, pcoll):
 return pcoll | beam.ParDo(
-_StoreInstance(self.destination_dict, self.input_type, 
self.credential))
+_StoreInstance(
+self.destination_dict,
+self.input_type,
+self.buffer_size,
+self.max_workers,
+self.credential))
 
 
 class _StoreInstance(beam.DoFn):
   """A DoFn read or fetch dicom files then push it to a dicom store."""
-  def __init__(self, destination_dict, input_type, credential=None):
-self.credential = credential
+  def __init__(
+  self,
+  destination_dict,
+  input_type,
+  buffer_size,
+  max_workers,
+  credential=None):
 # pre-check destination dict
 required_keys = ['project_id', 'region', 'dataset_id', 'dicom_store_id']
 for key in required_keys:
   if key not in destination_dict:
 raise ValueError('Must have %s in the dict.' % (key))
 self.destination_dict = destination_dict
 self.input_type = input_type
+self.buffer_size = buffer_size
+self.max_workers = max_workers
+self.credential = credential
 
-  def process(self, element):
+  def start_bundle(self):
+self.buffer = []
+
+  def finish_bundle(self):
+return self._flush()
+
+  def process(self, element, window=beam.DoFn.WindowParam):
+self.buffer.append((element, window))
+if len(self.buffer) >= self.buffer_size:
+  self._flush()

Review comment:
   you may need to `return self._flush()`, right?
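
   A minimal sketch of the buffer-and-flush pattern under discussion; the key 
point is that `process` must yield the flushed results so they are not dropped 
(names and timestamps below are simplified assumptions, not the actual dicomio 
code):

```python
import apache_beam as beam
from apache_beam.utils.windowed_value import WindowedValue

class BufferedDoFn(beam.DoFn):
  """Illustrative buffering DoFn; not the PR's _StoreInstance."""
  def __init__(self, buffer_size):
    self._buffer_size = buffer_size

  def start_bundle(self):
    self._buffer = []

  def process(self, element, window=beam.DoFn.WindowParam):
    self._buffer.append((element, window))
    if len(self._buffer) >= self._buffer_size:
      # Yield the flushed values; a bare self._flush() would discard them.
      for wv in self._flush():
        yield wv

  def finish_bundle(self):
    # finish_bundle must emit WindowedValue objects, since there is no
    # current input element whose window could be inherited.
    return self._flush()

  def _flush(self):
    buffered, self._buffer = self._buffer, []
    return [
        WindowedValue(elem, win.max_timestamp(), [win])
        for elem, win in buffered
    ]
```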

##
File path: sdks/python/apache_beam/io/gcp/dicomio.py
##
@@ -164,70 +167,109 @@ class DicomSearch(PTransform):
 }
 
   """
-  def __init__(self, credential=None):
+  def __init__(self, buffer_size=8, max_workers=5, credential=None):
 """Initializes DicomSearch.
 Args:
   credential: # type: Google credential object, if it is specified, the
   Http client will use it to create sessions instead of the default.
 """
+self.buffer_size = buffer_size
+self.max_workers = max_workers
 self.credential = credential
 
   def expand(self, pcoll):
-return pcoll | beam.ParDo(_QidoSource(self.credential))
+return pcoll | beam.ParDo(
+_QidoSource(self.buffer_size, self.max_workers, self.credential))
 
 
 class _QidoSource(beam.DoFn):
   """A DoFn for executing every qido query request."""
-  def __init__(self, credential=None):
+  def __init__(self, buffer_size, max_workers, credential=None):
+self.buffer_size = buffer_size
+self.max_workers = max_workers
 self.credential = credential
 
-  def process(self, element):
+  def start_bundle(self):
+self.buffer = []
+
+  def finish_bundle(self):
+return self._flush()
+
+  def validate_element(self, element):
 # Check if all required keys present.
 required_keys = [
 'project_id', 'region', 'dataset_id', 'dicom_store_id', 'search_type'
 ]
 
-error_message = None
-
 for key in required_keys:
   if key not in element:
 error_message = 'Must have %s in the dict.' % (key)
-break
-
-if not error_message:
-  project_id = element['project_id']
-  region = element['region']
-  dataset_id = element['dataset_id']
-  dicom_store_id = element['dicom_store_id']
-  search_type = element['search_type']
-  params = element['params'] if 'params' in element else None
-
-  # Call qido search http client
-  if element['search_type'] in ['instances', "studies", "series"]:
-result, status_code = DicomApiHttpClient().qido_search(
-  project_id, region, dataset_id, dicom_store_id,
-  search_type, params, self.credential
-)
-  else:
-error_message = (
-'Search type can only be "studies", '
-'"instances" or "series"')
-
-  if not error_message:
-out = {}
-out['result'] = result
-out['status'] = status_code
-out['input'] = element
-out['success'] = (status_code == 200)
-return [out]
-
-# Return this when the input dict dose not meet the requirements
+return False, error_message
+
+# Check if return type is correct.
+if 

[GitHub] [beam] aaltay commented on pull request #12320: [BEAM-10512] adds parsedData member variable to HL7v2Message and a Builder for the class

2020-07-30 Thread GitBox


aaltay commented on pull request #12320:
URL: https://github.com/apache/beam/pull/12320#issuecomment-666767097


   @dranderson1117 - Could someone from the healthcare team do a first-pass review?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] simonepri opened a new pull request #12429: Clean ExtractOutput of mean transform

2020-07-30 Thread GitBox


simonepri opened a new pull request #12429:
URL: https://github.com/apache/beam/pull/12429


   ExtractOutput is not called when the input PCollection is empty, so the if 
statement that this PR removes is redundant.
   
   
   

[GitHub] [beam] aaltay commented on pull request #12404: [BEAM-10598] Bumps google cloud bigquery to 1.26.1

2020-07-30 Thread GitBox


aaltay commented on pull request #12404:
URL: https://github.com/apache/beam/pull/12404#issuecomment-666766550


   I think we can merge this as is, and let #12198 extend the range a bit more. 
Based on the PR description, this sounds like a blocker for using 
google-cloud-storage.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit closed pull request #12298: [BEAM-10137][BEAM-10138] Python wrapper KinesisIO integration tests

2020-07-30 Thread GitBox


TheNeuralBit closed pull request #12298:
URL: https://github.com/apache/beam/pull/12298


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on pull request #12298: [BEAM-10137][BEAM-10138] Python wrapper KinesisIO integration tests

2020-07-30 Thread GitBox


TheNeuralBit commented on pull request #12298:
URL: https://github.com/apache/beam/pull/12298#issuecomment-666766488


   Closing in favor of https://github.com/apache/beam/pull/12297



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on pull request #12234: [BEAM-10138] Add Cross-language KinesisWrite external transform

2020-07-30 Thread GitBox


TheNeuralBit commented on pull request #12234:
URL: https://github.com/apache/beam/pull/12234#issuecomment-666766596


   Closing in favor of #12297



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] aaltay merged pull request #12404: [BEAM-10598] Bumps google cloud bigquery to 1.26.1

2020-07-30 Thread GitBox


aaltay merged pull request #12404:
URL: https://github.com/apache/beam/pull/12404


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on pull request #12235: [BEAM-10138] Python wrapper for Cross-language Java's KinesisIO

2020-07-30 Thread GitBox


TheNeuralBit commented on pull request #12235:
URL: https://github.com/apache/beam/pull/12235#issuecomment-666766546


   Closing in favor of #12297



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit closed pull request #12235: [BEAM-10138] Python wrapper for Cross-language Java's KinesisIO

2020-07-30 Thread GitBox


TheNeuralBit closed pull request #12235:
URL: https://github.com/apache/beam/pull/12235


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit closed pull request #12234: [BEAM-10138] Add Cross-language KinesisWrite external transform

2020-07-30 Thread GitBox


TheNeuralBit closed pull request #12234:
URL: https://github.com/apache/beam/pull/12234


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] jiyongjung0 commented on pull request #12404: [BEAM-10598] Bumps google cloud bigquery to 1.26.1

2020-07-30 Thread GitBox


jiyongjung0 commented on pull request #12404:
URL: https://github.com/apache/beam/pull/12404#issuecomment-666765978


   Thank you for the reviews. I'll wait for #12198, which has a wider effect.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] aaltay merged pull request #12372: [BEAM-10545] KernelModel and jest tests

2020-07-30 Thread GitBox


aaltay merged pull request #12372:
URL: https://github.com/apache/beam/pull/12372


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] simonepri commented on pull request #12421: Fix go count on an empty pcollection

2020-07-30 Thread GitBox


simonepri commented on pull request #12421:
URL: https://github.com/apache/beam/pull/12421#issuecomment-666763106


   > In practice, having no elements at all is so uncommon, it's not reasonable 
to include that in basic utility transforms, since they never apply, and no 
data = no compute is a better condition to maintain.
   
   I agree it is uncommon, but it is probably still good to have a sane 
default for common operations.
   
   For instance (when in doubt) it might be a good idea (or maybe not) to be 
consistent with what happens in SQL.
   In SQL the sum of an empty set is null (i.e. in our case an empty 
PCollection), while the count of an empty set is 0.
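
   A quick way to check those SQL semantics with sqlite3 from the standard 
library:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE t (x INTEGER)')
# SUM over an empty set is NULL, while COUNT is 0.
print(conn.execute('SELECT SUM(x), COUNT(x) FROM t').fetchone())  # (None, 0)
```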



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder

2020-07-30 Thread GitBox


TheNeuralBit commented on a change in pull request #12426:
URL: https://github.com/apache/beam/pull/12426#discussion_r463311197



##
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##
@@ -855,10 +855,21 @@ message StandardCoders {
 // BOOLEAN:   beam:coder:bool:v1
 // BYTES: beam:coder:bytes:v1
 //   ArrayType:   beam:coder:iterable:v1 (always has a known length)
-//   MapType: not yet a standard coder (BEAM-7996)
+//   MapType: not a standard coder, specification defined below.
 //   RowType: beam:coder:row:v1
 //   LogicalType: Uses the coder for its representation.
 //
+// The MapType is encoded by:
+//   - An INT32 representing the size of the map (N)
+//   - Followed by N interleaved keys and values, encoded with their
+// corresponding coder.
+//
+// Nullable types in container types (ArrayType, MapType) are encoded by:
+//   - A one byte null indicator, 0x00 for null values, or 0x01 for present
+// values.
+//   - For present values the null indicator is followed by the value
+// encoded with its corresponding coder.
+//

Review comment:
   This is specifically for nullable types that are elements of an Array or 
keys/values of a Map. For rows we encode nulls in a separate bitmask:
   
https://github.com/apache/beam/blob/587dde57cbb2b0095a1fa04b59798d1b62c66f18/model/pipeline/src/main/proto/beam_runner_api.proto#L837-L843
   
   Java schemas will already let you use nullable types for keys and values in 
Maps. I doubt anyone is relying on it.. but there's a risk if we disallow it 
now.
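
   A toy illustration of the encodings described in the proto comment above, 
using plain `struct` packing rather than the SDK's coder machinery:

```python
import struct

def encode_map(m, encode_key, encode_value):
  # INT32 size, then the entries as interleaved keys and values.
  out = struct.pack('>i', len(m))
  for k, v in m.items():
    out += encode_key(k) + encode_value(v)
  return out

def encode_nullable(value, encode_present):
  # One-byte null indicator: 0x00 for null, 0x01 followed by the encoding.
  return b'\x00' if value is None else b'\x01' + encode_present(value)
```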





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder

2020-07-30 Thread GitBox


TheNeuralBit commented on a change in pull request #12426:
URL: https://github.com/apache/beam/pull/12426#discussion_r463308917



##
File path: sdks/python/apache_beam/coders/coder_impl.py
##
@@ -530,6 +530,88 @@ def estimate_size(self, unused_value, nested=False):
 return 1
 
 
+class MapCoderImpl(StreamCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A coder for typing.Mapping objects."""
+  def __init__(
+  self,
+  key_coder,  # type: CoderImpl
+  value_coder  # type: CoderImpl
+  ):
+self._key_coder = key_coder
+self._value_coder = value_coder
+
+  def encode_to_stream(self, value, out, nested):
+size = len(value)
+out.write_bigendian_int32(size)
+for i, kv in enumerate(value.items()):
+  key, value = kv
+  last = i == size - 1

Review comment:
   Same comment as above, this was just an attempt to replicate the Java 
logic. I could change them both if you think we should.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder

2020-07-30 Thread GitBox


TheNeuralBit commented on a change in pull request #12426:
URL: https://github.com/apache/beam/pull/12426#discussion_r463308693



##
File path: sdks/python/apache_beam/coders/coder_impl.py
##
@@ -530,6 +530,88 @@ def estimate_size(self, unused_value, nested=False):
 return 1
 
 
+class MapCoderImpl(StreamCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A coder for typing.Mapping objects."""
+  def __init__(
+  self,
+  key_coder,  # type: CoderImpl
+  value_coder  # type: CoderImpl
+  ):
+self._key_coder = key_coder
+self._value_coder = value_coder
+
+  def encode_to_stream(self, value, out, nested):
+size = len(value)
+out.write_bigendian_int32(size)

Review comment:
   Yeah I thought about that too. I was just trying to exactly replicate 
what we do in 
[MapCoder.java](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java)
 for now though. Do you think I should switch this and MapCoder.java over to 
using varint here?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #12203: [BEAM-6928] Make Python SDK custom Sink the default Sink for BigQuery

2020-07-30 Thread GitBox


pabloem commented on pull request #12203:
URL: https://github.com/apache/beam/pull/12203#issuecomment-666747008


   Run Python2_PVR_Flink PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ibzib opened a new pull request #12428: Update Flink Version Compatibility table

2020-07-30 Thread GitBox


ibzib opened a new pull request #12428:
URL: https://github.com/apache/beam/pull/12428


   R: @mxm
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

[GitHub] [beam] lostluck commented on pull request #12350: [BEAM-10289] Dynamic splitting implementation.

2020-07-30 Thread GitBox


lostluck commented on pull request #12350:
URL: https://github.com/apache/beam/pull/12350#issuecomment-666735372


   Consider updating the PR description to match the current state :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #12203: [BEAM-6928] Make Python SDK custom Sink the default Sink for BigQuery

2020-07-30 Thread GitBox


pabloem commented on pull request #12203:
URL: https://github.com/apache/beam/pull/12203#issuecomment-666735046


   Run Python2_PVR_Flink PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck merged pull request #12402: [BEAM-9679] Add Composite transforms to Go SDK katas

2020-07-30 Thread GitBox


lostluck merged pull request #12402:
URL: https://github.com/apache/beam/pull/12402


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on a change in pull request #12082: Standardizing BigQuery job names in Beam Python and Java SDKs

2020-07-30 Thread GitBox


pabloem commented on a change in pull request #12082:
URL: https://github.com/apache/beam/pull/12082#discussion_r463295587



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##
@@ -457,6 +456,22 @@
  *  for security and permission related information specific to BigQuery.
  */
 public class BigQueryIO {
+
+  /**
+   * Template for BigQuery jobs created by BigQueryIO. This template is: {@code
+   * "beam_bq_job_{TYPE}_{JOB_ID}_{STEP}_{RANDOM}"}, where:
+   *
+   * 
+   *   {@code TYPE} represents the BigQuery job type (e.g. extract / copy 
/ load / query)
+   *   {@code JOB_ID} is the Dataflow job name.

Review comment:
   Ah, the job name comes from Beam and is autogenerated if one is not 
provided. I changed the documentation to express that.
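
   A hypothetical helper that mirrors the documented template (illustration 
only, not BigQueryIO's implementation):

```python
import random
import string

def bq_job_name(job_type, job_id, step):
  # Mirrors "beam_bq_job_{TYPE}_{JOB_ID}_{STEP}_{RANDOM}".
  rnd = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
  return 'beam_bq_job_{}_{}_{}_{}'.format(job_type.upper(), job_id, step, rnd)

print(bq_job_name('load', 'mydataflowjob', 'step42'))
# e.g. beam_bq_job_LOAD_mydataflowjob_step42_k3x9q0b2ch
```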





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #12082: Standardizing BigQuery job names in Beam Python and Java SDKs

2020-07-30 Thread GitBox


pabloem commented on pull request #12082:
URL: https://github.com/apache/beam/pull/12082#issuecomment-666732227


   Run Java PostCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb commented on a change in pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder

2020-07-30 Thread GitBox


robertwb commented on a change in pull request #12426:
URL: https://github.com/apache/beam/pull/12426#discussion_r463291968



##
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##
@@ -855,10 +855,21 @@ message StandardCoders {
 // BOOLEAN:   beam:coder:bool:v1
 // BYTES: beam:coder:bytes:v1
 //   ArrayType:   beam:coder:iterable:v1 (always has a known length)
-//   MapType: not yet a standard coder (BEAM-7996)
+//   MapType: not a standard coder, specification defined below.
 //   RowType: beam:coder:row:v1
 //   LogicalType: Uses the coder for its representation.
 //
+// The MapType is encoded by:
+//   - An INT32 representing the size of the map (N)
+//   - Followed by N interleaved keys and values, encoded with their
+// corresponding coder.
+//
+// Nullable types in container types (ArrayType, MapType) are encoded by:
+//   - A one byte null indicator, 0x00 for null values, or 0x01 for present
+// values.
+//   - For present values the null indicator is followed by the value
+// encoded with its corresponding coder.
+//

Review comment:
   For maps specifically, do we want to allow null keys? Is it valuable to 
have null values (as distinct from just not present)? I might lean towards 
disallowing nulls and then possibly allowing it in the future if we have good 
reason to, which will be forward compatible. 

##
File path: sdks/python/apache_beam/coders/coder_impl.py
##
@@ -530,6 +530,88 @@ def estimate_size(self, unused_value, nested=False):
 return 1
 
 
+class MapCoderImpl(StreamCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A coder for typing.Mapping objects."""
+  def __init__(
+  self,
+  key_coder,  # type: CoderImpl
+  value_coder  # type: CoderImpl
+  ):
+self._key_coder = key_coder
+self._value_coder = value_coder
+
+  def encode_to_stream(self, value, out, nested):
+size = len(value)
+out.write_bigendian_int32(size)

Review comment:
   We don't care about larger maps? Also, why not varint? (If that's what's 
conventionally used elsewhere, I'm fine with that, but iterable coder uses 
varint64.)
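
   For comparison, a sketch of the unsigned base-128 varint scheme (low 7 bits 
first) used for varint64 lengths elsewhere; fine for nonnegative sizes:

```python
def write_varint64(n, out):
  # Emit 7 bits per byte, setting the high bit while more bytes follow.
  assert n >= 0
  while True:
    bits = n & 0x7F
    n >>= 7
    if n:
      out.append(bits | 0x80)
    else:
      out.append(bits)
      return

buf = bytearray()
write_varint64(300, buf)
print(bytes(buf))  # b'\xac\x02'
```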

##
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##
@@ -855,10 +855,21 @@ message StandardCoders {
 // BOOLEAN:   beam:coder:bool:v1
 // BYTES: beam:coder:bytes:v1
 //   ArrayType:   beam:coder:iterable:v1 (always has a known length)
-//   MapType: not yet a standard coder (BEAM-7996)
+//   MapType: not a standard coder, specification defined below.
 //   RowType: beam:coder:row:v1
 //   LogicalType: Uses the coder for its representation.
 //
+// The MapType is encoded by:
+//   - An INT32 representing the size of the map (N)
+//   - Followed by N interleaved keys and values, encoded with their
+// corresponding coder.
+//
+// Nullable types in container types (ArrayType, MapType) are encoded by:
+//   - A one byte null indicator, 0x00 for null values, or 0x01 for present
+// values.
+//   - For present values the null indicator is followed by the value
+// encoded with its corresponding coder.
+//

Review comment:
   Is this just the encoding of a nullable type? (Why does it have to be 
called out specially?)

##
File path: sdks/python/apache_beam/coders/coder_impl.py
##
@@ -530,6 +530,88 @@ def estimate_size(self, unused_value, nested=False):
 return 1
 
 
+class MapCoderImpl(StreamCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A coder for typing.Mapping objects."""
+  def __init__(
+  self,
+  key_coder,  # type: CoderImpl
+  value_coder  # type: CoderImpl
+  ):
+self._key_coder = key_coder
+self._value_coder = value_coder
+
+  def encode_to_stream(self, value, out, nested):
+size = len(value)
+out.write_bigendian_int32(size)
+for i, kv in enumerate(value.items()):
+  key, value = kv
+  last = i == size - 1

Review comment:
   Just nest everything. That'll simplify the logic and the definition of 
the coder. 
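
   A sketch of what "nest everything" could look like for the map coder 
(simplified; assumes the coder and stream interfaces from the diff above):

```python
class MapCoderImpl(object):
  def __init__(self, key_coder, value_coder):
    self._key_coder = key_coder
    self._value_coder = value_coder

  def encode_to_stream(self, value, out, nested):
    out.write_bigendian_int32(len(value))
    for key, val in value.items():
      # Encoding every entry as nested removes the special case for the
      # final element.
      self._key_coder.encode_to_stream(key, out, True)
      self._value_coder.encode_to_stream(val, out, True)
```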





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] mf2199 commented on a change in pull request #11295: [BEAM-3342] CBT `Read` IO Connector Wrapper

2020-07-30 Thread GitBox


mf2199 commented on a change in pull request #11295:
URL: https://github.com/apache/beam/pull/11295#discussion_r463294587



##
File path: sdks/python/apache_beam/io/gcp/bigtableio.py
##
@@ -1,151 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""BigTable connector
-
-This module implements writing to BigTable tables.

Review comment:
   If I understand correctly, we are to restore this file to its original 
version while keeping the altered one in the `experimental` folder. If so, then 
it's done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #12203: [BEAM-6928] Make Python SDK custom Sink the default Sink for BigQuery

2020-07-30 Thread GitBox


pabloem commented on pull request #12203:
URL: https://github.com/apache/beam/pull/12203#issuecomment-666729271


   Run Python2_PVR_Flink PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb merged pull request #12362: Convert katas to use with syntax rather than explicit run call.

2020-07-30 Thread GitBox


robertwb merged pull request #12362:
URL: https://github.com/apache/beam/pull/12362


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robinyqiu commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-07-30 Thread GitBox


robinyqiu commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r463286427



##
File path: 
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
##
@@ -184,6 +188,19 @@ private static Value 
beamLogicalObjectToZetaSqlValue(Object object, String ident
   } else { // input type
 return 
Value.createTimeValue(CivilTimeEncoder.encodePacked64TimeNanos((LocalTime) 
object));
   }
+} else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+  // DateTime value
+  LocalDateTime datetime;
+  if (object instanceof Row) { // base type
+datetime =
+LocalDateTime.of(
+LocalDate.ofEpochDay(((Row) object).getValue("Date")),
+LocalTime.ofNanoOfDay(((Row) object).getValue("Time")));
+  } else { // input type
+datetime = (LocalDateTime) object;
+  }
+  return Value.createDatetimeValue(

Review comment:
   I created a JIRA for this: 
https://issues.apache.org/jira/browse/BEAM-10611
   
   Please add a TODO here so that we can make improvements on this in the 
future.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] y1chi commented on pull request #12427: nexmark python queries 0, 1, 2 and 9

2020-07-30 Thread GitBox


y1chi commented on pull request #12427:
URL: https://github.com/apache/beam/pull/12427#issuecomment-666716230


   Could you rename the title to be more meaningful and add `[BEAM-2855]` at 
the front so that it will be linked to the right JIRA ticket 
https://issues.apache.org/jira/browse/BEAM-2855.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #12399: [BEAM-10559] Add apache_beam.examples.sql_taxi

2020-07-30 Thread GitBox


TheNeuralBit commented on a change in pull request #12399:
URL: https://github.com/apache/beam/pull/12399#discussion_r463279232



##
File path: sdks/python/apache_beam/examples/sql_taxi.py
##
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that processes streaming NYC Taxi data with SqlTransform.
+
+ This example reads from the PubSub NYC Taxi stream described in

Review comment:
   Done, thanks





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #12151: [BEAM-9896] Add streaming for SnowflakeIO.Write to Java SDK

2020-07-30 Thread GitBox


pabloem commented on pull request #12151:
URL: https://github.com/apache/beam/pull/12151#issuecomment-666703403


   One more thing: Please add a description of the change to `CHANGES.md`, so 
we can have this in the release notes. Then I can merge.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on a change in pull request #12151: [BEAM-9896] Add streaming for SnowflakeIO.Write to Java SDK

2020-07-30 Thread GitBox


pabloem commented on a change in pull request #12151:
URL: https://github.com/apache/beam/pull/12151#discussion_r463273363



##
File path: 
sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceConfig.java
##
@@ -20,26 +20,51 @@
 import java.util.List;
 import net.snowflake.ingest.SimpleIngestManager;
 
+/** Class for preparing configuration for streaming write. */
 public class SnowflakeStreamingServiceConfig extends ServiceConfig {
-  SimpleIngestManager ingestManager;
-  List filesList;
-  String stagingBucketDir;
+  private SimpleIngestManager ingestManager;
+  private List filesList;
+  private String stagingBucketDir;

Review comment:
   should these attributes also be final?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on a change in pull request #12151: [BEAM-9896] Add streaming for SnowflakeIO.Write to Java SDK

2020-07-30 Thread GitBox


pabloem commented on a change in pull request #12151:
URL: https://github.com/apache/beam/pull/12151#discussion_r463219932



##
File path: 
sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##
@@ -719,7 +906,9 @@ private void checkArguments() {
   return getUserDataMapper().mapRow(element);
 }
   }))
-  .apply("Map Objects array to CSV lines", ParDo.of(new 
MapObjectsArrayToCsvFn()))
+  .apply(
+  "Map Objects array to CSV lines",
+  ParDo.of(new MapObjectsArrayToCsvFn(getQuotationMark(

Review comment:
   that makes sense to me. Thanks Kasia!

##
File path: 
sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##
@@ -684,14 +819,61 @@ private void checkArguments() {
   (getDataSourceProviderFn() != null),
   "withDataSourceConfiguration() or withDataSourceProviderFn() is 
required");
 
-  checkArgument(getTable() != null, "withTable() is required");
+  if (input.isBounded() == PCollection.IsBounded.UNBOUNDED) {
+checkArgument(getSnowPipe() != null, "withSnowPipe() is required");
+  } else {
+checkArgument(getTable() != null, "to() is required");
+  }
 }
 
-private PCollection write(PCollection input, String 
stagingBucketDir) {
+private PCollection writeStream(PCollection input, String 
stagingBucketDir) {
   SnowflakeService snowflakeService =
-  getSnowflakeService() != null ? getSnowflakeService() : new 
SnowflakeServiceImpl();
+  getSnowflakeService() != null
+  ? getSnowflakeService()
+  : new SnowflakeStreamingServiceImpl();
+
+  /* Ensure that files will be created after specific record count or 
duration specified */
+  PCollection inputInGlobalWindow =
+  input.apply(
+  "rewindowIntoGlobal",
+  Window.into(new GlobalWindows())
+  .triggering(
+  Repeatedly.forever(
+  AfterFirst.of(
+  AfterProcessingTime.pastFirstElementInPane()
+  .plusDelayOf(getFlushTimeLimit()),
+  
AfterPane.elementCountAtLeast(getFlushRowLimit()
+  .discardingFiredPanes());
+
+  int shards = (getShardsNumber() > 0) ? getShardsNumber() : 
DEFAULT_STREAMING_SHARDS_NUMBER;
+  PCollection files = writeFiles(inputInGlobalWindow, stagingBucketDir, 
shards);
+
+  /* Ensuring that files will be ingested after flush time */
+  files =
+  (PCollection)
+  files.apply(
+  "applyUserTrigger",
+  Window.into(new GlobalWindows())
+  .triggering(
+  Repeatedly.forever(
+  AfterProcessingTime.pastFirstElementInPane()
+  .plusDelayOf(getFlushTimeLimit(
+  .discardingFiredPanes());

Review comment:
   I think we can leave this as it is. I see that the trigger is not exactly 
the same.
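
   For readers skimming the thread, a minimal Beam Python sketch of the same 
rewindow-into-global pattern with a composite trigger. The 60-second and 
100-element limits are illustrative stand-ins for the connector's flush time 
and row limits, and Python's AfterAny plays the role of Java's AfterFirst:

   ```python
   import apache_beam as beam
   from apache_beam.transforms import trigger, window

   with beam.Pipeline() as p:
       flushed = (
           p
           | beam.Create(range(10))  # stands in for an unbounded input
           | 'rewindowIntoGlobal' >> beam.WindowInto(
               window.GlobalWindows(),
               trigger=trigger.Repeatedly(
                   trigger.AfterAny(                     # Java: AfterFirst
                       trigger.AfterProcessingTime(60),  # flush time limit (s)
                       trigger.AfterCount(100))),        # flush row limit
               accumulation_mode=trigger.AccumulationMode.DISCARDING))
   ```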





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robinyqiu commented on pull request #12403: [BEAM-10597] Propagate BigQuery streaming insert throttled time to Dataflow worker

2020-07-30 Thread GitBox


robinyqiu commented on pull request #12403:
URL: https://github.com/apache/beam/pull/12403#issuecomment-96191


   Run Python PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck merged pull request #12421: Fix go count on an empty pcollection

2020-07-30 Thread GitBox


lostluck merged pull request #12421:
URL: https://github.com/apache/beam/pull/12421


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] danielxjd commented on a change in pull request #12223: [Beam-4379] Make ParquetIO read splittable

2020-07-30 Thread GitBox


danielxjd commented on a change in pull request #12223:
URL: https://github.com/apache/beam/pull/12223#discussion_r463259859



##
File path: 
sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##
@@ -235,12 +277,195 @@ public ReadFiles withAvroDataModel(GenericData model) {
   return toBuilder().setAvroDataModel(model).build();
 }
 
+public ReadFiles withSplit() {
+  return toBuilder().setSplit(true).build();
+}
+
 @Override
 public PCollection expand(PCollection 
input) {
   checkNotNull(getSchema(), "Schema can not be null");
-  return input
-  .apply(ParDo.of(new ReadFn(getAvroDataModel(
-  .setCoder(AvroCoder.of(getSchema()));
+  if (!getSplit()) {
+return input
+.apply(ParDo.of(new SplitReadFn(getAvroDataModel(
+.setCoder(AvroCoder.of(getSchema()));
+  } else {
+return input
+.apply(ParDo.of(new ReadFn(getAvroDataModel(
+.setCoder(AvroCoder.of(getSchema()));
+  }
+}
+
+@DoFn.BoundedPerElement
+static class SplitReadFn extends DoFn {
+  private Class modelClass;
+  private static final Logger LOG = 
LoggerFactory.getLogger(SplitReadFn.class);
+  ReadSupport readSupport;
+
+  SplitReadFn(GenericData model) {
+this.modelClass = model != null ? model.getClass() : null;
+  }
+
+  private static  Map> toSetMultiMap(Map map) {
+Map> setMultiMap = new HashMap>();
+for (Map.Entry entry : map.entrySet()) {
+  Set set = new HashSet();
+  set.add(entry.getValue());
+  setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set));
+}
+return Collections.unmodifiableMap(setMultiMap);
+  }
+
+  private InputFile getInputFile(FileIO.ReadableFile file) throws 
IOException {
+if (!file.getMetadata().isReadSeekEfficient()) {
+  throw new RuntimeException(
+  String.format("File has to be seekable: %s", 
file.getMetadata().resourceId()));
+}
+return new BeamParquetInputFile(file.openSeekable());
+  }
+
+  @ProcessElement
+  public void processElement(
+  @Element FileIO.ReadableFile file,
+  RestrictionTracker tracker,
+  OutputReceiver outputReceiver)
+  throws Exception {
+ReadSupport readSupport;
+InputFile inputFile = getInputFile(file);
+Configuration conf = setConf();
+GenericData model = null;
+if (modelClass != null) {
+  model = (GenericData) modelClass.getMethod("get").invoke(null);
+}
+readSupport = new AvroReadSupport(model);
+ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
+ParquetFileReader reader = ParquetFileReader.open(inputFile, options);
+Filter filter = checkNotNull(options.getRecordFilter(), "filter");
+conf = ((HadoopReadOptions) options).getConf();
+for (String property : options.getPropertyNames()) {
+  conf.set(property, options.getProperty(property));
+}
+FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+MessageType fileSchema = parquetFileMetadata.getSchema();
+Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+
+ReadSupport.ReadContext readContext =
+readSupport.init(new InitContext(conf, 
toSetMultiMap(fileMetadata), fileSchema));
+ColumnIOFactory columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+MessageType requestedSchema = readContext.getRequestedSchema();
+RecordMaterializer recordConverter =
+readSupport.prepareForRead(conf, fileMetadata, fileSchema, 
readContext);
+boolean strictTypeChecking = options.isEnabled(STRICT_TYPE_CHECKING, 
true);
+boolean filterRecords = options.useRecordFilter();
+reader.setRequestedSchema(requestedSchema);
+MessageColumnIO columnIO =
+columnIOFactory.getColumnIO(requestedSchema, fileSchema, 
strictTypeChecking);
+long currentBlock = tracker.currentRestriction().getFrom();
+for (int i = 0; i < currentBlock; i++) {
+  reader.skipNextRowGroup();
+}
+while (tracker.tryClaim(currentBlock)) {
+
+  LOG.info("reading block" + currentBlock);
+  PageReadStore pages = reader.readNextRowGroup();
+  currentBlock += 1;
+  RecordReader recordReader =
+  columnIO.getRecordReader(
+  pages, recordConverter, filterRecords ? filter : 
FilterCompat.NOOP);
+  long currentRow = 0;
+  long totalRows = pages.getRowCount();
+  while (currentRow < totalRows) {
+try {
+  GenericRecord record;
+  currentRow += 1;
+  try {
+record = recordReader.read();
+  } catch 

[GitHub] [beam] leiyiz commented on pull request #12427: nexmark python queries 0, 1, 2 and 9

2020-07-30 Thread GitBox


leiyiz commented on pull request #12427:
URL: https://github.com/apache/beam/pull/12427#issuecomment-40310


   R: @pabloem 
   R: @y1chi 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] leiyiz opened a new pull request #12427: nexmark python queries 0, 1, 2 and 9

2020-07-30 Thread GitBox


leiyiz opened a new pull request #12427:
URL: https://github.com/apache/beam/pull/12427


   Changed queries 0-2, implemented query 9, added coders for all models and 
new models for query results, and changed the launcher to count and print 
results to a file.
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   (Go, Java, and Python post-commit status badge rows omitted.)

[GitHub] [beam] udim commented on pull request #12009: [BEAM-10258] Support type hint annotations on PTransform's expand()

2020-07-30 Thread GitBox


udim commented on pull request #12009:
URL: https://github.com/apache/beam/pull/12009#issuecomment-31741


   Run Python2_PVR_Flink PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] leiyiz closed pull request #12365: [BEAM-8258] python nexmark query 0-2 improvements

2020-07-30 Thread GitBox


leiyiz closed pull request #12365:
URL: https://github.com/apache/beam/pull/12365


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder

2020-07-30 Thread GitBox


lostluck commented on a change in pull request #12426:
URL: https://github.com/apache/beam/pull/12426#discussion_r463208404



##
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##
@@ -855,10 +855,21 @@ message StandardCoders {
 // BOOLEAN:   beam:coder:bool:v1
 // BYTES: beam:coder:bytes:v1
 //   ArrayType:   beam:coder:iterable:v1 (always has a known length)
-//   MapType: not yet a standard coder (BEAM-7996)
+//   MapType: not a standard coder, specification defined below.
 //   RowType: beam:coder:row:v1
 //   LogicalType: Uses the coder for its representation.
 //
+// The MapType is encoded by:
+//   - An INT32 representing the size of the map (N)
+//   - Followed by N interleaved keys and values, encoded with their
+// corresponding coder.
+//
+// Nullable types in container types (ArrayType, MapType) are encoded by:
+//   - A one byte null indicator, 0x00 for null values, or 0x01 for present
+// values.
+//   - For present values the null indicator is followed by the value
+// encoded with it's corresponding coder.
+//

Review comment:
   Ack! Thanks!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder

2020-07-30 Thread GitBox


TheNeuralBit commented on pull request #12426:
URL: https://github.com/apache/beam/pull/12426#issuecomment-666590444


   This is relevant for BEAM-10571. Some existing payloads use `List>` 
fields, which we could replace with Map when using schemas to encode the 
payload.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit edited a comment on pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder

2020-07-30 Thread GitBox


TheNeuralBit edited a comment on pull request #12426:
URL: https://github.com/apache/beam/pull/12426#issuecomment-666590444


   This is relevant for BEAM-10571. Some existing payloads use `List>` 
fields, which we could replace with `Map` when using schemas to encode the 
payload.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on a change in pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder

2020-07-30 Thread GitBox


TheNeuralBit commented on a change in pull request #12426:
URL: https://github.com/apache/beam/pull/12426#discussion_r463194957



##
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##
@@ -855,10 +855,21 @@ message StandardCoders {
 // BOOLEAN:   beam:coder:bool:v1
 // BYTES: beam:coder:bytes:v1
 //   ArrayType:   beam:coder:iterable:v1 (always has a known length)
-//   MapType: not yet a standard coder (BEAM-7996)
+//   MapType: not a standard coder, specification defined below.
 //   RowType: beam:coder:row:v1
 //   LogicalType: Uses the coder for its representation.
 //
+// The MapType is encoded by:
+//   - An INT32 representing the size of the map (N)
+//   - Followed by N interleaved keys and values, encoded with their
+// corresponding coder.
+//
+// Nullable types in container types (ArrayType, MapType) are encoded by:
+//   - A one byte null indicator, 0x00 for null values, or 0x01 for present
+// values.
+//   - For present values the null indicator is followed by the value
+// encoded with it's corresponding coder.
+//

Review comment:
   FYI @lostluck - previously the encoding for MapType and for nullable 
types within containers was not documented.
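
   To make the two rules concrete, a small Python sketch of the documented 
encoding. It assumes the big-endian INT32 length convention already used by 
beam:coder:iterable:v1, and takes the per-component coders as plain functions:

   ```python
   import struct

   def encode_map(m, encode_key, encode_value):
       # INT32 size prefix (assumed big-endian, as in beam:coder:iterable:v1),
       # followed by the N entries as interleaved key/value encodings.
       parts = [struct.pack('>i', len(m))]
       for key, value in m.items():
           parts.append(encode_key(key))
           parts.append(encode_value(value))
       return b''.join(parts)

   def encode_nullable(value, encode_value):
       # One-byte null indicator: 0x00 for null, or 0x01 followed by the
       # value encoded with its corresponding coder.
       return b'\x00' if value is None else b'\x01' + encode_value(value)
   ```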





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] KevinGG commented on a change in pull request #12372: [BEAM-10545] KernelModel and jest tests

2020-07-30 Thread GitBox


KevinGG commented on a change in pull request #12372:
URL: https://github.com/apache/beam/pull/12372#discussion_r463182344



##
File path: 
sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/__tests__/kernel/KernelModel.test.ts
##
@@ -0,0 +1,124 @@
+// Licensed under the Apache License, Version 2.0 (the 'License'); you may not
+// use this file except in compliance with the License. You may obtain a copy 
of
+// the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations 
under
+// the License.
+
+/**
+ * Tests for KernelModel module.
+ *
+ * Non camelcase fields are nbformat fields used in notebooks. Lint is ignored
+ * for them.
+ */
+
+import { KernelModel } from '../../kernel/KernelModel';
+
+const fakeSessionContext = {
+  session: {
+kernel: {
+  requestExecute: function(): object {
+return {
+  onIOPub: function(): void {
+// do nothing
+  }
+};
+  }
+}
+  }
+};
+
+it('creates new future with IOPub callbacks when executing new code in 
kernel', () => {
+  const kernelModel = new KernelModel(fakeSessionContext as any);
+  kernelModel.execute('new code');
+  expect(kernelModel.future).not.toBe(null);
+  expect(kernelModel.future.onIOPub).not.toBe(null);
+});
+
+it('handles execute result from IOPub channel', () => {
+  const kernelModel = new KernelModel(fakeSessionContext as any);
+  kernelModel.execute('any code');
+  kernelModel.future.onIOPub({
+header: {
+  // eslint-disable-next-line @typescript-eslint/camelcase
+  msg_type: 'execute_result'
+},
+content: {
+  data: {
+'text/plain':
+  '\'{"pipelineId": {"metadata": {"name": "pipeline", "inMemoryId": 1, 
"type": "pipeline"}, "pcolls": {"pcollId": {"name": "pcoll", "inMemoryId": 2, 
"type": "pcollection"\''
+  },
+  channel: 'iopub'
+}
+  } as any);
+  expect(kernelModel.executeResult).toEqual({
+pipelineId: {
+  metadata: {
+name: 'pipeline',
+inMemoryId: 1,
+type: 'pipeline'
+  },
+  pcolls: {
+pcollId: {
+  name: 'pcoll',
+  inMemoryId: 2,
+  type: 'pcollection'
+}
+  }
+}
+  });
+});
+
+it('handles display data from IOPub channel', () => {
+  const kernelModel = new KernelModel(fakeSessionContext as any);
+  kernelModel.execute('any code');
+  const displayData = {
+// eslint-disable-next-line @typescript-eslint/camelcase
+output_type: 'display_data',
+data: {
+  'text/html': '',
+  'application/javascript': 'console.log(1);'

Review comment:
   It's a valid JavaScript statement that logs to the console.
   
   In the test, if it were executed, the result would show up in the `terminal`.
   This unit test doesn't do the execution though, because it's just a data 
model, not a UI component.

##
File path: 
sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/__tests__/kernel/KernelModel.test.ts
##
@@ -0,0 +1,124 @@
+// Licensed under the Apache License, Version 2.0 (the 'License'); you may not
+// use this file except in compliance with the License. You may obtain a copy 
of
+// the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations 
under
+// the License.
+
+/**
+ * Tests for KernelModel module.
+ *
+ * Non camelcase fields are nbformat fields used in notebooks. Lint is ignored
+ * for them.
+ */
+
+import { KernelModel } from '../../kernel/KernelModel';
+
+const fakeSessionContext = {
+  session: {
+kernel: {
+  requestExecute: function(): object {
+return {
+  onIOPub: function(): void {
+// do nothing
+  }
+};
+  }
+}
+  }
+};
+
+it('creates new future with IOPub callbacks when executing new code in 
kernel', () => {
+  const kernelModel = new KernelModel(fakeSessionContext as any);
+  kernelModel.execute('new code');
+  expect(kernelModel.future).not.toBe(null);
+  expect(kernelModel.future.onIOPub).not.toBe(null);
+});
+
+it('handles execute result from IOPub channel', () => {
+  const kernelModel = new KernelModel(fakeSessionContext as any);
+  kernelModel.execute('any code');
+  kernelModel.future.onIOPub({
+header: {
+  // eslint-disable-next-line @typescript-eslint/camelcase
+  msg_type: 'execute_result'
+},

[GitHub] [beam] TheNeuralBit commented on a change in pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder

2020-07-30 Thread GitBox


TheNeuralBit commented on a change in pull request #12426:
URL: https://github.com/apache/beam/pull/12426#discussion_r463194527



##
File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
##
@@ -366,12 +368,15 @@ private static Object parseField(Object value, 
Schema.FieldType fieldType) {
 .map((element) -> parseField(element, 
fieldType.getCollectionElementType()))
 .collect(toImmutableList());
   case MAP:
-Map kvMap = (Map) value;
-return kvMap.entrySet().stream()
-.collect(
-toImmutableMap(
-(pair) -> parseField(pair.getKey(), 
fieldType.getMapKeyType()),
-(pair) -> parseField(pair.getValue(), 
fieldType.getMapValueType(;
+Map kvMap = new HashMap<>();
+((Map) value)
+.entrySet().stream()
+.forEach(
+(entry) ->
+kvMap.put(
+parseField(entry.getKey(), 
fieldType.getMapKeyType()),
+parseField(entry.getValue(), 
fieldType.getMapValueType(;
+return kvMap;

Review comment:
   This change is necessary because `ImmutableMap` (as well as 
`Collectors.toMap`) does not allow null values, so it errors on the new test 
cases.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit opened a new pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder

2020-07-30 Thread GitBox


TheNeuralBit opened a new pull request #12426:
URL: https://github.com/apache/beam/pull/12426


   This PR adds support for MapType in the Python RowCoder, as well as support 
for encoding nulls inside of container types (NullableCoder).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   (Go, Java, and Python post-commit status badge rows omitted.)

[GitHub] [beam] KevinGG commented on pull request #12372: [BEAM-10545] KernelModel and jest tests

2020-07-30 Thread GitBox


KevinGG commented on pull request #12372:
URL: https://github.com/apache/beam/pull/12372#issuecomment-666586747


   > LGTM. I do not believe I am a good typescript reviewer. It would be good 
if you can ask on dev@ and see if there could be better reviewers for future 
PRs.
   
   Thanks! For TypeScript review, Jason has agreed to take a look at my future 
PRs. He is the TL of CAIP notebooks and works on other extensions: 
https://github.com/GoogleCloudPlatform/jupyter-extensions.
   I'll still need your and other committers' feedback on how the code fits 
into the Beam repo and how others can contribute to it.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] KevinGG commented on pull request #12372: [BEAM-10545] KernelModel and jest tests

2020-07-30 Thread GitBox


KevinGG commented on pull request #12372:
URL: https://github.com/apache/beam/pull/12372#issuecomment-666584845


   > Retracting my LGTM, I have some questions.
   > 
   > And a high level question, what is the purpose of this module? (Could you 
add more details to what you wrote as part of the PR description?)
   
   The purpose of this module is to allow the extension to execute code in the 
kernel silently to retrieve information about the current interactive 
environment, handle the IOPub messages sent back from the kernel, and let 
other components use this module as a data model.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] KevinGG commented on a change in pull request #12372: [BEAM-10545] KernelModel and jest tests

2020-07-30 Thread GitBox


KevinGG commented on a change in pull request #12372:
URL: https://github.com/apache/beam/pull/12372#discussion_r463182344



##
File path: 
sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/__tests__/kernel/KernelModel.test.ts
##
@@ -0,0 +1,124 @@
+// Licensed under the Apache License, Version 2.0 (the 'License'); you may not
+// use this file except in compliance with the License. You may obtain a copy 
of
+// the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations 
under
+// the License.
+
+/**
+ * Tests for KernelModel module.
+ *
+ * Non camelcase fields are nbformat fields used in notebooks. Lint is ignored
+ * for them.
+ */
+
+import { KernelModel } from '../../kernel/KernelModel';
+
+const fakeSessionContext = {
+  session: {
+kernel: {
+  requestExecute: function(): object {
+return {
+  onIOPub: function(): void {
+// do nothing
+  }
+};
+  }
+}
+  }
+};
+
+it('creates new future with IOPub callbacks when executing new code in 
kernel', () => {
+  const kernelModel = new KernelModel(fakeSessionContext as any);
+  kernelModel.execute('new code');
+  expect(kernelModel.future).not.toBe(null);
+  expect(kernelModel.future.onIOPub).not.toBe(null);
+});
+
+it('handles execute result from IOPub channel', () => {
+  const kernelModel = new KernelModel(fakeSessionContext as any);
+  kernelModel.execute('any code');
+  kernelModel.future.onIOPub({
+header: {
+  // eslint-disable-next-line @typescript-eslint/camelcase
+  msg_type: 'execute_result'
+},
+content: {
+  data: {
+'text/plain':
+  '\'{"pipelineId": {"metadata": {"name": "pipeline", "inMemoryId": 1, 
"type": "pipeline"}, "pcolls": {"pcollId": {"name": "pcoll", "inMemoryId": 2, 
"type": "pcollection"\''
+  },
+  channel: 'iopub'
+}
+  } as any);
+  expect(kernelModel.executeResult).toEqual({
+pipelineId: {
+  metadata: {
+name: 'pipeline',
+inMemoryId: 1,
+type: 'pipeline'
+  },
+  pcolls: {
+pcollId: {
+  name: 'pcoll',
+  inMemoryId: 2,
+  type: 'pcollection'
+}
+  }
+}
+  });
+});
+
+it('handles display data from IOPub channel', () => {
+  const kernelModel = new KernelModel(fakeSessionContext as any);
+  kernelModel.execute('any code');
+  const displayData = {
+// eslint-disable-next-line @typescript-eslint/camelcase
+output_type: 'display_data',
+data: {
+  'text/html': '',
+  'application/javascript': 'console.log(1);'

Review comment:
   It's a valid JavaScript statement that logs to the console.
   
   In the test, it is executed, and the result shows up in the `terminal`.
   This unit test doesn't do the execution though, because it's just a data 
model, not a UI component.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] KevinGG commented on a change in pull request #12372: [BEAM-10545] KernelModel and jest tests

2020-07-30 Thread GitBox


KevinGG commented on a change in pull request #12372:
URL: https://github.com/apache/beam/pull/12372#discussion_r463188165



##
File path: 
sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/__tests__/kernel/KernelModel.test.ts
##
@@ -0,0 +1,124 @@
+// Licensed under the Apache License, Version 2.0 (the 'License'); you may not
+// use this file except in compliance with the License. You may obtain a copy 
of
+// the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations 
under
+// the License.
+
+/**
+ * Tests for KernelModel module.
+ *
+ * Non camelcase fields are nbformat fields used in notebooks. Lint is ignored
+ * for them.
+ */
+
+import { KernelModel } from '../../kernel/KernelModel';
+
+const fakeSessionContext = {
+  session: {
+kernel: {
+  requestExecute: function(): object {
+return {
+  onIOPub: function(): void {
+// do nothing
+  }
+};
+  }
+}
+  }
+};
+
+it('creates new future with IOPub callbacks when executing new code in 
kernel', () => {
+  const kernelModel = new KernelModel(fakeSessionContext as any);
+  kernelModel.execute('new code');
+  expect(kernelModel.future).not.toBe(null);
+  expect(kernelModel.future.onIOPub).not.toBe(null);
+});
+
+it('handles execute result from IOPub channel', () => {
+  const kernelModel = new KernelModel(fakeSessionContext as any);
+  kernelModel.execute('any code');
+  kernelModel.future.onIOPub({
+header: {
+  // eslint-disable-next-line @typescript-eslint/camelcase
+  msg_type: 'execute_result'
+},
+content: {
+  data: {
+'text/plain':
+  '\'{"pipelineId": {"metadata": {"name": "pipeline", "inMemoryId": 1, 
"type": "pipeline"}, "pcolls": {"pcollId": {"name": "pcoll", "inMemoryId": 2, 
"type": "pcollection"\''
+  },
+  channel: 'iopub'
+}
+  } as any);
+  expect(kernelModel.executeResult).toEqual({
+pipelineId: {
+  metadata: {
+name: 'pipeline',
+inMemoryId: 1,
+type: 'pipeline'
+  },
+  pcolls: {
+pcollId: {
+  name: 'pcoll',
+  inMemoryId: 2,
+  type: 'pcollection'
+}
+  }
+}
+  });
+});
+
+it('handles display data from IOPub channel', () => {
+  const kernelModel = new KernelModel(fakeSessionContext as any);
+  kernelModel.execute('any code');
+  const displayData = {
+// eslint-disable-next-line @typescript-eslint/camelcase
+output_type: 'display_data',
+data: {
+  'text/html': '',
+  'application/javascript': 'console.log(1);'
+},
+metadata: {
+  some: 'data'
+}
+  };
+
+  kernelModel.future.onIOPub({
+header: {
+  // eslint-disable-next-line @typescript-eslint/camelcase
+  msg_type: 'display_data'
+},
+content: displayData
+  } as any);
+  expect(kernelModel.displayData).toEqual([displayData]);
+});
+
+it('handles display update from IOPub channel', () => {

Review comment:
   Think of `display data` as incrementally displaying contents, while 
`display update` modifies contents that are already displayed.
   
   An example:
   - if display data is [1, 2, 3], you'll see 1 2 3 being displayed;
   - if display update is [1, 2, 3] for the same display_id, you'll see 1, then it 
becomes 2, then it becomes 3.
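
   The same behavior can be reproduced in a notebook with IPython's display 
handles; a quick sketch (notebook-only, since it renders output cells):

   ```python
   from IPython.display import display

   # display_data: each call appends a new output.
   display(1)
   display(2)
   display(3)

   # update_display_data: updates replace the output bound to the display_id.
   handle = display(1, display_id=True)
   handle.update(2)  # the rendered 1 becomes 2
   handle.update(3)  # ...and then 3
   ```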





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] KevinGG commented on a change in pull request #12372: [BEAM-10545] KernelModel and jest tests

2020-07-30 Thread GitBox


KevinGG commented on a change in pull request #12372:
URL: https://github.com/apache/beam/pull/12372#discussion_r463185961



##
File path: 
sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/kernel/KernelModel.ts
##
@@ -0,0 +1,161 @@
+// Licensed under the Apache License, Version 2.0 (the 'License'); you may not
+// use this file except in compliance with the License. You may obtain a copy 
of
+// the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations 
under
+// the License.
+
+/**
+ * The module holds the model that handles messaging between the frontend and
+ * the connected kernel.
+ */
+
+import { ISessionContext } from '@jupyterlab/apputils';
+
+import {
+  IDisplayData,
+  IDisplayUpdate,
+  IExecuteResult
+} from '@jupyterlab/nbformat';
+
+import { Kernel, KernelMessage } from '@jupyterlab/services';
+
+import { ISignal, Signal } from '@lumino/signaling';
+
+import { KernelCode } from '../kernel/KernelCode';
+
+export class KernelModel {
+  constructor(sessionContext: ISessionContext, enableConsoleLog = false) {
+this._sessionContext = sessionContext;
+this._enableConsoleLog = enableConsoleLog;
+  }
+
+  get future(): Kernel.IFuture<
+KernelMessage.IExecuteRequestMsg,
+KernelMessage.IExecuteReplyMsg
+  > | null {
+return this._future;
+  }
+
+  set future(
+value: Kernel.IFuture<
+  KernelMessage.IExecuteRequestMsg,
+  KernelMessage.IExecuteReplyMsg
+> | null
+  ) {
+if (this._future === value) {
+  return;
+}
+
+if (this._future) {
+  this._future.dispose();
+}
+
+this._future = value;
+
+if (!value) {
+  return;
+}
+
+value.onIOPub = this._onIOPub.bind(this);
+  }
+
+  get executeResult(): object {
+if (this._executeResult) {
+  const dataInPlainText = this._executeResult.data['text/plain'] as string;
+  if (dataInPlainText) {
+try {
+  // The slice removes trailing single quotes from the nbformat output.
+  // The replace removes literal backslashes from the nbformat output.
+  const dataInJsonString = dataInPlainText
+.slice(1, -1)
+.replace(/\\'/g, "'");
+  return JSON.parse(dataInJsonString);
+} catch (e) {
+  console.error(e);
+  return {};
+}
+  }
+}
+return {};
+  }
+
+  get displayData(): Array {
+return this._displayData;
+  }
+
+  get displayUpdate(): Array {
+return this._displayUpdate;
+  }
+
+  get stateChanged(): ISignal {
+return this._stateChanged;
+  }
+
+  execute(code: string, expectReply = true): void {
+// Dispose the kernel future so that no more IOPub will be handled.
+if (this.future) {
+  this.future.dispose();
+  this.future = null;
+}
+// Clear the outputs from previous kernel executions.
+this._executeResult = null;
+this._displayData.length = 0;
+this._displayUpdate.length = 0;
+if (!this._sessionContext || !this._sessionContext.session?.kernel) {
+  return;
+}
+this.future = this._sessionContext.session?.kernel?.requestExecute({
+  code: KernelCode.COMMON_KERNEL_IMPORTS + code,
+  silent: !expectReply,

Review comment:
   The problem with `silent` is that it is easily confused with 
`store_history`.
   
   Intuitively, if `silent`, the kernel execution should not store history. 
However, that's not the case.
   
   `silent` only controls whether the execution should return a result. The 
history is stored, and the side effects still happen.
   
   So you will see the execution_count increase even when you set `silent` to 
true.
   
   Wrapping it as `expectReply` makes the intent clear when using the model.
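
   For anyone probing this outside the extension, a minimal jupyter_client 
sketch (assuming a locally installed ipykernel) that passes both flags 
explicitly instead of letting one imply the other:

   ```python
   from jupyter_client.manager import start_new_kernel

   km, kc = start_new_kernel()  # spawns a local kernel and connects a client
   try:
       # silent controls whether results are broadcast on IOPub;
       # store_history is passed explicitly so the intent is unambiguous.
       kc.execute('x = 1', silent=True, store_history=False)
   finally:
       kc.stop_channels()
       km.shutdown_kernel()
   ```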





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] KevinGG commented on a change in pull request #12372: [BEAM-10545] KernelModel and jest tests

2020-07-30 Thread GitBox


KevinGG commented on a change in pull request #12372:
URL: https://github.com/apache/beam/pull/12372#discussion_r463183798



##
File path: 
sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/kernel/KernelModel.ts
##
@@ -0,0 +1,161 @@
+// Licensed under the Apache License, Version 2.0 (the 'License'); you may not
+// use this file except in compliance with the License. You may obtain a copy 
of
+// the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations 
under
+// the License.
+
+/**
+ * The module holds the model that handles messaging between the frontend and
+ * the connected kernel.
+ */
+
+import { ISessionContext } from '@jupyterlab/apputils';
+
+import {
+  IDisplayData,
+  IDisplayUpdate,
+  IExecuteResult
+} from '@jupyterlab/nbformat';
+
+import { Kernel, KernelMessage } from '@jupyterlab/services';
+
+import { ISignal, Signal } from '@lumino/signaling';
+
+import { KernelCode } from '../kernel/KernelCode';
+
+export class KernelModel {
+  constructor(sessionContext: ISessionContext, enableConsoleLog = false) {
+this._sessionContext = sessionContext;
+this._enableConsoleLog = enableConsoleLog;
+  }
+
+  get future(): Kernel.IFuture<
+KernelMessage.IExecuteRequestMsg,
+KernelMessage.IExecuteReplyMsg
+  > | null {
+return this._future;
+  }
+
+  set future(
+value: Kernel.IFuture<
+  KernelMessage.IExecuteRequestMsg,
+  KernelMessage.IExecuteReplyMsg
+> | null
+  ) {
+if (this._future === value) {
+  return;
+}
+
+if (this._future) {
+  this._future.dispose();
+}
+
+this._future = value;
+
+if (!value) {
+  return;
+}
+
+value.onIOPub = this._onIOPub.bind(this);
+  }
+
+  get executeResult(): object {
+if (this._executeResult) {
+  const dataInPlainText = this._executeResult.data['text/plain'] as string;
+  if (dataInPlainText) {
+try {
+  // The slice removes trailing single quotes from the nbformat output.
+  // The replace removes literal backslashes from the nbformat output.

Review comment:
   It's removing characters from the nbformat text.
   
   The kernel generates JSON.
   When it is messaged to the frontend by Jupyter in nbformat, the text becomes 
'json_with_backslashes'.
   
   The slice removes the "'" from both ends of the string,
   and the replace removes all the backslashes.
   
   The resulting text is valid JSON that can be parsed.
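
   A Python equivalent of the slice-and-replace, for illustration; the sample 
payload below is hypothetical:

   ```python
   import json

   def parse_execute_result(text_plain):
       # Strip the surrounding single quotes from the repr, then turn
       # escaped \' back into plain ' so the JSON parses.
       return json.loads(text_plain[1:-1].replace("\\'", "'"))

   raw = '\'{"pipelineId": {"name": "pipeline", "type": "pipeline"}}\''
   print(parse_execute_result(raw))
   # {'pipelineId': {'name': 'pipeline', 'type': 'pipeline'}}
   ```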





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] davidcavazos commented on pull request #12118: [BEAM-7705] Add BigQuery Java samples

2020-07-30 Thread GitBox


davidcavazos commented on pull request #12118:
URL: https://github.com/apache/beam/pull/12118#issuecomment-666576290


   All tests are passing, @pabloem can you take a look whenever you have a 
chance?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on pull request #12425: [BEAM-10610][BEAM-9982] Support Loopback mode with universal runners in the Go SDK.

2020-07-30 Thread GitBox


lostluck commented on pull request #12425:
URL: https://github.com/apache/beam/pull/12425#issuecomment-666575819


   R: @youngoli 
   cc: @robertwb @damondouglas @codeBehindMe  
   
   The others are cc'd as this has value for setting up Python Portable Runner 
usage, for local testing of changes against a correct runner.
   Set up usage of Beam Python per 
https://cwiki.apache.org/confluence/display/BEAM/Python+Tips, and in a 
virtualenv navigate to your repo's `beam/sdks/python`. You start the portable 
runner with `python -m apache_beam.runners.portability.local_job_service_main 
--port=8099`, which allows connections from the Go pipeline at 
`--endpoint=localhost:8099`.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck edited a comment on pull request #12425: [BEAM-10610][BEAM-9982] Support Loopback mode with universal runners in the Go SDK.

2020-07-30 Thread GitBox


lostluck edited a comment on pull request #12425:
URL: https://github.com/apache/beam/pull/12425#issuecomment-666575819


   R: @youngoli 
   cc: @robertwb @damondouglas @codeBehindMe  
   
   The others are cc'd as this has value for setting up Python Portable Runner 
usage, for local testing of changes against a correct runner.
   Set up usage of Beam Python per 
https://cwiki.apache.org/confluence/display/BEAM/Python+Tips, and in a 
virtualenv navigate to your repo's `beam/sdks/python`. You start the portable 
runner with `python -m apache_beam.runners.portability.local_job_service_main 
--port=8099`, which allows connections from the Go pipeline at 
`--endpoint=localhost:8099`.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] KevinGG commented on a change in pull request #12372: [BEAM-10545] KernelModel and jest tests

2020-07-30 Thread GitBox


KevinGG commented on a change in pull request #12372:
URL: https://github.com/apache/beam/pull/12372#discussion_r463182344



##
File path: 
sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/__tests__/kernel/KernelModel.test.ts
##
@@ -0,0 +1,124 @@
+// Licensed under the Apache License, Version 2.0 (the 'License'); you may not
+// use this file except in compliance with the License. You may obtain a copy 
of
+// the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations 
under
+// the License.
+
+/**
+ * Tests for KernelModel module.
+ *
+ * Non camelcase fields are nbformat fields used in notebooks. Lint is ignored
+ * for them.
+ */
+
+import { KernelModel } from '../../kernel/KernelModel';
+
+const fakeSessionContext = {
+  session: {
+kernel: {
+  requestExecute: function(): object {
+return {
+  onIOPub: function(): void {
+// do nothing
+  }
+};
+  }
+}
+  }
+};
+
+it('creates new future with IOPub callbacks when executing new code in 
kernel', () => {
+  const kernelModel = new KernelModel(fakeSessionContext as any);
+  kernelModel.execute('new code');
+  expect(kernelModel.future).not.toBe(null);
+  expect(kernelModel.future.onIOPub).not.toBe(null);
+});
+
+it('handles execute result from IOPub channel', () => {
+  const kernelModel = new KernelModel(fakeSessionContext as any);
+  kernelModel.execute('any code');
+  kernelModel.future.onIOPub({
+header: {
+  // eslint-disable-next-line @typescript-eslint/camelcase
+  msg_type: 'execute_result'
+},
+content: {
+  data: {
+'text/plain':
+  '\'{"pipelineId": {"metadata": {"name": "pipeline", "inMemoryId": 1, 
"type": "pipeline"}, "pcolls": {"pcollId": {"name": "pcoll", "inMemoryId": 2, 
"type": "pcollection"\''
+  },
+  channel: 'iopub'
+}
+  } as any);
+  expect(kernelModel.executeResult).toEqual({
+pipelineId: {
+  metadata: {
+name: 'pipeline',
+inMemoryId: 1,
+type: 'pipeline'
+  },
+  pcolls: {
+pcollId: {
+  name: 'pcoll',
+  inMemoryId: 2,
+  type: 'pcollection'
+}
+  }
+}
+  });
+});
+
+it('handles display data from IOPub channel', () => {
+  const kernelModel = new KernelModel(fakeSessionContext as any);
+  kernelModel.execute('any code');
+  const displayData = {
+// eslint-disable-next-line @typescript-eslint/camelcase
+output_type: 'display_data',
+data: {
+  'text/html': '',
+  'application/javascript': 'console.log(1);'

Review comment:
   It's a valid JavaScript statement that logs to the console.
   
   In the test, it is executed, and the result shows up in the `terminal`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] kennknowles commented on pull request #12416: Update google-api-services versions.

2020-07-30 Thread GitBox


kennknowles commented on pull request #12416:
URL: https://github.com/apache/beam/pull/12416#issuecomment-666574292


   But it also probably isn't a flake, since it is a pretty simple unit test. 
Most likely the assumptions of the mock were overconstrained, so changes in the 
Pubsub client library can break it.
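
   As a generic illustration (not the actual Pubsub test), an overconstrained 
mock pins call details the test doesn't care about, so a client-library change 
can break it even when behavior is unchanged; a sketch with unittest.mock:

   ```python
   from unittest import mock

   def fetch(client):
       # Stand-in for library code whose internals may change across versions.
       return client.get('topics', timeout=30)

   client = mock.Mock()
   fetch(client)
   # Overconstrained: pins the exact timeout the library happened to pass.
   client.get.assert_called_once_with('topics', timeout=30)
   # Looser: asserts only the behavior the test actually cares about.
   assert client.get.call_args[0][0] == 'topics'
   ```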



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



