[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-10-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=153255&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-153255
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 10/Oct/18 19:52
Start Date: 10/Oct/18 19:52
Worklog Time Spent: 10m 
  Work Description: rangadi opened a new pull request #6636: [BEAM-3925] 
[DO NOT MERGE] KafkaIO : Value provider support for reader configuration. 
URL: https://github.com/apache/beam/pull/6636
 
 
   ValueProvider support for KafkaIO reader configuration (input topics & 
bootstrap servers).
   [NOTE: This is not meant for merge into master yet]
   
   This enables users to write Dataflow templates using KafkaIO.
   
   The main funcational changes is that partition information for topics is 
fetched at runtime by each of the source splits just before reading. This is 
required since Kafka topic names and cluster configuration may not be available 
at graph construction time.
   
   This implies that we cannot adjust number of splits based on number of Kafka 
partitions. In order to get around this limitation this lets users explicitly 
specify number of splits (`withNumSplits()`). We can handle this better if we 
can let some splits not to have any partitions assigned at runtime. We need to 
something like that to be able to merge this into master. 
   
   For now, this PR will serve as good starting point for developing templates 
with KafkaIO.
   [cc: @azurezyq, @pupamanyu] 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 153255)
Time Spent: 6h 40m  (was: 6.5h)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 6h 40m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-09-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=140685&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140685
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 03/Sep/18 18:13
Start Date: 03/Sep/18 18:13
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #5141: [BEAM-3925] Allow 
ValueProvider for KafkaIO so that we can create Beam Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#issuecomment-418172037
 
 
   This pull request has been closed due to lack of activity. If you think that 
is incorrect, or the pull request requires review, you can revive the PR at any 
time.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 140685)
Time Spent: 6h 20m  (was: 6h 10m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-08-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=138477&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-138477
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 27/Aug/18 17:21
Start Date: 27/Aug/18 17:21
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #5141: [BEAM-3925] Allow 
ValueProvider for KafkaIO so that we can create Beam Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#issuecomment-416300423
 
 
   This pull request has been marked as stale due to 60 days of inactivity. It 
will be closed in 1 week if no further activity occurs. If you think that’s 
incorrect or this pull request requires a review, please simply write any 
comment. If closed, you can revive the PR at any time and @mention a reviewer 
or discuss it on the d...@beam.apache.org list. Thank you for your 
contributions.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 138477)
Time Spent: 6h 10m  (was: 6h)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-06-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=116896&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116896
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 28/Jun/18 16:46
Start Date: 28/Jun/18 16:46
Worklog Time Spent: 10m 
  Work Description: rangadi commented on issue #5141: [BEAM-3925] Allow 
ValueProvider for KafkaIO so that we can create Beam Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#issuecomment-401100080
 
 
   I spoke with the author @pupamanyu in April and discussed about the 
approach. I think this PR can be closed in favor of a fresh PR in the future.
   @pupamanyu what do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 116896)
Time Spent: 6h  (was: 5h 50m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 6h
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-06-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=116876&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116876
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 28/Jun/18 16:25
Start Date: 28/Jun/18 16:25
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #5141: [BEAM-3925] Allow 
ValueProvider for KafkaIO so that we can create Beam Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#issuecomment-401093503
 
 
   We have turned on autoformatting of the codebase, which causes small 
conflicts across the board. You can probably safely rebase and just keep your 
changes. Like this:
   
   ```
   $ git rebase
   ... see some conflicts
   $ git diff
   ... confirmed that the conflicts are just autoformatting
   ... so we can just keep our changes are do our own autoformat
   $ git checkout --theirs --
   $ git add -u
   $ git rebase --continue
   $ ./gradlew spotlessJavaApply
   ```
   
   Please ping me if you run into any difficulty. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 116876)
Time Spent: 5h 50m  (was: 5h 40m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95798&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95798
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 27/Apr/18 00:14
Start Date: 27/Apr/18 00:14
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on issue #5141: [BEAM-3925] Allow 
ValueProvider for KafkaIO so that we can create Beam Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#issuecomment-384825979
 
 
   I think it is good idea to try this out. I was planning to do this once the
   review is completed. With no significant changes pending, I will try the
   template.
   
   On Thu, Apr 26, 2018 at 3:09 PM, Raghu Angadi 
   wrote:
   
   > Let's step back a bit and see how a KafkaIO source would work in a
   > template:
   > AFAIK, the driver part of launching a Dataflow templated pipeline runs
   > only once while installing the template. Each run of a template executes a
   > serialized job that was stored and does not run the driver that builds the
   > graph of computations.
   >
   > This does not work well for Beam unbounded sources. E.g. split()1
   > 

   > on an unbounded source like KafkaIO is invoked on the driver, this also
   > decides parallelism. In KafkaIO, how many splits should we return? One
   > option is to just return a fixed number like 100 during template
   > preparation, and distribute the actual partitions among these 100 at
   > template runtime.
   >
   > In order to iron out such issues early, it might be a good idea to try a
   > template before finalizing this PR. WDYT?
   >
   > —
   > You are receiving this because you authored the thread.
   > Reply to this email directly, view it on GitHub
   > , or mute
   > the thread
   > 

   > .
   >
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95798)
Time Spent: 5h 40m  (was: 5.5h)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95749&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95749
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 22:08
Start Date: 26/Apr/18 22:08
Worklog Time Spent: 10m 
  Work Description: rangadi commented on issue #5141: [BEAM-3925] Allow 
ValueProvider for KafkaIO so that we can create Beam Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#issuecomment-384804212
 
 
   Let's step back a bit and see how a KafkaIO source would work in a template: 
   AFAIK,  the driver part of launching a Dataflow templated pipeline runs only 
once while installing the template. Each run of a template executes a 
serialized job that was stored and does not run the driver that builds the 
graph of computations. 
   
   This does not work well for Beam unbounded sources. E.g. `split()`[1] on an 
unbounded source like KafkaIO is invoked on the driver, this also decides 
parallelism. In KafkaIO, how many splits should we return? One option is to 
just return a fixed number like 100 during template preparation, and distribute 
the actual partitions among these 100 at template runtime. 
   
   In order to iron out such issues early, it might be a good idea to try a 
template before finalizing this PR. WDYT?
   
   [1]: 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L68


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95749)
Time Spent: 5.5h  (was: 5h 20m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95674&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95674
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:45
Start Date: 26/Apr/18 17:45
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on issue #5141: [BEAM-3925] Allow 
ValueProvider for KafkaIO so that we can create Beam Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#issuecomment-384728529
 
 
   If subscribing to Kafka topic, then Kafka handles partition assignment and
   rebalancing in case of failures(inside consumer group) automatically. But
   if certain process requires a consumer to subscribe to list of topic
   partitions for a specific situation as described in Manual Assignment of
   Kafka Documentation, then the option of TopicPartition helps. Templates are
   staged beam pipelines so that users can execute the pipeline without coding
   but still provide a flexibility of setting some parameters(only the ones
   that are exposed) for beam without any modification to the code. I hope
   this answers your first question.
   
   
I can provide more insight into why TopicPartitions options helps provide
   more flexibility to the Beam Templates. Kafka Client can read from a
   specific list of partitions or from the topic as a whole. Providing the
   List of TopicPartitions as a CSV in the format :, :, :...
   
   This provides more flexibility. The Kafka documentation provides more
   details on how Manual Assignment of Topics are useful for consumers.
   
   
https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
   
   Snippet from the documentation below:
   
   However, in some cases you may need finer control over the specific
   partitions that are assigned. For example:
   
   
  - If the process is maintaining some kind of local state associated with
  that partition (like a local on-disk key-value store), then it should only
  get records for the partition it is maintaining on disk.
  - If the process itself is highly available and will be restarted if it
  fails (perhaps using a cluster management framework like YARN, Mesos, or
  AWS facilities, or as part of a stream processing framework). In this case
  there is no need for Kafka to detect the failure and reassign the 
partition
  since the consuming process will be restarted on another machine.
   
   
   
   
   
   
   
   On Thu, Apr 26, 2018 at 10:08 AM, Raghu Angadi 
   wrote:
   
   > Thanks for the update Pramod. Made a few comments above. I will think a
   > bit more about the over all approach to providing these options. If reader
   > is initialized with topic, the partition list is read at the graph building
   > time. I don't have much experience with templates, but the graph building
   > phase is not executed for a template, right? When is the number of source
   > partitions decided in a template?
   >
   > —
   > You are receiving this because you authored the thread.
   > Reply to this email directly, view it on GitHub
   > , or mute
   > the thread
   > 

   > .
   >
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95674)
Time Spent: 5h 20m  (was: 5h 10m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95654&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95654
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:26
Start Date: 26/Apr/18 17:26
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r184469912
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -360,32 +387,85 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return 
withTopicPartitions(ValueProvider.StaticValueProvider.of(topicPartitions));
+}
+
+/**
+ * Like above but with a {@link ValueProvider 
ValueProvider>}.
+ *
+ */
+public Read withTopicPartitions(ValueProvider> 
topicPartitions) {
+  checkArgument(
+  getTopics().isAccessible(),
+  "Only topics or topicPartitions can be set, not both");
+  return toBuilder().setTopicPartitions(topicPartitions).build();
 }
 
 /**
- * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+ * Sets a Kafka {@link Deserializer Deserializer} for 
interpreting key bytes read.
+ * This uses the {@link String} provided to set the Deserializer
+ */
+public Read withKeyDeserializerClassName(String keyDeserializer) {
+  return 
withKeyDeserializerClassName(ValueProvider.StaticValueProvider.of(keyDeserializer));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
 
 Review comment:
   May I know What is odd here? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95654)
