[GitHub] [beam] pabloem commented on a change in pull request #11702: [BEAM-9990] Add Conditional Update and Conditional Create to FhirIO

2020-06-10 Thread GitBox


pabloem commented on a change in pull request #11702:
URL: https://github.com/apache/beam/pull/11702#discussion_r438404800



##
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##
@@ -1173,4 +1276,339 @@ public void executeBundles(ProcessContext context) {
   }
 }
   }
+
+  /**
+   * Create resources fhir io . create resources.
+   *
+   * @param  the type parameter
+   * @param fhirStore the fhir store
+   * @return the fhir io . create resources
+   */
+  public static  FhirIO.CreateResources createResources(ValueProvider fhirStore) {
+return new CreateResources(fhirStore);
+  }
+
+  /**
+   * Create resources fhir io . create resources.
+   *
+   * @param  the type parameter
+   * @param fhirStore the fhir store
+   * @return the fhir io . create resources
+   */
+  public static  FhirIO.CreateResources createResources(String fhirStore) {
+return new CreateResources(fhirStore);
+  }
+  /**
+   * {@link PTransform} for Creating FHIR resources.
+   *
+   * https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+   */
+  public static class CreateResources extends PTransform, Write.Result> {
+private final String fhirStore;
+private SerializableFunction ifNoneExistFunction;
+private SerializableFunction formatBodyFunction;
+private SerializableFunction typeFunction;
+private static final Logger LOG = LoggerFactory.getLogger(CreateResources.class);
+
+/**
+ * Instantiates a new Create resources transform.
+ *
+ * @param fhirStore the fhir store
+ */
+CreateResources(ValueProvider fhirStore) {
+  this.fhirStore = fhirStore.get();
+}
+
+/**
+ * Instantiates a new Create resources.
+ *
+ * @param fhirStore the fhir store
+ */
+CreateResources(String fhirStore) {
+  this.fhirStore = fhirStore;
+}
+
+/**
+ * This adds a {@link SerializableFunction} that reads an resource string and extracts an
+ * If-None-Exists query for conditional create. Typically this will just be extracting an ID to
+ * look for.
+ *
+ * https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+ *
+ * @param ifNoneExistFunction the if none exist function
+ * @return the create resources
+ */
+public CreateResources withIfNotExistFunction(
+SerializableFunction ifNoneExistFunction) {
+  this.ifNoneExistFunction = ifNoneExistFunction;
+  return this;
+}
+
+/**
+ * This adds a {@link SerializableFunction} that reads an resource string and extracts an
+ * resource type.
+ *
+ * https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+ *
+ * @param typeFunction for extracting type from a resource.
+ * @return the create resources
+ */
+public CreateResources withTypeFunction(SerializableFunction typeFunction) {
+  this.typeFunction = typeFunction;
+  return this;
+}
+/**
+ * With format body function create resources.
+ *
+ * @param formatBodyFunction the format body function
+ * @return the create resources
+ */
+public CreateResources withFormatBodyFunction(

Review comment:
   Hm, I am not sure. I feel that `Resource` may be better, since we take in any type and format a resource that gets inserted. But I trust your judgement here.
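
To illustrate the point about taking in any type, here is a minimal, self-contained sketch of a builder whose format-body function maps an arbitrary element type to the resource string that gets inserted. `ResourceFormatter` and `FailedWrite` are hypothetical stand-ins invented for this illustration, not classes from the PR or from Beam, and plain `java.util.function.Function` is used in place of Beam's `SerializableFunction`.

```java
import java.util.function.Function;

// Hypothetical element type: whatever the upstream pipeline happens to carry.
final class FailedWrite {
  final String dataResource; // the FHIR resource JSON that failed to write
  final String errorMessage;

  FailedWrite(String dataResource, String errorMessage) {
    this.dataResource = dataResource;
    this.errorMessage = errorMessage;
  }
}

// Hypothetical stand-in for the transform's builder; not the Beam class.
final class ResourceFormatter<T> {
  private Function<T, String> formatBodyFunction;

  // Registers the function that turns an element of type T into a resource body.
  ResourceFormatter<T> withFormatBodyFunction(Function<T, String> fn) {
    this.formatBodyFunction = fn;
    return this;
  }

  // Produces the resource string that would be sent to the FHIR store.
  String format(T element) {
    if (formatBodyFunction == null) {
      throw new IllegalStateException("formatBodyFunction must be set");
    }
    return formatBodyFunction.apply(element);
  }
}
```

In this reading the function is less about repairing a badly formatted resource and more about extracting the resource from whatever wrapper type the collection holds, for example `fw -> fw.dataResource`.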





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on a change in pull request #11702: [BEAM-9990] Add Conditional Update and Conditional Create to FhirIO

2020-06-10 Thread GitBox


pabloem commented on a change in pull request #11702:
URL: https://github.com/apache/beam/pull/11702#discussion_r438403098



##
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##
@@ -155,17 +168,53 @@
  * FhirIO.Write.Result writeResult =
  * output.apply("Execute FHIR Bundles", FhirIO.executeBundles(options.getExistingFhirStore()));
  *
+ * // Alternatively you could use import for high throughput to a new store.
+ * FhirIO.Write.Result writeResult =
+ * output.apply("Import FHIR Resources", FhirIO.executeBundles(options.getNewFhirStore()));
+ * // [End Writing ]
+ *
  * PCollection> failedBundles = writeResult.getFailedInsertsWithErr();
  *
+ * // [Begin Writing to Dead Letter Queue]
  * failedBundles.apply("Write failed bundles to BigQuery",
  * BigQueryIO
  * .write()
  * .to(option.getBQFhirExecuteBundlesDeadLetterTable())
  * .withFormatFunction(new HealthcareIOErrorToTableRow()));
+ * // [End Writing to Dead Letter Queue]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional update
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ * .apply("Reconcile with Conditional Update",
+ * FhirIO.ConditionalUpdate(fhirStore)
+ * .withFormatBodyFunction(HealthcareIOError::getDataResource)
+ * .withTypeFunction((HealthcareIOError err) -> {
+ *   String body = err.getDataResource();
+ *   // TODO(user) insert logic to exctract type.
+ *   return params;

Review comment:
   Sounds good. No need to add!









[GitHub] [beam] pabloem commented on a change in pull request #11702: [BEAM-9990] Add Conditional Update and Conditional Create to FhirIO

2020-06-09 Thread GitBox


pabloem commented on a change in pull request #11702:
URL: https://github.com/apache/beam/pull/11702#discussion_r437721434



##
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##
@@ -155,17 +168,53 @@
  * FhirIO.Write.Result writeResult =
  * output.apply("Execute FHIR Bundles", FhirIO.executeBundles(options.getExistingFhirStore()));
  *
+ * // Alternatively you could use import for high throughput to a new store.
+ * FhirIO.Write.Result writeResult =
+ * output.apply("Import FHIR Resources", FhirIO.executeBundles(options.getNewFhirStore()));
+ * // [End Writing ]
+ *
  * PCollection> failedBundles = writeResult.getFailedInsertsWithErr();
  *
+ * // [Begin Writing to Dead Letter Queue]
  * failedBundles.apply("Write failed bundles to BigQuery",
  * BigQueryIO
  * .write()
  * .to(option.getBQFhirExecuteBundlesDeadLetterTable())
  * .withFormatFunction(new HealthcareIOErrorToTableRow()));
+ * // [End Writing to Dead Letter Queue]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional update
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ * .apply("Reconcile with Conditional Update",
+ * FhirIO.ConditionalUpdate(fhirStore)
+ * .withFormatBodyFunction(HealthcareIOError::getDataResource)
+ * .withTypeFunction((HealthcareIOError err) -> {
+ *   String body = err.getDataResource();
+ *   // TODO(user) insert logic to exctract type.
+ *   return params;
+ * })
+ * .withSearchParametersFunction((HealthcareIOError err) -> {

Review comment:
   I think that makes total sense. Thanks!









[GitHub] [beam] pabloem commented on a change in pull request #11702: [BEAM-9990] Add Conditional Update and Conditional Create to FhirIO

2020-06-05 Thread GitBox


pabloem commented on a change in pull request #11702:
URL: https://github.com/apache/beam/pull/11702#discussion_r436210077



##
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##
@@ -1173,4 +1276,339 @@ public void executeBundles(ProcessContext context) {
   }
 }
   }
+
+  /**
+   * Create resources fhir io . create resources.
+   *
+   * @param  the type parameter
+   * @param fhirStore the fhir store
+   * @return the fhir io . create resources
+   */
+  public static  FhirIO.CreateResources createResources(ValueProvider fhirStore) {
+return new CreateResources(fhirStore);
+  }
+
+  /**
+   * Create resources fhir io . create resources.
+   *
+   * @param  the type parameter
+   * @param fhirStore the fhir store
+   * @return the fhir io . create resources
+   */
+  public static  FhirIO.CreateResources createResources(String fhirStore) {
+return new CreateResources(fhirStore);
+  }
+  /**
+   * {@link PTransform} for Creating FHIR resources.
+   *
+   * https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+   */
+  public static class CreateResources extends PTransform, Write.Result> {
+private final String fhirStore;
+private SerializableFunction ifNoneExistFunction;
+private SerializableFunction formatBodyFunction;
+private SerializableFunction typeFunction;
+private static final Logger LOG = LoggerFactory.getLogger(CreateResources.class);
+
+/**
+ * Instantiates a new Create resources transform.
+ *
+ * @param fhirStore the fhir store
+ */
+CreateResources(ValueProvider fhirStore) {
+  this.fhirStore = fhirStore.get();
+}
+
+/**
+ * Instantiates a new Create resources.
+ *
+ * @param fhirStore the fhir store
+ */
+CreateResources(String fhirStore) {
+  this.fhirStore = fhirStore;
+}
+
+/**
+ * This adds a {@link SerializableFunction} that reads an resource string and extracts an
+ * If-None-Exists query for conditional create. Typically this will just be extracting an ID to
+ * look for.
+ *
+ * https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+ *
+ * @param ifNoneExistFunction the if none exist function
+ * @return the create resources
+ */
+public CreateResources withIfNotExistFunction(
+SerializableFunction ifNoneExistFunction) {
+  this.ifNoneExistFunction = ifNoneExistFunction;
+  return this;
+}
+
+/**
+ * This adds a {@link SerializableFunction} that reads an resource string and extracts an
+ * resource type.
+ *
+ * https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+ *
+ * @param typeFunction for extracting type from a resource.
+ * @return the create resources
+ */
+public CreateResources withTypeFunction(SerializableFunction typeFunction) {
+  this.typeFunction = typeFunction;
+  return this;
+}
+/**
+ * With format body function create resources.
+ *
+ * @param formatBodyFunction the format body function
+ * @return the create resources
+ */
+public CreateResources withFormatBodyFunction(

Review comment:
   I don't think I understand this function very well. It seems to be a function for formatting a resource properly in case its formatting is not correct? Could you expand the documentation for it?
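
One possible reading of how the three functions in the quoted class fit together, sketched without Beam: each element is mapped to a resource type, a body, and an optional `If-None-Exist` query, which together describe a FHIR conditional create (`POST [base]/[type]` with an `If-None-Exist` header). `ConditionalCreateSketch`, `CreateRequest`, and the `/fhir/` path layout are assumptions made for illustration only; the PR's actual implementation may differ.

```java
import java.util.function.Function;

// Hypothetical value object describing one conditional-create call.
final class CreateRequest {
  final String path;        // where the POST would go
  final String ifNoneExist; // search query for the header; null means unconditional
  final String body;        // resource JSON to insert

  CreateRequest(String path, String ifNoneExist, String body) {
    this.path = path;
    this.ifNoneExist = ifNoneExist;
    this.body = body;
  }
}

// Hypothetical stand-in mirroring the builder methods in the diff; not the Beam class.
final class ConditionalCreateSketch<T> {
  private final String fhirStore;
  private Function<T, String> typeFunction;
  private Function<T, String> formatBodyFunction;
  private Function<T, String> ifNoneExistFunction; // optional

  ConditionalCreateSketch(String fhirStore) {
    this.fhirStore = fhirStore;
  }

  ConditionalCreateSketch<T> withTypeFunction(Function<T, String> fn) {
    this.typeFunction = fn;
    return this;
  }

  ConditionalCreateSketch<T> withFormatBodyFunction(Function<T, String> fn) {
    this.formatBodyFunction = fn;
    return this;
  }

  ConditionalCreateSketch<T> withIfNotExistFunction(Function<T, String> fn) {
    this.ifNoneExistFunction = fn;
    return this;
  }

  // Applies the configured functions to one element to describe the request.
  CreateRequest toRequest(T element) {
    if (typeFunction == null || formatBodyFunction == null) {
      throw new IllegalStateException("type and format-body functions are required");
    }
    String type = typeFunction.apply(element);
    String body = formatBodyFunction.apply(element);
    String query = ifNoneExistFunction == null ? null : ifNoneExistFunction.apply(element);
    return new CreateRequest(fhirStore + "/fhir/" + type, query, body);
  }
}
```

Under this sketch the format-body function is simply the step that yields the payload, which is the behavior the requested documentation would need to spell out.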

##
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##
@@ -155,17 +168,53 @@
  * FhirIO.Write.Result writeResult =
  * output.apply("Execute FHIR Bundles", FhirIO.executeBundles(options.getExistingFhirStore()));
  *
+ * // Alternatively you could use import for high throughput to a new store.
+ * FhirIO.Write.Result writeResult =
+ * output.apply("Import FHIR Resources", FhirIO.executeBundles(options.getNewFhirStore()));
+ * // [End Writing ]
+ *
  * PCollection> failedBundles = writeResult.getFailedInsertsWithErr();
  *
+ * // [Begin Writing to Dead Letter Queue]
  * failedBundles.apply("Write failed bundles to BigQuery",
  * BigQueryIO
  * .write()
  * .to(option.getBQFhirExecuteBundlesDeadLetterTable())
  * .withFormatFunction(new HealthcareIOErrorToTableRow()));
+ * // [End Writing to Dead Letter Queue]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional update
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ * .apply("Reconcile with Conditional Update",
+ * FhirIO.ConditionalUpdate(fhirStore)
+ * .withFormatBodyFunction(HealthcareIOError::getDataResource)
+ * .withTypeFunction((HealthcareIOError err) -> {
+ *   String body = err.getDataResource();
+ *