Time Spent: 5h 10m  (was: 5h)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95653&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95653
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:25
Start Date: 26/Apr/18 17:25
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r184469631
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -360,32 +387,85 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return 
withTopicPartitions(ValueProvider.StaticValueProvider.of(topicPartitions));
+}
+
+/**
+ * Like above but with a {@link ValueProvider 
ValueProvider>}.
+ *
+ */
+public Read withTopicPartitions(ValueProvider> 
topicPartitions) {
+  checkArgument(
+  getTopics().isAccessible(),
+  "Only topics or topicPartitions can be set, not both");
+  return toBuilder().setTopicPartitions(topicPartitions).build();
 }
 
 /**
- * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+ * Sets a Kafka {@link Deserializer Deserializer} for 
interpreting key bytes read.
+ * This uses the {@link String} provided to set the Deserializer
+ */
+public Read withKeyDeserializerClassName(String keyDeserializer) {
+  return 
withKeyDeserializerClassName(ValueProvider.StaticValueProvider.of(keyDeserializer));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withKeyDeserializerClassName(ValueProvider 
keyDeserializer) {
+  return toBuilder().setKeyDeserializer(ValueProvider
+  .NestedValueProvider.of(keyDeserializer, new 
KeyDeserializerTranslator())).build();
+}
+
+/**
+ * Sets a Kafka {@link Deserializer Deserializer} to interpret 
key bytes read.
  *
  * In addition, Beam also needs a {@link Coder} to serialize and 
deserialize key objects at
  * runtime. KafkaIO tries to infer a coder for the key based on the {@link 
Deserializer} class,
  * however in case that fails, you can use {@link 
#withKeyDeserializerAndCoder(Class, Coder)} to
  * provide the key coder explicitly.
  */
-public Read withKeyDeserializer(Class> 
keyDeserializer) {
-  return toBuilder().setKeyDeserializer(keyDeserializer).build();
+public Read withKeyDeserializerClassName(
 
 Review comment:
   It was requested to be changed as part of the review. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95653)
Time Spent: 5h  (was: 4h 50m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 5h
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95652&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95652
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:24
Start Date: 26/Apr/18 17:24
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r184469522
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -336,22 +347,38 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopic(String topic) {
-  return withTopics(ImmutableList.of(topic));
+  return 
withTopics(commaSeparatedStrings(ValueProvider.StaticValueProvider.of(topic)));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withTopic(ValueProvider topic) {
+  return withTopics(commaSeparatedStrings(topic));
 }
 
 /**
- * Sets a list of topics to read from. All the partitions from each
- * of the topics are read.
+ * Sets a list of topics to read from with a {@link List 
List}.
+ * All the partitions from each of the topics are read.
  *
  * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
  * of how the partitions are distributed among the splits.
  */
 public Read withTopics(List topics) {
-  checkState(
-  getTopicPartitions().isEmpty(), "Only topics or topicPartitions can 
be set, not both");
-  return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
+  return withTopics(ValueProvider.StaticValueProvider.of(topics));
 }
 
+/**
+ * Like above but with a {@link ValueProvider 
ValueProvider>}.
+ *
+ */
+public Read withTopics(ValueProvider> topics) {
+  checkArgument(
+  getTopicPartitions().isAccessible(),
 
 Review comment:
   I am new to Guava, I wanted to check if getTopicPartitions() is not provided 
because Topics and TopicPartitions are mutually exclusive options. I need to do 
this validation at graph time if possible. Can you please guide me?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95652)
Time Spent: 4h 50m  (was: 4h 40m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95649&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95649
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:23
Start Date: 26/Apr/18 17:23
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r184468967
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
 ##
 @@ -168,7 +168,7 @@ static void ensureEOSSupport() {
   .apply("Assign sequential ids", ParDo.of(new Sequencer<>()))
   .apply("Persist ids", GroupByKey.create())
   .apply(
-String.format("Write to Kafka topic '%s'", spec.getTopic()),
+String.format("Write to Kafka topic '%s'", spec.getTopic().get()),
 
 Review comment:
   Sure I will check with isAccessible() and then if not, skip inclusion


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95649)
Time Spent: 4.5h  (was: 4h 20m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95651&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95651
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:23
Start Date: 26/Apr/18 17:23
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r184469156
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -360,32 +387,85 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return 
withTopicPartitions(ValueProvider.StaticValueProvider.of(topicPartitions));
+}
+
+/**
+ * Like above but with a {@link ValueProvider 
ValueProvider>}.
 
 Review comment:
   ack


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95651)
Time Spent: 4h 40m  (was: 4.5h)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95646&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95646
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:22
Start Date: 26/Apr/18 17:22
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r184468679
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
 ##
 @@ -55,4 +60,33 @@ public static String updateSerializedOptions(
   throw new RuntimeException("Unable to parse re-serialize options", e);
 }
   }
+  /**
+   * Used to build a {@link ValueProvider} for {@link List List}.
+   */
+  @SuppressWarnings("unchecked")
+  public static ValueProvider> 
commaSeparatedStrings(ValueProvider csv) {
+return ValueProvider.NestedValueProvider.of(csv, new CsvTranslator());
+  }
+
+  /**
+   * Used to build a {@link ValueProvider} for {@link List List}.
+   */
+  private static class CsvTranslator implements SerializableFunction> {
+@Override
+public List apply(String csv) {
+  return ImmutableList.copyOf(
+  Splitter.on(',').trimResults().omitEmptyStrings().splitToList(csv));
+}
+  }
+
+  public static Class classForName(String className) {
+  Class clazz;
+  try {
+clazz =  Class.forName(className);
+  } catch (ClassNotFoundException e) {
+throw new RuntimeException("Please check for the existence of the 
KeySerializer Class",
 
 Review comment:
   I will change the message to "Please check for the existence of the Class"


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95646)
Time Spent: 4h 20m  (was: 4h 10m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95645&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95645
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:21
Start Date: 26/Apr/18 17:21
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r184468564
 
 

 ##
 File path: .gitignore
 ##
 @@ -12,6 +12,7 @@
 **/build/**/*
 **/vendor/**/*
 **/.gradletasknamecache
+**/gradle/**/*
 
 Review comment:
   Sure I will remove this as part of this commit. Thanks for this feedback


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95645)
Time Spent: 4h 10m  (was: 4h)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95643&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95643
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:08
Start Date: 26/Apr/18 17:08
Worklog Time Spent: 10m 
  Work Description: rangadi commented on issue #5141: [BEAM-3925] Allow 
ValueProvider for KafkaIO so that we can create Beam Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#issuecomment-384717287
 
 
   Thanks for the update Pramod. Made a few comments above. I will think a bit 
more about the over all approach to providing these options. If reader is 
initialized with topic, the partition list is read at the graph building time. 
I don't have much experience with templates, but the graph building phase is 
not executed for a template, right? When is the number of source partitions 
decided in a template?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95643)
Time Spent: 4h  (was: 3h 50m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 4h
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95630&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95630
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:02
Start Date: 26/Apr/18 17:02
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r184170527
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
 ##
 @@ -442,7 +442,7 @@ void sendRecord(TimestampedValue> record, Counter 
sendCounter) {
 
   producer.send(
   new ProducerRecord<>(
-  spec.getTopic(), null, timestampMillis,
+  spec.getTopic().get(), null, timestampMillis,
 
 Review comment:
   Note to self: check if `get()` is cheap for value providers, otherwise use a 
member var.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95630)
Time Spent: 3h 20m  (was: 3h 10m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95634&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95634
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:02
Start Date: 26/Apr/18 17:02
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r184141322
 
 

 ##
 File path: .gitignore
 ##
 @@ -12,6 +12,7 @@
 **/build/**/*
 **/vendor/**/*
 **/.gradletasknamecache
+**/gradle/**/*
 
 Review comment:
   I think top level gradle directory is part of git. Can you move this to a 
different PR so that someone else can review?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95634)
Time Spent: 3h 50m  (was: 3h 40m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95633&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95633
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:02
Start Date: 26/Apr/18 17:02
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r184167745
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
 ##
 @@ -55,4 +60,33 @@ public static String updateSerializedOptions(
   throw new RuntimeException("Unable to parse re-serialize options", e);
 }
   }
+  /**
+   * Used to build a {@link ValueProvider} for {@link List List}.
+   */
+  @SuppressWarnings("unchecked")
+  public static ValueProvider> 
commaSeparatedStrings(ValueProvider csv) {
+return ValueProvider.NestedValueProvider.of(csv, new CsvTranslator());
+  }
+
+  /**
+   * Used to build a {@link ValueProvider} for {@link List List}.
+   */
+  private static class CsvTranslator implements SerializableFunction> {
+@Override
+public List apply(String csv) {
+  return ImmutableList.copyOf(
+  Splitter.on(',').trimResults().omitEmptyStrings().splitToList(csv));
+}
+  }
+
+  public static Class classForName(String className) {
+  Class clazz;
+  try {
+clazz =  Class.forName(className);
+  } catch (ClassNotFoundException e) {
+throw new RuntimeException("Please check for the existence of the 
KeySerializer Class",
 
 Review comment:
   The message is not required (it seems kafkaio specific as well).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95633)
Time Spent: 3h 40m  (was: 3.5h)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95631&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95631
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:02
Start Date: 26/Apr/18 17:02
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r184170193
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
 ##
 @@ -168,7 +168,7 @@ static void ensureEOSSupport() {
   .apply("Assign sequential ids", ParDo.of(new Sequencer<>()))
   .apply("Persist ids", GroupByKey.create())
   .apply(
-String.format("Write to Kafka topic '%s'", spec.getTopic()),
+String.format("Write to Kafka topic '%s'", spec.getTopic().get()),
 
 Review comment:
   Code executes on the graph build time. get() does not work for runtime 
values there (which would be the case for templates, right?). You can check if 
the value is available and not include it in the transform name. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95631)
Time Spent: 3h 20m  (was: 3h 10m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95636&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95636
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:02
Start Date: 26/Apr/18 17:02
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r184172660
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -336,22 +347,38 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopic(String topic) {
-  return withTopics(ImmutableList.of(topic));
+  return 
withTopics(commaSeparatedStrings(ValueProvider.StaticValueProvider.of(topic)));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withTopic(ValueProvider topic) {
+  return withTopics(commaSeparatedStrings(topic));
 }
 
 /**
- * Sets a list of topics to read from. All the partitions from each
- * of the topics are read.
+ * Sets a list of topics to read from with a {@link List 
List}.
+ * All the partitions from each of the topics are read.
  *
  * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
  * of how the partitions are distributed among the splits.
  */
 public Read withTopics(List topics) {
-  checkState(
-  getTopicPartitions().isEmpty(), "Only topics or topicPartitions can 
be set, not both");
-  return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
+  return withTopics(ValueProvider.StaticValueProvider.of(topics));
 }
 
+/**
+ * Like above but with a {@link ValueProvider 
ValueProvider>}.
+ *
+ */
+public Read withTopics(ValueProvider> topics) {
+  checkArgument(
+  getTopicPartitions().isAccessible(),
 
 Review comment:
   Did you mean '!'? Also, it is accessible, but should be empty. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95636)
Time Spent: 3h 50m  (was: 3h 40m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95632&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95632
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:02
Start Date: 26/Apr/18 17:02
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r184167984
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
 ##
 @@ -55,4 +60,33 @@ public static String updateSerializedOptions(
   throw new RuntimeException("Unable to parse re-serialize options", e);
 }
   }
+  /**
+   * Used to build a {@link ValueProvider} for {@link List List}.
+   */
+  @SuppressWarnings("unchecked")
+  public static ValueProvider> 
commaSeparatedStrings(ValueProvider csv) {
+return ValueProvider.NestedValueProvider.of(csv, new CsvTranslator());
+  }
+
+  /**
+   * Used to build a {@link ValueProvider} for {@link List List}.
+   */
+  private static class CsvTranslator implements SerializableFunction> {
+@Override
+public List apply(String csv) {
+  return ImmutableList.copyOf(
+  Splitter.on(',').trimResults().omitEmptyStrings().splitToList(csv));
+}
+  }
+
+  public static Class classForName(String className) {
+  Class clazz;
 
 Review comment:
   use 2 space indentation. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95632)
Time Spent: 3.5h  (was: 3h 20m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95637&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95637
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:02
Start Date: 26/Apr/18 17:02
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r184173372
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -360,32 +387,85 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return 
withTopicPartitions(ValueProvider.StaticValueProvider.of(topicPartitions));
+}
+
+/**
+ * Like above but with a {@link ValueProvider 
ValueProvider>}.
+ *
+ */
+public Read withTopicPartitions(ValueProvider> 
topicPartitions) {
+  checkArgument(
+  getTopics().isAccessible(),
+  "Only topics or topicPartitions can be set, not both");
+  return toBuilder().setTopicPartitions(topicPartitions).build();
 }
 
 /**
- * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+ * Sets a Kafka {@link Deserializer Deserializer} for 
interpreting key bytes read.
+ * This uses the {@link String} provided to set the Deserializer
+ */
+public Read withKeyDeserializerClassName(String keyDeserializer) {
+  return 
withKeyDeserializerClassName(ValueProvider.StaticValueProvider.of(keyDeserializer));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
 
 Review comment:
   `{@link ValueProvider ValueProvider}.` looks odd. Please fix 
these links every where in the PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95637)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95635&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95635
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:02
Start Date: 26/Apr/18 17:02
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r184173175
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -360,32 +387,85 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return 
withTopicPartitions(ValueProvider.StaticValueProvider.of(topicPartitions));
+}
+
+/**
+ * Like above but with a {@link ValueProvider 
ValueProvider>}.
+ *
+ */
+public Read withTopicPartitions(ValueProvider> 
topicPartitions) {
+  checkArgument(
+  getTopics().isAccessible(),
+  "Only topics or topicPartitions can be set, not both");
+  return toBuilder().setTopicPartitions(topicPartitions).build();
 }
 
 /**
- * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+ * Sets a Kafka {@link Deserializer Deserializer} for 
interpreting key bytes read.
+ * This uses the {@link String} provided to set the Deserializer
+ */
+public Read withKeyDeserializerClassName(String keyDeserializer) {
+  return 
withKeyDeserializerClassName(ValueProvider.StaticValueProvider.of(keyDeserializer));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withKeyDeserializerClassName(ValueProvider 
keyDeserializer) {
+  return toBuilder().setKeyDeserializer(ValueProvider
+  .NestedValueProvider.of(keyDeserializer, new 
KeyDeserializerTranslator())).build();
+}
+
+/**
+ * Sets a Kafka {@link Deserializer Deserializer} to interpret 
key bytes read.
  *
  * In addition, Beam also needs a {@link Coder} to serialize and 
deserialize key objects at
  * runtime. KafkaIO tries to infer a coder for the key based on the {@link 
Deserializer} class,
  * however in case that fails, you can use {@link 
#withKeyDeserializerAndCoder(Class, Coder)} to
  * provide the key coder explicitly.
  */
-public Read withKeyDeserializer(Class> 
keyDeserializer) {
-  return toBuilder().setKeyDeserializer(keyDeserializer).build();
+public Read withKeyDeserializerClassName(
 
 Review comment:
   Why is this method's name changed?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95635)
Time Spent: 3h 50m  (was: 3h 40m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95629&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95629
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 17:02
Start Date: 26/Apr/18 17:02
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r184171914
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -360,32 +387,85 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return 
withTopicPartitions(ValueProvider.StaticValueProvider.of(topicPartitions));
+}
+
+/**
+ * Like above but with a {@link ValueProvider 
ValueProvider>}.
 
 Review comment:
   Rather than `Like above` Please refer to the actual method. Also fix 
ValueProvider link and remove extra empty lines.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95629)
Time Spent: 3h 20m  (was: 3h 10m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95334&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95334
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 04:22
Start Date: 26/Apr/18 04:22
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r183844484
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -359,20 +386,129 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return withTopicPartitions(
+  
ValueProvider.StaticValueProvider.of(Joiner.on(',').join(topicPartitions)));
+}
+
+/**
+ * Sets a list of partitions to read from. Partitions are provided as a 
comma separated list of
+ * Strings in the format: topic1-partition1,topic1-partition2...
+ * This allows reading only a subset of partitions for one or more topics 
when (if ever) needed.
+ *
+ * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
+ * of how the partitions are distributed among the splits.
+ */
+public Read withTopicPartitions(String topicPartitions) {
+  return 
withTopicPartitions(ValueProvider.StaticValueProvider.of(topicPartitions));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withTopicPartitions(ValueProvider 
topicPartitions) {
+  checkState(getTopics().get().isEmpty(),
+  "Only topics or topicPartitions can be set, not both");
+  return toBuilder().setTopicPartitions(ValueProvider
+  .NestedValueProvider.of(topicPartitions, new 
TopicPartitionTranslator())).build();
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicTranslator implements 
SerializableFunction> {
+  @Override
+  public List apply(String topics) {
+return ImmutableList.copyOf(
+
Splitter.on(',').trimResults().omitEmptyStrings().splitToList(topics));
+  }
+
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicPartitionTranslator
+implements SerializableFunction> {
+  @Override
+  public List apply(String topicPartitions) {
+List topicPartitionList = new ArrayList<>();
+for (String topicPartition: 
Splitter.on(',').trimResults().omitEmptyStrings()
+  .splitToList(topicPartitions)) {
+  topicPartitionList
+  .add(new 
TopicPartition(Splitter.on('-').splitToList(topicPartition).get(0),
+  
Integer.parseInt(Splitter.on('-').splitToList(topicPartition).get(1;
+}
+  return ImmutableList.copyOf(topicPartitionList);
+  }
+}
+
+/**
+ * Sets a Kafka {@link Deserializer Deserializer} for 
interpreting key bytes read.
+ * This uses the {@link String} provided to set the Deserializer
+ */
+public Read withKeyDeserializer(String keyDeserializer) {
+  return 
withKeyDeserializer(ValueProvider.StaticValueProvider.of(keyDeserializer));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withKeyDeserializer(ValueProvider 
keyDeserializer) {
+  return toBuilder().setKeyDeserializer(ValueProvider
+  .NestedValueProvider.of(keyDeserializer, new 
KeyDeserializerTranslator())).build();
 }
 
 /**
- * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+ * Sets a Kafka {@link Deserializer Deserializer} to interpret 
key bytes read.
  *
  * In addition, Beam also needs a {@link Coder} to serialize and 
deserialize key objects at
  * runtime. KafkaIO tries to infer a coder for the key based on the {@link 
Deserializer} class,
  * however in case that fails, you can use {@link 
#withKeyDeserializerAndCoder(Class, Coder)} to
  * provide the key coder explicitly.
  */
 public Read withKeyDeserializer(Class> 
keyDeserializer) {
-  return toBuilder().setKeyDeserializer(keyDeserializer).build();
+  return toBuilder().setKeyDeserializer(
+  ValueProvider.StaticValueProvider.of(keyDeserializer)).build();
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link Deserializer 

[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95332&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95332
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 26/Apr/18 04:21
Start Date: 26/Apr/18 04:21
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r183844484
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -359,20 +386,129 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return withTopicPartitions(
+  
ValueProvider.StaticValueProvider.of(Joiner.on(',').join(topicPartitions)));
+}
+
+/**
+ * Sets a list of partitions to read from. Partitions are provided as a 
comma separated list of
+ * Strings in the format: topic1-partition1,topic1-partition2...
+ * This allows reading only a subset of partitions for one or more topics 
when (if ever) needed.
+ *
+ * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
+ * of how the partitions are distributed among the splits.
+ */
+public Read withTopicPartitions(String topicPartitions) {
+  return 
withTopicPartitions(ValueProvider.StaticValueProvider.of(topicPartitions));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withTopicPartitions(ValueProvider 
topicPartitions) {
+  checkState(getTopics().get().isEmpty(),
+  "Only topics or topicPartitions can be set, not both");
+  return toBuilder().setTopicPartitions(ValueProvider
+  .NestedValueProvider.of(topicPartitions, new 
TopicPartitionTranslator())).build();
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicTranslator implements 
SerializableFunction> {
+  @Override
+  public List apply(String topics) {
+return ImmutableList.copyOf(
+
Splitter.on(',').trimResults().omitEmptyStrings().splitToList(topics));
+  }
+
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicPartitionTranslator
+implements SerializableFunction> {
+  @Override
+  public List apply(String topicPartitions) {
+List topicPartitionList = new ArrayList<>();
+for (String topicPartition: 
Splitter.on(',').trimResults().omitEmptyStrings()
+  .splitToList(topicPartitions)) {
+  topicPartitionList
+  .add(new 
TopicPartition(Splitter.on('-').splitToList(topicPartition).get(0),
+  
Integer.parseInt(Splitter.on('-').splitToList(topicPartition).get(1;
+}
+  return ImmutableList.copyOf(topicPartitionList);
+  }
+}
+
+/**
+ * Sets a Kafka {@link Deserializer Deserializer} for 
interpreting key bytes read.
+ * This uses the {@link String} provided to set the Deserializer
+ */
+public Read withKeyDeserializer(String keyDeserializer) {
+  return 
withKeyDeserializer(ValueProvider.StaticValueProvider.of(keyDeserializer));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withKeyDeserializer(ValueProvider 
keyDeserializer) {
+  return toBuilder().setKeyDeserializer(ValueProvider
+  .NestedValueProvider.of(keyDeserializer, new 
KeyDeserializerTranslator())).build();
 }
 
 /**
- * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+ * Sets a Kafka {@link Deserializer Deserializer} to interpret 
key bytes read.
  *
  * In addition, Beam also needs a {@link Coder} to serialize and 
deserialize key objects at
  * runtime. KafkaIO tries to infer a coder for the key based on the {@link 
Deserializer} class,
  * however in case that fails, you can use {@link 
#withKeyDeserializerAndCoder(Class, Coder)} to
  * provide the key coder explicitly.
  */
 public Read withKeyDeserializer(Class> 
keyDeserializer) {
-  return toBuilder().setKeyDeserializer(keyDeserializer).build();
+  return toBuilder().setKeyDeserializer(
+  ValueProvider.StaticValueProvider.of(keyDeserializer)).build();
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link Deserializer 

[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95130&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95130
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 25/Apr/18 17:36
Start Date: 25/Apr/18 17:36
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r183846958
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -335,22 +346,38 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopic(String topic) {
-  return withTopics(ImmutableList.of(topic));
+  return withTopic(ValueProvider.StaticValueProvider.of(topic));
 }
 
 /**
- * Sets a list of topics to read from. All the partitions from each
- * of the topics are read.
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withTopic(ValueProvider topic) {
+  return withTopics(topic);
+}
+
+/**
+ * Sets a list of topics to read from with a {@link List 
List}.
+ * All the partitions from each of the topics are read.
  *
  * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
  * of how the partitions are distributed among the splits.
  */
 public Read withTopics(List topics) {
-  checkState(
-  getTopicPartitions().isEmpty(), "Only topics or topicPartitions can 
be set, not both");
-  return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
+  return 
withTopics(ValueProvider.StaticValueProvider.of(Joiner.on(',').join(topics)));
 }
 
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ * The format is comma separated String of topics
+ */
+public Read withTopics(ValueProvider topics) {
+  checkState(
+  getTopicPartitions().get().isEmpty(),
 
 Review comment:
   I have changed the getTopicPartitions().get().isEmpty() to 
getTopicPartitions().isAccessible()


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95130)
Time Spent: 2h 50m  (was: 2h 40m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=95128&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95128
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 25/Apr/18 17:35
Start Date: 25/Apr/18 17:35
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r184146288
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -359,20 +386,129 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return withTopicPartitions(
+  
ValueProvider.StaticValueProvider.of(Joiner.on(',').join(topicPartitions)));
+}
+
+/**
+ * Sets a list of partitions to read from. Partitions are provided as a 
comma separated list of
+ * Strings in the format: topic1-partition1,topic1-partition2...
+ * This allows reading only a subset of partitions for one or more topics 
when (if ever) needed.
+ *
+ * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
+ * of how the partitions are distributed among the splits.
+ */
+public Read withTopicPartitions(String topicPartitions) {
+  return 
withTopicPartitions(ValueProvider.StaticValueProvider.of(topicPartitions));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withTopicPartitions(ValueProvider 
topicPartitions) {
+  checkState(getTopics().get().isEmpty(),
+  "Only topics or topicPartitions can be set, not both");
+  return toBuilder().setTopicPartitions(ValueProvider
+  .NestedValueProvider.of(topicPartitions, new 
TopicPartitionTranslator())).build();
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicTranslator implements 
SerializableFunction> {
+  @Override
+  public List apply(String topics) {
+return ImmutableList.copyOf(
+
Splitter.on(',').trimResults().omitEmptyStrings().splitToList(topics));
+  }
+
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicPartitionTranslator
+implements SerializableFunction> {
+  @Override
+  public List apply(String topicPartitions) {
+List topicPartitionList = new ArrayList<>();
+for (String topicPartition: 
Splitter.on(',').trimResults().omitEmptyStrings()
+  .splitToList(topicPartitions)) {
+  topicPartitionList
+  .add(new 
TopicPartition(Splitter.on('-').splitToList(topicPartition).get(0),
+  
Integer.parseInt(Splitter.on('-').splitToList(topicPartition).get(1;
+}
+  return ImmutableList.copyOf(topicPartitionList);
+  }
+}
+
+/**
+ * Sets a Kafka {@link Deserializer Deserializer} for 
interpreting key bytes read.
+ * This uses the {@link String} provided to set the Deserializer
+ */
+public Read withKeyDeserializer(String keyDeserializer) {
 
 Review comment:
   I have renamed withKeyDeserializerClass to withKeyDeserializerClassName


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95128)
Time Spent: 2h 40m  (was: 2.5h)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-24 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=94802&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94802
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 24/Apr/18 21:40
Start Date: 24/Apr/18 21:40
Worklog Time Spent: 10m 
  Work Description: jkff commented on issue #5141: [BEAM-3925] Allow 
ValueProvider for KafkaIO so that we can create Beam Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#issuecomment-384088873
 
 
   (passing review back to @rangadi - don't have enough time to devote to this 
PR myself right now)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 94802)
Time Spent: 2.5h  (was: 2h 20m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-24 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=94761&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94761
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 24/Apr/18 19:06
Start Date: 24/Apr/18 19:06
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r183846804
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -359,20 +386,129 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return withTopicPartitions(
+  
ValueProvider.StaticValueProvider.of(Joiner.on(',').join(topicPartitions)));
+}
+
+/**
+ * Sets a list of partitions to read from. Partitions are provided as a 
comma separated list of
+ * Strings in the format: topic1-partition1,topic1-partition2...
+ * This allows reading only a subset of partitions for one or more topics 
when (if ever) needed.
+ *
+ * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
+ * of how the partitions are distributed among the splits.
+ */
+public Read withTopicPartitions(String topicPartitions) {
 
 Review comment:
   I have removed the code which handles the parsing of String representation 
of Topic Partition for now and added 
withTopoicPartitions(ValueProvider> partitions) so that we 
can use this for templates


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 94761)
Time Spent: 2h 10m  (was: 2h)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-24 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=94763&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94763
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 24/Apr/18 19:06
Start Date: 24/Apr/18 19:06
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r183846958
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -335,22 +346,38 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopic(String topic) {
-  return withTopics(ImmutableList.of(topic));
+  return withTopic(ValueProvider.StaticValueProvider.of(topic));
 }
 
 /**
- * Sets a list of topics to read from. All the partitions from each
- * of the topics are read.
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withTopic(ValueProvider topic) {
+  return withTopics(topic);
+}
+
+/**
+ * Sets a list of topics to read from with a {@link List 
List}.
+ * All the partitions from each of the topics are read.
  *
  * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
  * of how the partitions are distributed among the splits.
  */
 public Read withTopics(List topics) {
-  checkState(
-  getTopicPartitions().isEmpty(), "Only topics or topicPartitions can 
be set, not both");
-  return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
+  return 
withTopics(ValueProvider.StaticValueProvider.of(Joiner.on(',').join(topics)));
 }
 
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ * The format is comma separated String of topics
+ */
+public Read withTopics(ValueProvider topics) {
+  checkState(
+  getTopicPartitions().get().isEmpty(),
 
 Review comment:
   I have changed the getTopicPartitions().get().isEmpty() to 
getTopicPartitions() == null


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 94763)
Time Spent: 2h 20m  (was: 2h 10m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-24 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=94757&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94757
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 24/Apr/18 18:57
Start Date: 24/Apr/18 18:57
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r183844484
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -359,20 +386,129 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return withTopicPartitions(
+  
ValueProvider.StaticValueProvider.of(Joiner.on(',').join(topicPartitions)));
+}
+
+/**
+ * Sets a list of partitions to read from. Partitions are provided as a 
comma separated list of
+ * Strings in the format: topic1-partition1,topic1-partition2...
+ * This allows reading only a subset of partitions for one or more topics 
when (if ever) needed.
+ *
+ * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
+ * of how the partitions are distributed among the splits.
+ */
+public Read withTopicPartitions(String topicPartitions) {
+  return 
withTopicPartitions(ValueProvider.StaticValueProvider.of(topicPartitions));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withTopicPartitions(ValueProvider 
topicPartitions) {
+  checkState(getTopics().get().isEmpty(),
+  "Only topics or topicPartitions can be set, not both");
+  return toBuilder().setTopicPartitions(ValueProvider
+  .NestedValueProvider.of(topicPartitions, new 
TopicPartitionTranslator())).build();
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicTranslator implements 
SerializableFunction> {
+  @Override
+  public List apply(String topics) {
+return ImmutableList.copyOf(
+
Splitter.on(',').trimResults().omitEmptyStrings().splitToList(topics));
+  }
+
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicPartitionTranslator
+implements SerializableFunction> {
+  @Override
+  public List apply(String topicPartitions) {
+List topicPartitionList = new ArrayList<>();
+for (String topicPartition: 
Splitter.on(',').trimResults().omitEmptyStrings()
+  .splitToList(topicPartitions)) {
+  topicPartitionList
+  .add(new 
TopicPartition(Splitter.on('-').splitToList(topicPartition).get(0),
+  
Integer.parseInt(Splitter.on('-').splitToList(topicPartition).get(1;
+}
+  return ImmutableList.copyOf(topicPartitionList);
+  }
+}
+
+/**
+ * Sets a Kafka {@link Deserializer Deserializer} for 
interpreting key bytes read.
+ * This uses the {@link String} provided to set the Deserializer
+ */
+public Read withKeyDeserializer(String keyDeserializer) {
+  return 
withKeyDeserializer(ValueProvider.StaticValueProvider.of(keyDeserializer));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withKeyDeserializer(ValueProvider 
keyDeserializer) {
+  return toBuilder().setKeyDeserializer(ValueProvider
+  .NestedValueProvider.of(keyDeserializer, new 
KeyDeserializerTranslator())).build();
 }
 
 /**
- * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+ * Sets a Kafka {@link Deserializer Deserializer} to interpret 
key bytes read.
  *
  * In addition, Beam also needs a {@link Coder} to serialize and 
deserialize key objects at
  * runtime. KafkaIO tries to infer a coder for the key based on the {@link 
Deserializer} class,
  * however in case that fails, you can use {@link 
#withKeyDeserializerAndCoder(Class, Coder)} to
  * provide the key coder explicitly.
  */
 public Read withKeyDeserializer(Class> 
keyDeserializer) {
-  return toBuilder().setKeyDeserializer(keyDeserializer).build();
+  return toBuilder().setKeyDeserializer(
+  ValueProvider.StaticValueProvider.of(keyDeserializer)).build();
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link Deserializer 

[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-24 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=94518&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94518
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 24/Apr/18 08:16
Start Date: 24/Apr/18 08:16
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r183641697
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -359,20 +386,129 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return withTopicPartitions(
+  
ValueProvider.StaticValueProvider.of(Joiner.on(',').join(topicPartitions)));
+}
+
+/**
+ * Sets a list of partitions to read from. Partitions are provided as a 
comma separated list of
+ * Strings in the format: topic1-partition1,topic1-partition2...
+ * This allows reading only a subset of partitions for one or more topics 
when (if ever) needed.
+ *
+ * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
+ * of how the partitions are distributed among the splits.
+ */
+public Read withTopicPartitions(String topicPartitions) {
+  return 
withTopicPartitions(ValueProvider.StaticValueProvider.of(topicPartitions));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withTopicPartitions(ValueProvider 
topicPartitions) {
+  checkState(getTopics().get().isEmpty(),
+  "Only topics or topicPartitions can be set, not both");
+  return toBuilder().setTopicPartitions(ValueProvider
+  .NestedValueProvider.of(topicPartitions, new 
TopicPartitionTranslator())).build();
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicTranslator implements 
SerializableFunction> {
+  @Override
+  public List apply(String topics) {
+return ImmutableList.copyOf(
+
Splitter.on(',').trimResults().omitEmptyStrings().splitToList(topics));
+  }
+
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicPartitionTranslator
+implements SerializableFunction> {
+  @Override
+  public List apply(String topicPartitions) {
+List topicPartitionList = new ArrayList<>();
+for (String topicPartition: 
Splitter.on(',').trimResults().omitEmptyStrings()
+  .splitToList(topicPartitions)) {
+  topicPartitionList
+  .add(new 
TopicPartition(Splitter.on('-').splitToList(topicPartition).get(0),
+  
Integer.parseInt(Splitter.on('-').splitToList(topicPartition).get(1;
+}
+  return ImmutableList.copyOf(topicPartitionList);
+  }
+}
+
+/**
+ * Sets a Kafka {@link Deserializer Deserializer} for 
interpreting key bytes read.
+ * This uses the {@link String} provided to set the Deserializer
+ */
+public Read withKeyDeserializer(String keyDeserializer) {
+  return 
withKeyDeserializer(ValueProvider.StaticValueProvider.of(keyDeserializer));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withKeyDeserializer(ValueProvider 
keyDeserializer) {
+  return toBuilder().setKeyDeserializer(ValueProvider
+  .NestedValueProvider.of(keyDeserializer, new 
KeyDeserializerTranslator())).build();
 }
 
 /**
- * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+ * Sets a Kafka {@link Deserializer Deserializer} to interpret 
key bytes read.
  *
  * In addition, Beam also needs a {@link Coder} to serialize and 
deserialize key objects at
  * runtime. KafkaIO tries to infer a coder for the key based on the {@link 
Deserializer} class,
  * however in case that fails, you can use {@link 
#withKeyDeserializerAndCoder(Class, Coder)} to
  * provide the key coder explicitly.
  */
 public Read withKeyDeserializer(Class> 
keyDeserializer) {
-  return toBuilder().setKeyDeserializer(keyDeserializer).build();
+  return toBuilder().setKeyDeserializer(
+  ValueProvider.StaticValueProvider.of(keyDeserializer)).build();
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link Deserializer 

[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=94481&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94481
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 24/Apr/18 06:46
Start Date: 24/Apr/18 06:46
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r183619623
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -359,20 +386,129 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return withTopicPartitions(
+  
ValueProvider.StaticValueProvider.of(Joiner.on(',').join(topicPartitions)));
+}
+
+/**
+ * Sets a list of partitions to read from. Partitions are provided as a 
comma separated list of
+ * Strings in the format: topic1-partition1,topic1-partition2...
+ * This allows reading only a subset of partitions for one or more topics 
when (if ever) needed.
+ *
+ * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
+ * of how the partitions are distributed among the splits.
+ */
+public Read withTopicPartitions(String topicPartitions) {
+  return 
withTopicPartitions(ValueProvider.StaticValueProvider.of(topicPartitions));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withTopicPartitions(ValueProvider 
topicPartitions) {
+  checkState(getTopics().get().isEmpty(),
+  "Only topics or topicPartitions can be set, not both");
+  return toBuilder().setTopicPartitions(ValueProvider
+  .NestedValueProvider.of(topicPartitions, new 
TopicPartitionTranslator())).build();
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicTranslator implements 
SerializableFunction> {
+  @Override
+  public List apply(String topics) {
+return ImmutableList.copyOf(
+
Splitter.on(',').trimResults().omitEmptyStrings().splitToList(topics));
+  }
+
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicPartitionTranslator
+implements SerializableFunction> {
+  @Override
+  public List apply(String topicPartitions) {
+List topicPartitionList = new ArrayList<>();
+for (String topicPartition: 
Splitter.on(',').trimResults().omitEmptyStrings()
+  .splitToList(topicPartitions)) {
+  topicPartitionList
+  .add(new 
TopicPartition(Splitter.on('-').splitToList(topicPartition).get(0),
+  
Integer.parseInt(Splitter.on('-').splitToList(topicPartition).get(1;
+}
+  return ImmutableList.copyOf(topicPartitionList);
+  }
+}
+
+/**
+ * Sets a Kafka {@link Deserializer Deserializer} for 
interpreting key bytes read.
+ * This uses the {@link String} provided to set the Deserializer
+ */
+public Read withKeyDeserializer(String keyDeserializer) {
 
 Review comment:
   This is used outside of KafkaIO. Not sure if renaming this may cause issues


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 94481)
Time Spent: 1h 40m  (was: 1.5h)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=93366&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93366
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 20/Apr/18 18:51
Start Date: 20/Apr/18 18:51
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r183140040
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -870,25 +1092,133 @@ private KafkaIO() {}
  * determine partition in Kafka (see {@link ProducerRecord} for more 
details).
  */
 public Write withKeySerializer(Class> 
keySerializer) {
-  return toBuilder().setKeySerializer(keySerializer).build();
+  return toBuilder()
+  
.setKeySerializer(ValueProvider.StaticValueProvider.of(keySerializer)).build();
 }
 
 /**
- * Sets a {@link Serializer} for serializing value to bytes.
+ * Sets a {@link Serializer} for serializing key (if any) to bytes using a 
String.
+ */
+public Write withKeySerializer(String keySerializer) {
+  return 
withKeySerializer(ValueProvider.StaticValueProvider.of(keySerializer));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Write withKeySerializer(ValueProvider keySerializer) {
+  return toBuilder().setKeySerializer(ValueProvider.
+  NestedValueProvider.of(keySerializer, new 
SerializerKeyTranslator())).build();
+}
+
+/**
+ * Sets a {@link Serializer Serializer} for serializing value to 
bytes.
  */
 public Write withValueSerializer(Class> 
valueSerializer) {
-  return toBuilder().setValueSerializer(valueSerializer).build();
+  return toBuilder().setValueSerializer(
+  ValueProvider.StaticValueProvider.of(valueSerializer)).build();
+}
+
+/**
+ * Like above but with a class name provided as a {@link String}.
+ */
+public Write withValueSerializer(String valueSerializer) {
+  return 
withValueSerializer(ValueProvider.StaticValueProvider.of(valueSerializer));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Write withValueSerializer(ValueProvider 
valueSerializer) {
+  return toBuilder().setValueSerializer(ValueProvider
+  .NestedValueProvider.of(valueSerializer, new 
SerializerValueTranslator())).build();
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link Serializer 
Serializer}.
+ */
+private class SerializerKeyTranslator
+implements SerializableFunction>> {
+  @SuppressWarnings("unchecked")
+  @Override
+  public Class apply(String serializer) {
 
 Review comment:
   These are all also redundant with the deserializer code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 93366)
Time Spent: 1.5h  (was: 1h 20m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=93364&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93364
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 20/Apr/18 18:51
Start Date: 20/Apr/18 18:51
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r183139553
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -359,20 +386,129 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return withTopicPartitions(
+  
ValueProvider.StaticValueProvider.of(Joiner.on(',').join(topicPartitions)));
+}
+
+/**
+ * Sets a list of partitions to read from. Partitions are provided as a 
comma separated list of
+ * Strings in the format: topic1-partition1,topic1-partition2...
+ * This allows reading only a subset of partitions for one or more topics 
when (if ever) needed.
+ *
+ * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
+ * of how the partitions are distributed among the splits.
+ */
+public Read withTopicPartitions(String topicPartitions) {
+  return 
withTopicPartitions(ValueProvider.StaticValueProvider.of(topicPartitions));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withTopicPartitions(ValueProvider 
topicPartitions) {
+  checkState(getTopics().get().isEmpty(),
+  "Only topics or topicPartitions can be set, not both");
+  return toBuilder().setTopicPartitions(ValueProvider
+  .NestedValueProvider.of(topicPartitions, new 
TopicPartitionTranslator())).build();
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicTranslator implements 
SerializableFunction> {
+  @Override
+  public List apply(String topics) {
+return ImmutableList.copyOf(
+
Splitter.on(',').trimResults().omitEmptyStrings().splitToList(topics));
+  }
+
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicPartitionTranslator
+implements SerializableFunction> {
+  @Override
+  public List apply(String topicPartitions) {
+List topicPartitionList = new ArrayList<>();
+for (String topicPartition: 
Splitter.on(',').trimResults().omitEmptyStrings()
+  .splitToList(topicPartitions)) {
+  topicPartitionList
+  .add(new 
TopicPartition(Splitter.on('-').splitToList(topicPartition).get(0),
+  
Integer.parseInt(Splitter.on('-').splitToList(topicPartition).get(1;
+}
+  return ImmutableList.copyOf(topicPartitionList);
+  }
+}
+
+/**
+ * Sets a Kafka {@link Deserializer Deserializer} for 
interpreting key bytes read.
+ * This uses the {@link String} provided to set the Deserializer
+ */
+public Read withKeyDeserializer(String keyDeserializer) {
 
 Review comment:
   Rename this to something like `withKeyDeserializerClassName`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 93364)
Time Spent: 1h 20m  (was: 1h 10m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=93361&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93361
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 20/Apr/18 18:51
Start Date: 20/Apr/18 18:51
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r183139351
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -359,20 +386,129 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return withTopicPartitions(
+  
ValueProvider.StaticValueProvider.of(Joiner.on(',').join(topicPartitions)));
+}
+
+/**
+ * Sets a list of partitions to read from. Partitions are provided as a 
comma separated list of
+ * Strings in the format: topic1-partition1,topic1-partition2...
+ * This allows reading only a subset of partitions for one or more topics 
when (if ever) needed.
+ *
+ * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
+ * of how the partitions are distributed among the splits.
+ */
+public Read withTopicPartitions(String topicPartitions) {
 
 Review comment:
   Similar comment here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 93361)
Time Spent: 1h  (was: 50m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=93362&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93362
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 20/Apr/18 18:51
Start Date: 20/Apr/18 18:51
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r183139852
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -359,20 +386,129 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return withTopicPartitions(
+  
ValueProvider.StaticValueProvider.of(Joiner.on(',').join(topicPartitions)));
+}
+
+/**
+ * Sets a list of partitions to read from. Partitions are provided as a 
comma separated list of
+ * Strings in the format: topic1-partition1,topic1-partition2...
+ * This allows reading only a subset of partitions for one or more topics 
when (if ever) needed.
+ *
+ * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
+ * of how the partitions are distributed among the splits.
+ */
+public Read withTopicPartitions(String topicPartitions) {
+  return 
withTopicPartitions(ValueProvider.StaticValueProvider.of(topicPartitions));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withTopicPartitions(ValueProvider 
topicPartitions) {
+  checkState(getTopics().get().isEmpty(),
+  "Only topics or topicPartitions can be set, not both");
+  return toBuilder().setTopicPartitions(ValueProvider
+  .NestedValueProvider.of(topicPartitions, new 
TopicPartitionTranslator())).build();
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicTranslator implements 
SerializableFunction> {
+  @Override
+  public List apply(String topics) {
+return ImmutableList.copyOf(
+
Splitter.on(',').trimResults().omitEmptyStrings().splitToList(topics));
+  }
+
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicPartitionTranslator
+implements SerializableFunction> {
+  @Override
+  public List apply(String topicPartitions) {
+List topicPartitionList = new ArrayList<>();
+for (String topicPartition: 
Splitter.on(',').trimResults().omitEmptyStrings()
+  .splitToList(topicPartitions)) {
+  topicPartitionList
+  .add(new 
TopicPartition(Splitter.on('-').splitToList(topicPartition).get(0),
+  
Integer.parseInt(Splitter.on('-').splitToList(topicPartition).get(1;
+}
+  return ImmutableList.copyOf(topicPartitionList);
+  }
+}
+
+/**
+ * Sets a Kafka {@link Deserializer Deserializer} for 
interpreting key bytes read.
+ * This uses the {@link String} provided to set the Deserializer
+ */
+public Read withKeyDeserializer(String keyDeserializer) {
+  return 
withKeyDeserializer(ValueProvider.StaticValueProvider.of(keyDeserializer));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withKeyDeserializer(ValueProvider 
keyDeserializer) {
+  return toBuilder().setKeyDeserializer(ValueProvider
+  .NestedValueProvider.of(keyDeserializer, new 
KeyDeserializerTranslator())).build();
 }
 
 /**
- * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+ * Sets a Kafka {@link Deserializer Deserializer} to interpret 
key bytes read.
  *
  * In addition, Beam also needs a {@link Coder} to serialize and 
deserialize key objects at
  * runtime. KafkaIO tries to infer a coder for the key based on the {@link 
Deserializer} class,
  * however in case that fails, you can use {@link 
#withKeyDeserializerAndCoder(Class, Coder)} to
  * provide the key coder explicitly.
  */
 public Read withKeyDeserializer(Class> 
keyDeserializer) {
-  return toBuilder().setKeyDeserializer(keyDeserializer).build();
+  return toBuilder().setKeyDeserializer(
+  ValueProvider.StaticValueProvider.of(keyDeserializer)).build();
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link Deserializer 
Dese

[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=93363&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93363
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 20/Apr/18 18:51
Start Date: 20/Apr/18 18:51
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r183139278
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -335,22 +346,38 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopic(String topic) {
-  return withTopics(ImmutableList.of(topic));
+  return withTopic(ValueProvider.StaticValueProvider.of(topic));
 }
 
 /**
- * Sets a list of topics to read from. All the partitions from each
- * of the topics are read.
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withTopic(ValueProvider topic) {
+  return withTopics(topic);
+}
+
+/**
+ * Sets a list of topics to read from with a {@link List 
List}.
+ * All the partitions from each of the topics are read.
  *
  * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
  * of how the partitions are distributed among the splits.
  */
 public Read withTopics(List topics) {
-  checkState(
-  getTopicPartitions().isEmpty(), "Only topics or topicPartitions can 
be set, not both");
-  return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
+  return 
withTopics(ValueProvider.StaticValueProvider.of(Joiner.on(',').join(topics)));
 }
 
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ * The format is comma separated String of topics
+ */
+public Read withTopics(ValueProvider topics) {
+  checkState(
+  getTopicPartitions().get().isEmpty(),
 
 Review comment:
   Calling `.get()` here means that this can not be used with a runtime value 
provider at all which defeats the purpose of this change. `.get()` can only be 
called inside code that runs while the pipeline is running, but not at 
construction time - i.e. only inside code of `DoFn`s, effectively.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 93363)
Time Spent: 1h 10m  (was: 1h)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=93365&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93365
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 20/Apr/18 18:51
Start Date: 20/Apr/18 18:51
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r183139657
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -359,20 +386,129 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return withTopicPartitions(
+  
ValueProvider.StaticValueProvider.of(Joiner.on(',').join(topicPartitions)));
+}
+
+/**
+ * Sets a list of partitions to read from. Partitions are provided as a 
comma separated list of
+ * Strings in the format: topic1-partition1,topic1-partition2...
+ * This allows reading only a subset of partitions for one or more topics 
when (if ever) needed.
+ *
+ * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
+ * of how the partitions are distributed among the splits.
+ */
+public Read withTopicPartitions(String topicPartitions) {
+  return 
withTopicPartitions(ValueProvider.StaticValueProvider.of(topicPartitions));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withTopicPartitions(ValueProvider 
topicPartitions) {
+  checkState(getTopics().get().isEmpty(),
+  "Only topics or topicPartitions can be set, not both");
+  return toBuilder().setTopicPartitions(ValueProvider
+  .NestedValueProvider.of(topicPartitions, new 
TopicPartitionTranslator())).build();
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicTranslator implements 
SerializableFunction> {
+  @Override
+  public List apply(String topics) {
+return ImmutableList.copyOf(
+
Splitter.on(',').trimResults().omitEmptyStrings().splitToList(topics));
+  }
+
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link List 
List}.
+ */
+private static class TopicPartitionTranslator
+implements SerializableFunction> {
+  @Override
+  public List apply(String topicPartitions) {
+List topicPartitionList = new ArrayList<>();
+for (String topicPartition: 
Splitter.on(',').trimResults().omitEmptyStrings()
+  .splitToList(topicPartitions)) {
+  topicPartitionList
+  .add(new 
TopicPartition(Splitter.on('-').splitToList(topicPartition).get(0),
+  
Integer.parseInt(Splitter.on('-').splitToList(topicPartition).get(1;
+}
+  return ImmutableList.copyOf(topicPartitionList);
+  }
+}
+
+/**
+ * Sets a Kafka {@link Deserializer Deserializer} for 
interpreting key bytes read.
+ * This uses the {@link String} provided to set the Deserializer
+ */
+public Read withKeyDeserializer(String keyDeserializer) {
+  return 
withKeyDeserializer(ValueProvider.StaticValueProvider.of(keyDeserializer));
+}
+
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withKeyDeserializer(ValueProvider 
keyDeserializer) {
+  return toBuilder().setKeyDeserializer(ValueProvider
+  .NestedValueProvider.of(keyDeserializer, new 
KeyDeserializerTranslator())).build();
 }
 
 /**
- * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+ * Sets a Kafka {@link Deserializer Deserializer} to interpret 
key bytes read.
  *
  * In addition, Beam also needs a {@link Coder} to serialize and 
deserialize key objects at
  * runtime. KafkaIO tries to infer a coder for the key based on the {@link 
Deserializer} class,
  * however in case that fails, you can use {@link 
#withKeyDeserializerAndCoder(Class, Coder)} to
  * provide the key coder explicitly.
  */
 public Read withKeyDeserializer(Class> 
keyDeserializer) {
-  return toBuilder().setKeyDeserializer(keyDeserializer).build();
+  return toBuilder().setKeyDeserializer(
+  ValueProvider.StaticValueProvider.of(keyDeserializer)).build();
+}
+
+/**
+ * Used to build a {@link ValueProvider} for {@link Deserializer 
Dese

[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=93360&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93360
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 20/Apr/18 18:51
Start Date: 20/Apr/18 18:51
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r183139080
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -335,22 +346,38 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopic(String topic) {
-  return withTopics(ImmutableList.of(topic));
+  return withTopic(ValueProvider.StaticValueProvider.of(topic));
 }
 
 /**
- * Sets a list of topics to read from. All the partitions from each
- * of the topics are read.
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ */
+public Read withTopic(ValueProvider topic) {
+  return withTopics(topic);
+}
+
+/**
+ * Sets a list of topics to read from with a {@link List 
List}.
+ * All the partitions from each of the topics are read.
  *
  * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
  * of how the partitions are distributed among the splits.
  */
 public Read withTopics(List topics) {
-  checkState(
-  getTopicPartitions().isEmpty(), "Only topics or topicPartitions can 
be set, not both");
-  return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
+  return 
withTopics(ValueProvider.StaticValueProvider.of(Joiner.on(',').join(topics)));
 }
 
+/**
+ * Like above but with a {@link ValueProvider ValueProvider}.
+ * The format is comma separated String of topics
 
 Review comment:
   Please do not use "string typing" (as opposed to "strong typing") - this has 
to be a `ValueProvider>`. That means that this value can not be 
directly passed as a command-line argument, only via a NestedValueProvider - 
please add a helper class somewhere that simplifies this (e.g. add a static 
utility method to `ValueProviders`, such as `ValueProvider> 
ValueProviders.commaSeparatedStrings(ValueProvider csv)`).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 93360)
Time Spent: 1h  (was: 50m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=93340&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93340
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 20/Apr/18 18:23
Start Date: 20/Apr/18 18:23
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r183133104
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -359,20 +386,129 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return withTopicPartitions(
+  
ValueProvider.StaticValueProvider.of(Joiner.on(',').join(topicPartitions)));
+}
+
+/**
+ * Sets a list of partitions to read from. Partitions are provided as a 
comma separated list of
+ * Strings in the format: topic1-partition1,topic1-partition2...
+ * This allows reading only a subset of partitions for one or more topics 
when (if ever) needed.
+ *
+ * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
+ * of how the partitions are distributed among the splits.
+ */
+public Read withTopicPartitions(String topicPartitions) {
 
 Review comment:
   I thought of making it to parse JSON String instead of accepting a list of 
Hyphenated Strings. Can you please provide more details on the KafkaIOOptions 
Class?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 93340)
Time Spent: 50m  (was: 40m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=93338&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93338
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 20/Apr/18 18:17
Start Date: 20/Apr/18 18:17
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5141: 
[BEAM-3925] Allow ValueProvider for KafkaIO so that we can create Beam 
Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#discussion_r183130577
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
 ##
 @@ -359,20 +386,129 @@
  * of how the partitions are distributed among the splits.
  */
 public Read withTopicPartitions(List 
topicPartitions) {
-  checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
-  return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
+  return withTopicPartitions(
+  
ValueProvider.StaticValueProvider.of(Joiner.on(',').join(topicPartitions)));
+}
+
+/**
+ * Sets a list of partitions to read from. Partitions are provided as a 
comma separated list of
+ * Strings in the format: topic1-partition1,topic1-partition2...
+ * This allows reading only a subset of partitions for one or more topics 
when (if ever) needed.
+ *
+ * See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for 
description
+ * of how the partitions are distributed among the splits.
+ */
+public Read withTopicPartitions(String topicPartitions) {
 
 Review comment:
   I don't think KafkaIO should to handle parsing of string representation of 
`TopicPartition` objects. E.g. what if topic names have '-'? If we want the 
user to provide topic-partition pairs in options, probably better to add a 
`KafkaIOOptions` class. We could support json respresentation to specify 
topic-partition pairs, for e.g.
   
   @jkff, could you take a look at the style here? e.g. in addition to 
`withTopoicPartitions(List partitions)`, the it adds:
- `withTopicPartitions(String partitions)` 
- `withTopicPartitions(ValueProvider)`
   
   Same for a few others.



This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 93338)
Time Spent: 40m  (was: 0.5h)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=93296&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93296
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 20/Apr/18 17:19
Start Date: 20/Apr/18 17:19
Worklog Time Spent: 10m 
  Work Description: aaltay commented on issue #5141: [BEAM-3925] Allow 
ValueProvider for KafkaIO so that we can create Beam Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#issuecomment-383164158
 
 
   R: @rangadi could you review this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 93296)
Time Spent: 0.5h  (was: 20m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=93295&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93295
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 20/Apr/18 17:19
Start Date: 20/Apr/18 17:19
Worklog Time Spent: 10m 
  Work Description: pupamanyu commented on issue #5141: [BEAM-3925] Allow 
ValueProvider for KafkaIO so that we can create Beam Templates using KafkaIO
URL: https://github.com/apache/beam/pull/5141#issuecomment-383164141
 
 
   retest this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 93295)
Time Spent: 20m  (was: 10m)

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Pramod Upamanyu
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO

2018-04-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=91480&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91480
 ]

ASF GitHub Bot logged work on BEAM-3925:


Author: ASF GitHub Bot
Created on: 16/Apr/18 20:48
Start Date: 16/Apr/18 20:48
Worklog Time Spent: 10m 
  Work Description: pupamanyu opened a new pull request #5141: [BEAM-3925] 
Allow ValueProvider for KafkaIO so that we can create Beam Templates using 
KafkaIO
URL: https://github.com/apache/beam/pull/5141
 
 
   This pull request allows ValueProviders to KafkaIO. This is added so that 
these options can be used to create Beam Templates using KafkaIO.
   
   - KafkaIO.read
 - Topics
 - TopicPartitions
 - KeyDeSerializer
 - ValueDeSerializer
   - KafkaIO.write
 - Topic
 - KeySerializer
 - ValueSerializer
   
   This pull request also adds ValueProvider support for the below options for 
the
   
   - KafkaIO.read
 - withBootstrapServers
 - withTopic
 - withTopics
 - withTopicPartitions
 - withKeyDeserializer
 - withValueDeserializer
   - KafkaIO.write
 - withBootstrapServers
 - withTopic
 - withKeySerializer
 - withValueSerializer
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 91480)
Time Spent: 10m
Remaining Estimate: 0h

> Allow ValueProvider for KafkaIO
> ---
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sameer Abhyankar
>Assignee: Sameer Abhyankar
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)