[jira] [Work logged] (BEAM-7442) Bounded Reads for Flink Runner fails with OOM

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7442?focusedWorklogId=251515&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251515
 ]

ASF GitHub Bot logged work on BEAM-7442:


Author: ASF GitHub Bot
Created on: 31/May/19 06:01
Start Date: 31/May/19 06:01
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8715: 
[BEAM-7442][BEAM-5650] Read sequentially from bounded sources in 
UnboundedSourceWrapper
URL: https://github.com/apache/beam/pull/8715#discussion_r289264436
 
 

 ##
 File path: 
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
 ##
 @@ -690,6 +701,68 @@ private static void testSourceDoesNotShutdown(boolean shouldHaveReaders) throws
   }
   assertThat(thread.isAlive(), is(false));
 }
+
+@Test
+public void testSequentialReadingFromBoundedSource() throws Exception {
+  UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<Integer> source =
+      new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(
+          new BoundedIntegerSource(0, 1000));
+
+  FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+  options.setShutdownSourcesOnFinalWatermark(true);
+
+  UnboundedSourceWrapper<Integer, UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint<Integer>> sourceWrapper =
+      new UnboundedSourceWrapper<>("sequentialRead", options, source, 4);
+  StreamingRuntimeContext runtimeContextMock = Mockito.mock(StreamingRuntimeContext.class);
+
+  Mockito.when(runtimeContextMock.getIndexOfThisSubtask()).thenReturn(0);
+  when(runtimeContextMock.getNumberOfParallelSubtasks()).thenReturn(2);
+  when(runtimeContextMock.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
+  TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+  processingTimeService.setCurrentTime(0);
+  when(runtimeContextMock.getProcessingTimeService()).thenReturn(processingTimeService);
+
+  when(runtimeContextMock.getMetricGroup()).thenReturn(new UnregisteredMetricsGroup());
+
+  sourceWrapper.setRuntimeContext(runtimeContextMock);
+
+  sourceWrapper.open(new Configuration());
+  assertThat(sourceWrapper.getLocalReaders().size(), is(2));
+
+  List<Integer> integers = new ArrayList<>();
+  sourceWrapper.run(
+      new SourceFunction.SourceContext<WindowedValue<ValueWithRecordId<Integer>>>() {
+
+        @Override
+        public void collect(WindowedValue<ValueWithRecordId<Integer>> element) {
+          integers.add(element.getValue().getValue());
+        }
+
+        @Override
+        public void collectWithTimestamp(
+            WindowedValue<ValueWithRecordId<Integer>> element, long timestamp) {
+          throw new IllegalStateException("Should not collect with timestamp");
+        }
+
+        @Override
+        public void emitWatermark(Watermark mark) {}
+
+        @Override
+        public void markAsTemporarilyIdle() {}
+
+        @Override
+        public Object getCheckpointLock() {
+          return new Object();
+        }
+
+        @Override
+        public void close() {}
+      });
+
+  // the source is effectively split into two parts
+  assertThat(integers.size(), is(500));
 
 Review comment:
   From reading the code and the test, I expected the result to be that we observe the whole input, 1000 integers, not out of order, e.g. `[1,500,2,501, 1000, 499]`, but in sequence `[1,2,3 ... 1000]`.
   What am I missing?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251515)
Time Spent: 40m  (was: 0.5h)

> Bounded Reads for Flink Runner fails with OOM
> -
>
> Key: BEAM-7442
> URL: https://issues.apache.org/jira/browse/BEAM-7442
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Akshay Iyangar
>Assignee: Akshay Iyangar
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of files is huge, FlinkRunner throws an OOM error. This happens because the current implementation does not read the files sequentially but simultaneously, causing all of them to be held in memory, which quickly breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class so that when the stream is a bounded source, it is read sequentially

[jira] [Work logged] (BEAM-7442) Bounded Reads for Flink Runner fails with OOM

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7442?focusedWorklogId=251514&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251514
 ]

ASF GitHub Bot logged work on BEAM-7442:


Author: ASF GitHub Bot
Created on: 31/May/19 06:01
Start Date: 31/May/19 06:01
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8715: 
[BEAM-7442][BEAM-5650] Read sequentially from bounded sources in 
UnboundedSourceWrapper
URL: https://github.com/apache/beam/pull/8715#discussion_r289264745
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 ##
 @@ -217,34 +227,33 @@ public void run(SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx) th
   // through to idle this executor.
   LOG.info("Number of readers is 0 for this task executor, idle");
   // Do nothing here but still execute the rest of the source logic
-} else if (localReaders.size() == 1) {
-  // the easy case, we just read from one reader
-  UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0);
-
-  synchronized (ctx.getCheckpointLock()) {
-    boolean dataAvailable = readerInvoker.invokeStart(reader);
-    if (dataAvailable) {
-      emitElement(ctx, reader);
-    }
-  }
-
+} else if (isConvertedBoundedSource) {
   setNextWatermarkTimer(this.runtimeContext);
 
-  while (isRunning) {
-    boolean dataAvailable;
-    synchronized (ctx.getCheckpointLock()) {
-      dataAvailable = readerInvoker.invokeAdvance(reader);
+  // We read sequentially from all bounded sources
 
 Review comment:
   If I am correct, this is used in the streaming case where a pipeline reads bounded data. I wonder what happens to the watermark of that `PCollection`, and whether reading sequentially versus round-robin from all sources at once has an effect on the watermark behavior.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251514)
Time Spent: 0.5h  (was: 20m)

> Bounded Reads for Flink Runner fails with OOM
> -
>
> Key: BEAM-7442
> URL: https://issues.apache.org/jira/browse/BEAM-7442
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Akshay Iyangar
>Assignee: Akshay Iyangar
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of files is huge, FlinkRunner throws an OOM error. This happens because the current implementation does not read the files sequentially but simultaneously, causing all of them to be held in memory, which quickly breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class so that when the stream is a bounded source, it is read sequentially using a queue (see the sketch below).
>  
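For readers following the thread, here is a minimal, self-contained sketch of the queue-based idea described above. The `Reader` interface is a stand-in for Beam's `UnboundedReader`, not the actual API; only the drain order is the point.

{code:java}
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

final class SequentialDrain {
  // Stand-in for Beam's reader API; illustrative only.
  interface Reader<T> {
    boolean start() throws Exception;   // true if a first element is available
    boolean advance() throws Exception; // true if another element is available
    T getCurrent();
    void close() throws Exception;
  }

  // Drain the readers one at a time so that only a single reader's buffers are
  // ever live in memory, instead of opening all of them simultaneously.
  static <T> void drain(List<Reader<T>> readers, Consumer<T> emit) throws Exception {
    Queue<Reader<T>> pending = new ArrayDeque<>(readers);
    while (!pending.isEmpty()) {
      Reader<T> reader = pending.poll();
      try {
        for (boolean available = reader.start(); available; available = reader.advance()) {
          emit.accept(reader.getCurrent());
        }
      } finally {
        reader.close(); // release this reader's resources before opening the next one
      }
    }
  }
}
{code}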



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6623) Dataflow ValidatesRunner test suite should also exercise ValidatesRunner tests under Python 3.

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6623?focusedWorklogId=251603&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251603
 ]

ASF GitHub Bot logged work on BEAM-6623:


Author: ASF GitHub Bot
Created on: 31/May/19 07:53
Start Date: 31/May/19 07:53
Worklog Time Spent: 10m 
  Work Description: Juta commented on pull request #8622: [BEAM-6623] add 
DataflowValidatesRunner batch tests for python 3.6 an…
URL: https://github.com/apache/beam/pull/8622
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251603)
Time Spent: 14h 40m  (was: 14.5h)

> Dataflow ValidatesRunner test suite should also exercise ValidatesRunner 
> tests under Python 3.
> --
>
> Key: BEAM-6623
> URL: https://issues.apache.org/jira/browse/BEAM-6623
> Project: Beam
>  Issue Type: Sub-task
>  Components: testing
>Reporter: Valentyn Tymofieiev
>Assignee: Frederik Bode
>Priority: Blocker
> Fix For: 2.14.0
>
>  Time Spent: 14h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7465) Upgrade Jackson to version 2.9.9

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7465?focusedWorklogId=251672&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251672
 ]

ASF GitHub Bot logged work on BEAM-7465:


Author: ASF GitHub Bot
Created on: 31/May/19 10:02
Start Date: 31/May/19 10:02
Worklog Time Spent: 10m 
  Work Description: aromanenko-dev commented on issue #8729: [BEAM-7465] 
Upgrade Jackson to version 2.9.9
URL: https://github.com/apache/beam/pull/8729#issuecomment-497654038
 
 
   Run Spark ValidatesRunner
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251672)
Time Spent: 20m  (was: 10m)

> Upgrade Jackson to version 2.9.9
> 
>
> Key: BEAM-7465
> URL: https://issues.apache.org/jira/browse/BEAM-7465
> Project: Beam
>  Issue Type: Improvement
>  Components: build-system, sdk-java-core
>Affects Versions: 2.12.0
>Reporter: Alexey Romanenko
>Assignee: Alexey Romanenko
>Priority: Minor
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Jackson 2.9.9 fixes a medium-rating security issue:
> [https://www.cvedetails.com/cve/CVE-2019-12086/]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7465) Upgrade Jackson to version 2.9.9

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7465?focusedWorklogId=251671&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251671
 ]

ASF GitHub Bot logged work on BEAM-7465:


Author: ASF GitHub Bot
Created on: 31/May/19 10:00
Start Date: 31/May/19 10:00
Worklog Time Spent: 10m 
  Work Description: aromanenko-dev commented on pull request #8729: 
[BEAM-7465] Upgrade Jackson to version 2.9.9
URL: https://github.com/apache/beam/pull/8729
 
 
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [x] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | --- | --- | --- | --- | ---
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python3_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python3_Verify/lastCompletedBuild/)
 | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)
  [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/)
 | --- | --- | ---
   
   Pre-Commit Tests Status (on master branch)
   

   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build 

[jira] [Updated] (BEAM-7358) Developing "GroupByMultiKey"

2019-05-31 Thread Hyu Naoki (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyu Naoki updated BEAM-7358:

Description: 
GroupByKey is useful, but it makes complex grouping conditions difficult to express.

I propose developing a function that groups elements together if even one of their multiple keys has a term in common.

Usage:

 
{code:java}
List<KV<MultiKey, String>> io = Arrays.asList(
    KV.of(new MultiKey("A", "C", "E"), "0"),
    KV.of(new MultiKey("A", "D", "F"), "1"),
    KV.of(new MultiKey("B", "D", "G"), "2"),
    KV.of(new MultiKey("H", "J", "L"), "3"),
    KV.of(new MultiKey("I", "K", "L"), "4"),
    KV.of(new MultiKey("M", "C", "O"), "5"),
    KV.of(new MultiKey(null, null, "O"), "6"),
    KV.of(new MultiKey(null, null, null), "7"));
p.apply(Create.of(io))
    .apply(GroupByMultiKey.create(3))
    .apply(new DebugPrintln());
{code}
 

This outputs:
{quote} 

[KV\{["A","C","E"],"0"},KV\{["A","D","F"],"1"},KV\{["B","D","G"],"2"},KV\{["M","C","O"],"5"},KV\{[null,null,"O"],"6"}]

[KV\{["H","J","L"],"3"},KV\{["I","K","L"],"4"}]

[KV\{[null,null,null],"7"}]


 
{quote}

  was:
GroupByKey is useful, but it makes complex grouping conditions difficult to express.

I propose developing a function that groups elements together if even one of their multiple keys has a term in common.

Usage:

 
{code:java}
List<KV<MultiKey, String>> io = Arrays.asList(
    KV.of(new MultiKey("A", "C", "E"), "0"),
    KV.of(new MultiKey("A", "D", "F"), "1"),
    KV.of(new MultiKey("B", "D", "G"), "2"),
    KV.of(new MultiKey("H", "J", "L"), "3"),
    KV.of(new MultiKey("I", "K", "L"), "4"),
    KV.of(new MultiKey("M", "C", "O"), "5"),
    KV.of(new MultiKey(null, null, "O"), "6"),
    KV.of(new MultiKey(null, null, null), "7"));
p.apply(Create.of(io))
    .apply(GroupByMultiKey.create(3))
    .apply(new DebugPrintln());
{code}
 

This outputs:
{quote} [
[KV\{["A","C","E"],"0"},KV\{["A","D","F"],"1"},KV\{["B","D","G"],"2"},KV\{["M","C","O"],"5"},KV\{[null,null,"O"],"6"}],
[KV\{["H","J","L"],"3"},KV\{["I","K","L"],"4"}],
[KV\{[null,null,null],"7"}]
]

 
{quote}


> Developing  "GroupByMultiKey"
> -
>
> Key: BEAM-7358
> URL: https://issues.apache.org/jira/browse/BEAM-7358
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Hyu Naoki
>Priority: Major
>
> GroupByKey is useful, but it makes complex grouping conditions difficult to express.
> I propose developing a function that groups elements together if even one of their multiple keys has a term in common.
> Usage:
>  
> {code:java}
> List<KV<MultiKey, String>> io = Arrays.asList(
>     KV.of(new MultiKey("A", "C", "E"), "0"),
>     KV.of(new MultiKey("A", "D", "F"), "1"),
>     KV.of(new MultiKey("B", "D", "G"), "2"),
>     KV.of(new MultiKey("H", "J", "L"), "3"),
>     KV.of(new MultiKey("I", "K", "L"), "4"),
>     KV.of(new MultiKey("M", "C", "O"), "5"),
>     KV.of(new MultiKey(null, null, "O"), "6"),
>     KV.of(new MultiKey(null, null, null), "7"));
> p.apply(Create.of(io))
>     .apply(GroupByMultiKey.create(3))
>     .apply(new DebugPrintln());
> {code}
>  
> This outputs:
> {quote} 
> [KV\{["A","C","E"],"0"},KV\{["A","D","F"],"1"},KV\{["B","D","G"],"2"},KV\{["M","C","O"],"5"},KV\{[null,null,"O"],"6"}]
> [KV\{["H","J","L"],"3"},KV\{["I","K","L"],"4"}]
> [KV\{[null,null,null],"7"}]
>  
> {quote}
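To make the proposed semantics concrete, here is a self-contained sketch that reproduces the grouping above as connected components (union-find). Whether matching is positional is not specified in the proposal; this sketch matches key components at any position, and null components never join groups. It is illustrative only, not a Beam transform.

{code:java}
import java.util.*;

final class GroupByAnyKeyDemo {
  public static void main(String[] args) {
    List<String[]> keys = Arrays.asList(
        new String[] {"A", "C", "E"}, new String[] {"A", "D", "F"},
        new String[] {"B", "D", "G"}, new String[] {"H", "J", "L"},
        new String[] {"I", "K", "L"}, new String[] {"M", "C", "O"},
        new String[] {null, null, "O"}, new String[] {null, null, null});

    int[] parent = new int[keys.size()];
    for (int i = 0; i < parent.length; i++) parent[i] = i;

    // Union any two records that share a non-null key component.
    Map<String, Integer> firstOwner = new HashMap<>();
    for (int i = 0; i < keys.size(); i++) {
      for (String component : keys.get(i)) {
        if (component == null) continue;
        Integer owner = firstOwner.putIfAbsent(component, i);
        if (owner != null) union(parent, owner, i);
      }
    }

    // Collect connected components; indices match the "0".."7" values above.
    Map<Integer, List<Integer>> groups = new LinkedHashMap<>();
    for (int i = 0; i < keys.size(); i++) {
      groups.computeIfAbsent(find(parent, i), k -> new ArrayList<>()).add(i);
    }
    System.out.println(groups.values()); // [[0, 1, 2, 5, 6], [3, 4], [7]]
  }

  static int find(int[] p, int i) { return p[i] == i ? i : (p[i] = find(p, p[i])); }

  static void union(int[] p, int a, int b) { p[find(p, a)] = find(p, b); }
}
{code}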



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-7465) Upgrade Jackson to version 2.9.9

2019-05-31 Thread Alexey Romanenko (JIRA)
Alexey Romanenko created BEAM-7465:
--

 Summary: Upgrade Jackson to version 2.9.9
 Key: BEAM-7465
 URL: https://issues.apache.org/jira/browse/BEAM-7465
 Project: Beam
  Issue Type: Improvement
  Components: build-system, sdk-java-core
Affects Versions: 2.12.0
Reporter: Alexey Romanenko
Assignee: Alexey Romanenko


Jackson 2.9.9 fixes a medium-rating security issue:
[https://www.cvedetails.com/cve/CVE-2019-12086/]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-7465) Upgrade Jackson to version 2.9.9

2019-05-31 Thread Alexey Romanenko (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Romanenko updated BEAM-7465:
---
Status: Open  (was: Triage Needed)

> Upgrade Jackson to version 2.9.9
> 
>
> Key: BEAM-7465
> URL: https://issues.apache.org/jira/browse/BEAM-7465
> Project: Beam
>  Issue Type: Improvement
>  Components: build-system, sdk-java-core
>Affects Versions: 2.12.0
>Reporter: Alexey Romanenko
>Assignee: Alexey Romanenko
>Priority: Minor
>
> Jackson 2.9.9 fixes a medium-rating security issue:
> [https://www.cvedetails.com/cve/CVE-2019-12086/]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7442) Bounded Reads for Flink Runner fails with OOM

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7442?focusedWorklogId=251729&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251729
 ]

ASF GitHub Bot logged work on BEAM-7442:


Author: ASF GitHub Bot
Created on: 31/May/19 11:53
Start Date: 31/May/19 11:53
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #8715: 
[BEAM-7442][BEAM-5650] Read sequentially from bounded sources in 
UnboundedSourceWrapper
URL: https://github.com/apache/beam/pull/8715#discussion_r289359163
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 ##
 @@ -217,34 +227,33 @@ public void run(SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx) th
   // through to idle this executor.
   LOG.info("Number of readers is 0 for this task executor, idle");
   // Do nothing here but still execute the rest of the source logic
-} else if (localReaders.size() == 1) {
-  // the easy case, we just read from one reader
-  UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0);
-
-  synchronized (ctx.getCheckpointLock()) {
-    boolean dataAvailable = readerInvoker.invokeStart(reader);
-    if (dataAvailable) {
-      emitElement(ctx, reader);
-    }
-  }
-
+} else if (isConvertedBoundedSource) {
   setNextWatermarkTimer(this.runtimeContext);
 
-  while (isRunning) {
-    boolean dataAvailable;
-    synchronized (ctx.getCheckpointLock()) {
-      dataAvailable = readerInvoker.invokeAdvance(reader);
+  // We read sequentially from all bounded sources
 
 Review comment:
   It doesn't make a difference because the watermark is always the minimum across all readers. In the case of the `UnboundedReadFromBoundedSource` adapter, all input is read from all sources, and only then is the watermark advanced.
   
   See: https://github.com/apache/beam/commit/11d9ec5ebff4820b36db4b6ea4df7a0f79115ddd#diff-6bd5346c8d8943cdb40aa5cf5b1731bdR354
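As a concrete illustration of that rule (this is not Beam's code, just the arithmetic): the wrapper's output watermark is the minimum over its local readers, so sequential versus round-robin reading cannot change it; it only advances once every reader has advanced.

{code:java}
import java.util.List;

final class MinWatermarkDemo {
  // The output watermark is the minimum watermark across all local readers.
  static long outputWatermark(List<Long> readerWatermarks) {
    return readerWatermarks.stream().mapToLong(Long::longValue).min()
        .orElse(Long.MAX_VALUE); // no readers: the source is effectively done
  }

  public static void main(String[] args) {
    // Stays at the minimum until the slowest reader advances, regardless of
    // the order in which the readers are drained.
    System.out.println(outputWatermark(List.of(100L, 0L, 50L))); // 0
    System.out.println(outputWatermark(List.of(100L, 100L)));    // 100
  }
}
{code}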
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251729)
Time Spent: 1h  (was: 50m)

> Bounded Reads for Flink Runner fails with OOM
> -
>
> Key: BEAM-7442
> URL: https://issues.apache.org/jira/browse/BEAM-7442
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Akshay Iyangar
>Assignee: Akshay Iyangar
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of files is huge, FlinkRunner throws an OOM error. This happens because the current implementation does not read the files sequentially but simultaneously, causing all of them to be held in memory, which quickly breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class so that when the stream is a bounded source, it is read sequentially using a queue.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7442) Bounded Reads for Flink Runner fails with OOM

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7442?focusedWorklogId=251728&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251728
 ]

ASF GitHub Bot logged work on BEAM-7442:


Author: ASF GitHub Bot
Created on: 31/May/19 11:53
Start Date: 31/May/19 11:53
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #8715: 
[BEAM-7442][BEAM-5650] Read sequentially from bounded sources in 
UnboundedSourceWrapper
URL: https://github.com/apache/beam/pull/8715#discussion_r289359128
 
 

 ##
 File path: 
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
 ##
 @@ -690,6 +701,68 @@ private static void testSourceDoesNotShutdown(boolean shouldHaveReaders) throws
   }
   assertThat(thread.isAlive(), is(false));
 }
+
+@Test
+public void testSequentialReadingFromBoundedSource() throws Exception {
+  UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<Integer> source =
+      new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(
+          new BoundedIntegerSource(0, 1000));
+
+  FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+  options.setShutdownSourcesOnFinalWatermark(true);
+
+  UnboundedSourceWrapper<Integer, UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint<Integer>> sourceWrapper =
+      new UnboundedSourceWrapper<>("sequentialRead", options, source, 4);
+  StreamingRuntimeContext runtimeContextMock = Mockito.mock(StreamingRuntimeContext.class);
+
+  Mockito.when(runtimeContextMock.getIndexOfThisSubtask()).thenReturn(0);
+  when(runtimeContextMock.getNumberOfParallelSubtasks()).thenReturn(2);
+  when(runtimeContextMock.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
+  TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+  processingTimeService.setCurrentTime(0);
+  when(runtimeContextMock.getProcessingTimeService()).thenReturn(processingTimeService);
+
+  when(runtimeContextMock.getMetricGroup()).thenReturn(new UnregisteredMetricsGroup());
+
+  sourceWrapper.setRuntimeContext(runtimeContextMock);
+
+  sourceWrapper.open(new Configuration());
+  assertThat(sourceWrapper.getLocalReaders().size(), is(2));
+
+  List<Integer> integers = new ArrayList<>();
+  sourceWrapper.run(
+      new SourceFunction.SourceContext<WindowedValue<ValueWithRecordId<Integer>>>() {
+
+        @Override
+        public void collect(WindowedValue<ValueWithRecordId<Integer>> element) {
+          integers.add(element.getValue().getValue());
+        }
+
+        @Override
+        public void collectWithTimestamp(
+            WindowedValue<ValueWithRecordId<Integer>> element, long timestamp) {
+          throw new IllegalStateException("Should not collect with timestamp");
+        }
+
+        @Override
+        public void emitWatermark(Watermark mark) {}
+
+        @Override
+        public void markAsTemporarilyIdle() {}
+
+        @Override
+        public Object getCheckpointLock() {
+          return new Object();
+        }
+
+        @Override
+        public void close() {}
+      });
+
+  // the source is effectively split into two parts
+  assertThat(integers.size(), is(500));
 
 Review comment:
   If you look further up, it sets the number of parallel subtasks to 2, while the initial splitting is performed with a parallelism of 4. That makes 2 source instances for the subtask that is run in the test.
   
   Good point on the output ordering. I've improved the output check. Please have a look.
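For readers who cannot see the updated PR, an order-sensitive check could look roughly like the following. The split assignment is an assumption here (round-robin of 4 splits over 2 subtasks, so subtask 0 would read [0, 250) and then [500, 750)); the PR's actual assertion may differ.

{code:java}
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;

import java.util.ArrayList;
import java.util.List;

class SequentialOutputCheckSketch {
  // Verifies both the count and the order: each locally assigned split must be
  // emitted contiguously and in ascending order.
  static void checkOutput(List<Integer> integers) {
    List<Integer> expected = new ArrayList<>();
    for (int i = 0; i < 250; i++) expected.add(i);   // assumed first local split
    for (int i = 500; i < 750; i++) expected.add(i); // assumed second local split
    assertThat(integers, contains(expected.toArray(new Integer[0])));
  }
}
{code}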
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251728)
Time Spent: 50m  (was: 40m)

> Bounded Reads for Flink Runner fails with OOM
> -
>
> Key: BEAM-7442
> URL: https://issues.apache.org/jira/browse/BEAM-7442
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Akshay Iyangar
>Assignee: Akshay Iyangar
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of files is huge, FlinkRunner throws an OOM error. This happens because the current implementation does not read the files sequentially but simultaneously, causing all of them to be held in memory, which quickly breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class so that when the stream is a bounded source, it is read sequentially

[jira] [Work logged] (BEAM-7442) Bounded Reads for Flink Runner fails with OOM

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7442?focusedWorklogId=251734&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251734
 ]

ASF GitHub Bot logged work on BEAM-7442:


Author: ASF GitHub Bot
Created on: 31/May/19 11:55
Start Date: 31/May/19 11:55
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #8715: 
[BEAM-7442][BEAM-5650] Read sequentially from bounded sources in 
UnboundedSourceWrapper
URL: https://github.com/apache/beam/pull/8715#discussion_r289359752
 
 

 ##
 File path: 
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
 ##
 @@ -707,4 +780,77 @@ public StreamStatus getStreamStatus() {
   return currentStreamStatus;
 }
   }
+
+  private static class BoundedIntegerSource extends OffsetBasedSource<Integer> {
 
 Review comment:
   There is also `CountingSource.createUnboundedFrom`, which is package-private, but it looks like all the public methods are deprecated.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251734)
Time Spent: 1h 10m  (was: 1h)

> Bounded Reads for Flink Runner fails with OOM
> -
>
> Key: BEAM-7442
> URL: https://issues.apache.org/jira/browse/BEAM-7442
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Akshay Iyangar
>Assignee: Akshay Iyangar
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of files is huge, FlinkRunner throws an OOM error. This happens because the current implementation does not read the files sequentially but simultaneously, causing all of them to be held in memory, which quickly breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class so that when the stream is a bounded source, it is read sequentially using a queue.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7274) Protobuf Beam Schema support

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7274?focusedWorklogId=251770&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251770
 ]

ASF GitHub Bot logged work on BEAM-7274:


Author: ASF GitHub Bot
Created on: 31/May/19 12:46
Start Date: 31/May/19 12:46
Worklog Time Spent: 10m 
  Work Description: alexvanboxel commented on issue #8690: [BEAM-7274] 
Implement the Protobuf schema provider
URL: https://github.com/apache/beam/pull/8690#issuecomment-497695744
 
 
   I got it to work with the RowWithGetters, but not without modifying it. See 
https://github.com/alexvanboxel/beam/commit/1da5715c74d1f3c5de66ae4ebcddcc148bfa7915
   
   I keep handling the List/Map/Row with the getters, as I need more context than the other implementations do. Personally, I find this better and more flexible than doing it in `RowWithGetters`, as you have more ways to provide context (example: a proto map is a repeatable message).
   
   I looked for a reference to the `GenericRecord` in Avro, but was disappointed that the `GenericRecord` gets materialised into a `RowWithStorage`. I'm quite sure that, with getters that handle collections, this could be changed to `RowWithGetters`.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251770)
Time Spent: 1h 10m  (was: 1h)

> Protobuf Beam Schema support
> 
>
> Key: BEAM-7274
> URL: https://issues.apache.org/jira/browse/BEAM-7274
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Alex Van Boxel
>Assignee: Alex Van Boxel
>Priority: Minor
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Add support for the new Beam Schema to the Protobuf extension.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7305) Add first version of Hazelcast Jet Runner

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7305?focusedWorklogId=251809&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251809
 ]

ASF GitHub Bot logged work on BEAM-7305:


Author: ASF GitHub Bot
Created on: 31/May/19 14:30
Start Date: 31/May/19 14:30
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #8699: [BEAM-7305] Fix 
issues in and extend documentation for Hazelcast Jet Runner 
URL: https://github.com/apache/beam/pull/8699
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251809)
Time Spent: 8h 20m  (was: 8h 10m)

> Add first version of Hazelcast Jet Runner
> -
>
> Key: BEAM-7305
> URL: https://issues.apache.org/jira/browse/BEAM-7305
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-jet
>Reporter: Maximilian Michels
>Assignee: Jozsef Bartok
>Priority: Major
> Fix For: 2.14.0
>
>  Time Spent: 8h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7442) Bounded Reads for Flink Runner fails with OOM

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7442?focusedWorklogId=251824&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251824
 ]

ASF GitHub Bot logged work on BEAM-7442:


Author: ASF GitHub Bot
Created on: 31/May/19 14:53
Start Date: 31/May/19 14:53
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #8715: 
[BEAM-7442][BEAM-5650] Read sequentially from bounded sources in 
UnboundedSourceWrapper
URL: https://github.com/apache/beam/pull/8715#discussion_r289424545
 
 

 ##
 File path: 
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
 ##
 @@ -707,4 +780,77 @@ public StreamStatus getStreamStatus() {
   return currentStreamStatus;
 }
   }
+
+  private static class BoundedIntegerSource extends OffsetBasedSource<Integer> {
 
 Review comment:
   Looking at all the other tests which use `CountingSource.upTo()`, I think it is probably OK to depend on this method instead of essentially duplicating code. I've removed the source.
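For reference, the dependency mentioned here is small: `CountingSource.upTo(n)` yields a `BoundedSource<Long>` that can be fed to the adapter. A sketch follows (import paths per Beam at the time of this thread; note the element type becomes `Long` rather than `Integer`):

{code:java}
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;

final class CountingSourceSketch {
  // Replaces a hand-rolled bounded integer source with the built-in one.
  static BoundedToUnboundedSourceAdapter<Long> adaptedCountingSource() {
    BoundedSource<Long> bounded = CountingSource.upTo(1000);
    return new BoundedToUnboundedSourceAdapter<>(bounded);
  }
}
{code}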
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251824)
Time Spent: 1h 40m  (was: 1.5h)

> Bounded Reads for Flink Runner fails with OOM
> -
>
> Key: BEAM-7442
> URL: https://issues.apache.org/jira/browse/BEAM-7442
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Akshay Iyangar
>Assignee: Akshay Iyangar
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of files is huge, FlinkRunner throws an OOM error. This happens because the current implementation does not read the files sequentially but simultaneously, causing all of them to be held in memory, which quickly breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class so that when the stream is a bounded source, it is read sequentially using a queue.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7305) Add first version of Hazelcast Jet Runner

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7305?focusedWorklogId=251706&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251706
 ]

ASF GitHub Bot logged work on BEAM-7305:


Author: ASF GitHub Bot
Created on: 31/May/19 11:14
Start Date: 31/May/19 11:14
Worklog Time Spent: 10m 
  Work Description: jbartok commented on issue #8699: [BEAM-7305] Fix 
issues in and extend documentation for Hazelcast Jet Runner 
URL: https://github.com/apache/beam/pull/8699#issuecomment-497672574
 
 
   Sure, I just reasoned that multiple smaller changes would be simpler to 
review. Coming up!
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251706)
Time Spent: 8h 10m  (was: 8h)

> Add first version of Hazelcast Jet Runner
> -
>
> Key: BEAM-7305
> URL: https://issues.apache.org/jira/browse/BEAM-7305
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-jet
>Reporter: Maximilian Michels
>Assignee: Jozsef Bartok
>Priority: Major
> Fix For: 2.14.0
>
>  Time Spent: 8h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7465) Upgrade Jackson to version 2.9.9

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7465?focusedWorklogId=251716&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251716
 ]

ASF GitHub Bot logged work on BEAM-7465:


Author: ASF GitHub Bot
Created on: 31/May/19 11:27
Start Date: 31/May/19 11:27
Worklog Time Spent: 10m 
  Work Description: iemejia commented on pull request #8729: [BEAM-7465] 
Upgrade Jackson to version 2.9.9
URL: https://github.com/apache/beam/pull/8729
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251716)
Time Spent: 0.5h  (was: 20m)

> Upgrade Jackson to version 2.9.9
> 
>
> Key: BEAM-7465
> URL: https://issues.apache.org/jira/browse/BEAM-7465
> Project: Beam
>  Issue Type: Improvement
>  Components: build-system, sdk-java-core
>Affects Versions: 2.12.0
>Reporter: Alexey Romanenko
>Assignee: Alexey Romanenko
>Priority: Minor
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Jackson 2.9.9 fixes a medium-rating security issue:
> [https://www.cvedetails.com/cve/CVE-2019-12086/]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6829) Duplicate metric warnings clutter log

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6829?focusedWorklogId=251792&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251792
 ]

ASF GitHub Bot logged work on BEAM-6829:


Author: ASF GitHub Bot
Created on: 31/May/19 13:57
Start Date: 31/May/19 13:57
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #8585: [BEAM-6829] Use 
transform name for metric namespace if none provided
URL: https://github.com/apache/beam/pull/8585#issuecomment-497718117
 
 
   With this change, I still see the duplicate metrics warning:
   ```
   [Source: Impulse -> [2]write/Write/WriteImpl/DoOnce/{FlatMap(), Map(decode)} -> [1]write/Write/WriteImpl/InitializeWrite -> 
(Map -> ToKeyedWorkItem, Map -> ToKeyedWorkItem, Map -> ToKeyedWorkItem) (1/1)] 
WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already 
contains a Metric with the name 'beam.metric:element_count:v1'. Metric will not 
be reported.[localhost, taskmanager, 0bd0adb5-1fba-49c7-811f-c8ae76f83bb8, 
BeamApp-tweise-0531134309-499ce3fa, 
[2]write/Write/WriteImpl/DoOnce/{FlatMap(), 
Map(decode)}, 0]
   ```
   
   But I also suspect that the proposed fix will have other side effects, since 
it is changing the metric name (at least that's how it would appear in our 
metric system).
   
   The metrics as logged with the slf4j reporter before this change look like 
this:
   ```
   
localhost.taskmanager.87150cf7-cc34-412c-9057-656edef0590c.BeamApp-tweise-0531054435-7c2c37e1.[1]write/Write/WriteImpl/FinalizeWrite.0.numRecordsIn:
 1
   
localhost.taskmanager.87150cf7-cc34-412c-9057-656edef0590c.BeamApp-tweise-0531054435-7c2c37e1.[3]{read,
 split, pair_with_one}.0.beam.metric:element_count:v1: 1
   
localhost.taskmanager.87150cf7-cc34-412c-9057-656edef0590c.BeamApp-tweise-0531054435-7c2c37e1.ToKeyedWorkItem.0.numRecordsOut:
 0
   
localhost.taskmanager.87150cf7-cc34-412c-9057-656edef0590c.BeamApp-tweise-0531054435-7c2c37e1.[3]{read,
 split, 
pair_with_one}.0.ref_AppliedPTransform_read/Read/ReadSplits_16.metric:pardo_execution_time:finish_bundle_msecs:v1:
 0
   ```
   In our backend this translates to 
`.operator.beam-metric-element_count-v1.gauge.sum`
   
   If we replace `beam` with `<transform name>`, then we would effectively change the metric name. Instead, shouldn't the transform name be part of the scope?
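A sketch of the distinction (illustrative, using Flink's `MetricGroup` API; not the PR's code): putting the transform name into the scope keeps the metric name stable, while folding it into the name changes the identifier that every downstream system keys on.

{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

final class MetricScopeSketch {
  // Scope-based: the transform name becomes one more scope component, and the
  // metric keeps the stable name "beam.metric:element_count:v1".
  static Counter scoped(MetricGroup operatorGroup, String transformName) {
    return operatorGroup.addGroup(transformName).counter("beam.metric:element_count:v1");
  }

  // Name-based: the metric name now varies per transform, which downstream
  // systems see as a different metric altogether.
  static Counter renamed(MetricGroup operatorGroup, String transformName) {
    return operatorGroup.counter(transformName + ".beam.metric:element_count:v1");
  }
}
{code}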
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251792)
Time Spent: 50m  (was: 40m)

> Duplicate metric warnings clutter log
> -
>
> Key: BEAM-6829
> URL: https://issues.apache.org/jira/browse/BEAM-6829
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.11.0
>Reporter: Thomas Weise
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Logs fill up quickly with these warnings: 
> {code:java}
> WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already 
> contains a Metric with the name ...{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6829) Duplicate metric warnings clutter log

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6829?focusedWorklogId=251796&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251796
 ]

ASF GitHub Bot logged work on BEAM-6829:


Author: ASF GitHub Bot
Created on: 31/May/19 14:06
Start Date: 31/May/19 14:06
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #8585: [BEAM-6829] Use 
transform name for metric namespace if none provided
URL: https://github.com/apache/beam/pull/8585#issuecomment-497721088
 
 
   Looks as if the change applies to some metrics (in the log snippet, `pardo_execution_time` but not `element_count`). But I believe we need to keep the metric name intact and augment the scope.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251796)
Time Spent: 1h  (was: 50m)

> Duplicate metric warnings clutter log
> -
>
> Key: BEAM-6829
> URL: https://issues.apache.org/jira/browse/BEAM-6829
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.11.0
>Reporter: Thomas Weise
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Logs fill up quickly with these warnings: 
> {code:java}
> WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already 
> contains a Metric with the name ...{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-7021) ToString transform for Python SDK

2019-05-31 Thread Shoaib Zafar (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852912#comment-16852912
 ] 

Shoaib Zafar commented on BEAM-7021:


[~shehzaadn] This task has been done and merged into the master branch. We can close this ticket now!
- Thanks

> ToString transform for Python SDK
> -
>
> Key: BEAM-7021
> URL: https://issues.apache.org/jira/browse/BEAM-7021
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Rose Nguyen
>Assignee: Shehzaad Nakhoda
>Priority: Minor
>  Labels: starter
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> PTransforms for converting a PCollection<?> or PCollection<Iterable<?>> to a PCollection<String>
> It should offer the same API as its Java counterpart: 
> [https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-7021) ToString transform for Python SDK

2019-05-31 Thread Shehzaad Nakhoda (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shehzaad Nakhoda reassigned BEAM-7021:
--

Assignee: (was: Shehzaad Nakhoda)

> ToString transform for Python SDK
> -
>
> Key: BEAM-7021
> URL: https://issues.apache.org/jira/browse/BEAM-7021
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Rose Nguyen
>Priority: Minor
>  Labels: starter
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> PTransforms for converting a PCollection<?> or PCollection<Iterable<?>> to a PCollection<String>
> It should offer the same API as its Java counterpart: 
> [https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-7021) ToString transform for Python SDK

2019-05-31 Thread Shehzaad Nakhoda (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852915#comment-16852915
 ] 

Shehzaad Nakhoda commented on BEAM-7021:


[~altay] can we close this please?

> ToString transform for Python SDK
> -
>
> Key: BEAM-7021
> URL: https://issues.apache.org/jira/browse/BEAM-7021
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Rose Nguyen
>Assignee: Shehzaad Nakhoda
>Priority: Minor
>  Labels: starter
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> PTransforms for converting a PCollection<?> or PCollection<Iterable<?>> to a PCollection<String>
> It should offer the same API as its Java counterpart: 
> [https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-7021) ToString transform for Python SDK

2019-05-31 Thread Shehzaad Nakhoda (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shehzaad Nakhoda reassigned BEAM-7021:
--

Assignee: Shehzaad Nakhoda

> ToString transform for Python SDK
> -
>
> Key: BEAM-7021
> URL: https://issues.apache.org/jira/browse/BEAM-7021
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Rose Nguyen
>Assignee: Shehzaad Nakhoda
>Priority: Minor
>  Labels: starter
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> PTransforms for converting a PCollection<?> or PCollection<Iterable<?>> to a PCollection<String>
> It should offer the same API as its Java counterpart: 
> [https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (BEAM-7465) Upgrade Jackson to version 2.9.9

2019-05-31 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/BEAM-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía resolved BEAM-7465.

   Resolution: Fixed
Fix Version/s: 2.14.0

> Upgrade Jackson to version 2.9.9
> 
>
> Key: BEAM-7465
> URL: https://issues.apache.org/jira/browse/BEAM-7465
> Project: Beam
>  Issue Type: Improvement
>  Components: build-system, sdk-java-core
>Affects Versions: 2.12.0
>Reporter: Alexey Romanenko
>Assignee: Alexey Romanenko
>Priority: Minor
> Fix For: 2.14.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Jackson 2.9.9 fixes a medium-rating security issue:
> [https://www.cvedetails.com/cve/CVE-2019-12086/]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5995) Create Jenkins jobs to run the load tests

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5995?focusedWorklogId=251718&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251718
 ]

ASF GitHub Bot logged work on BEAM-5995:


Author: ASF GitHub Bot
Created on: 31/May/19 11:30
Start Date: 31/May/19 11:30
Worklog Time Spent: 10m 
  Work Description: kkucharc commented on issue #8151: [BEAM-5995] add 
Jenkins job with GBK Python load tests
URL: https://github.com/apache/beam/pull/8151#issuecomment-497676459
 
 
   @pabloem I think waiting until Monday won't be that long, and Udi is already familiar with the topic. Unless you recommend that someone else should look at it.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251718)
Time Spent: 50h 20m  (was: 50h 10m)

> Create Jenkins jobs to run the load tests
> -
>
> Key: BEAM-5995
> URL: https://issues.apache.org/jira/browse/BEAM-5995
> Project: Beam
>  Issue Type: Sub-task
>  Components: testing
>Reporter: Kasia Kucharczyk
>Assignee: Kasia Kucharczyk
>Priority: Major
>  Time Spent: 50h 20m
>  Remaining Estimate: 0h
>
> (/) Add SMOKE test 
>  Add GBK load tests.
> Add CoGBK load tests.
> Add Pardo load tests.
> Add SideInput tests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-7357) Kinesis IO.write throws LimitExceededException

2019-05-31 Thread Alexey Romanenko (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Romanenko reassigned BEAM-7357:
--

Assignee: Alexey Romanenko

> Kinesis IO.write throws LimitExceededException
> --
>
> Key: BEAM-7357
> URL: https://issues.apache.org/jira/browse/BEAM-7357
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kinesis
>Affects Versions: 2.11.0
>Reporter: Brachi Packter
>Assignee: Alexey Romanenko
>Priority: Major
>
> I used KinesisIO to write to Kinesis. Very quickly, I get many exceptions like:
> [shard_map.cc:150] Shard map update for stream "***" failed. Code: 
> LimitExceededException Message: Rate exceeded for stream *** under account 
> ***; retrying in ..
> Also, I see many exceptions like:
> Caused by: java.lang.IllegalArgumentException: Stream ** does not exist at 
> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>  at 
> org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
> I'm sure this stream exists because I can see some data from my pipeline that 
> was successfully ingested to it.
>  
> Here is my code:
>  
>  
> {code:java}
> .apply(KinesisIO.write()
>     .withStreamName("**")
>     .withPartitioner(new KinesisPartitioner() {
>         @Override
>         public String getPartitionKey(byte[] value) {
>             return UUID.randomUUID().toString();
>         }
>
>         @Override
>         public String getExplicitHashKey(byte[] value) {
>             return null;
>         }
>     })
>     .withAWSClientsProvider("**", "***", Regions.US_EAST_1));
> {code}
>  
> I tried not using KinesisIO, and everything works well; I can't figure out what went wrong.
> I tried using the same API as the library does.
>  
> {code:java}
> .apply(
>     ParDo.of(new DoFn<byte[], Void>() {
>         private transient IKinesisProducer inlineProducer;
>
>         @Setup
>         public void setup() {
>             KinesisProducerConfiguration config =
>                 KinesisProducerConfiguration.fromProperties(new Properties());
>             config.setRegion(Regions.US_EAST_1.getName());
>             config.setCredentialsProvider(new AWSStaticCredentialsProvider(
>                 new BasicAWSCredentials("***", "***")));
>             inlineProducer = new KinesisProducer(config);
>         }
>
>         @ProcessElement
>         public void processElement(ProcessContext c) throws Exception {
>             ByteBuffer data = ByteBuffer.wrap(c.element());
>             String partitionKey = UUID.randomUUID().toString();
>             ListenableFuture<UserRecordResult> f =
>                 getProducer().addUserRecord("***", partitionKey, data);
>             Futures.addCallback(f, new UserRecordResultFutureCallback());
>         }
>
>         class UserRecordResultFutureCallback implements FutureCallback<UserRecordResult> {
>             @Override
>             public void onFailure(Throwable cause) {
>                 throw new RuntimeException("failed produce:" + cause);
>             }
>
>             @Override
>             public void onSuccess(UserRecordResult result) {}
>         }
>     }));
> {code}
>  
> Any idea what I did wrong, or what the error in KinesisIO is?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-7357) Kinesis IO.write throws LimitExceededException

2019-05-31 Thread Alexey Romanenko (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852940#comment-16852940
 ] 

Alexey Romanenko commented on BEAM-7357:


Yes, I'll fix this. Thanks for the report!

> Kinesis IO.write throws LimitExceededException
> --
>
> Key: BEAM-7357
> URL: https://issues.apache.org/jira/browse/BEAM-7357
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kinesis
>Affects Versions: 2.11.0
>Reporter: Brachi Packter
>Priority: Major
>
> I used KinesisIO to write to Kinesis. Very quickly, I get many exceptions like:
> [shard_map.cc:150] Shard map update for stream "***" failed. Code: 
> LimitExceededException Message: Rate exceeded for stream *** under account 
> ***; retrying in ..
> Also, I see many exceptions like:
> Caused by: java.lang.IllegalArgumentException: Stream ** does not exist at 
> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>  at 
> org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
> I'm sure this stream exists because I can see some data from my pipeline that 
> was successfully ingested to it.
>  
> Here is my code:
>  
>  
> {code:java}
> .apply(KinesisIO.write()
>     .withStreamName("**")
>     .withPartitioner(new KinesisPartitioner() {
>         @Override
>         public String getPartitionKey(byte[] value) {
>             return UUID.randomUUID().toString();
>         }
>
>         @Override
>         public String getExplicitHashKey(byte[] value) {
>             return null;
>         }
>     })
>     .withAWSClientsProvider("**", "***", Regions.US_EAST_1));
> {code}
>  
> I tried not using KinesisIO, and everything works well; I can't figure out what went wrong.
> I tried using the same API as the library does.
>  
> {code:java}
> .apply(
>     ParDo.of(new DoFn<byte[], Void>() {
>         private transient IKinesisProducer inlineProducer;
>
>         @Setup
>         public void setup() {
>             KinesisProducerConfiguration config =
>                 KinesisProducerConfiguration.fromProperties(new Properties());
>             config.setRegion(Regions.US_EAST_1.getName());
>             config.setCredentialsProvider(new AWSStaticCredentialsProvider(
>                 new BasicAWSCredentials("***", "***")));
>             inlineProducer = new KinesisProducer(config);
>         }
>
>         @ProcessElement
>         public void processElement(ProcessContext c) throws Exception {
>             ByteBuffer data = ByteBuffer.wrap(c.element());
>             String partitionKey = UUID.randomUUID().toString();
>             ListenableFuture<UserRecordResult> f =
>                 getProducer().addUserRecord("***", partitionKey, data);
>             Futures.addCallback(f, new UserRecordResultFutureCallback());
>         }
>
>         class UserRecordResultFutureCallback implements FutureCallback<UserRecordResult> {
>             @Override
>             public void onFailure(Throwable cause) {
>                 throw new RuntimeException("failed produce:" + cause);
>             }
>
>             @Override
>             public void onSuccess(UserRecordResult result) {}
>         }
>     }));
> {code}
>  
> Any idea what I did wrong, or what the error in KinesisIO is?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7442) Bounded Reads for Flink Runner fails with OOM

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7442?focusedWorklogId=251744&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251744
 ]

ASF GitHub Bot logged work on BEAM-7442:


Author: ASF GitHub Bot
Created on: 31/May/19 12:06
Start Date: 31/May/19 12:06
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8715: 
[BEAM-7442][BEAM-5650] Read sequentially from bounded sources in 
UnboundedSourceWrapper
URL: https://github.com/apache/beam/pull/8715#discussion_r289362422
 
 

 ##
 File path: 
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
 ##
 @@ -707,4 +780,77 @@ public StreamStatus getStreamStatus() {
   return currentStreamStatus;
 }
   }
+
+  private static class BoundedIntegerSource extends OffsetBasedSource<Integer> {
 
 Review comment:
   I think it is deprecated in favor of `GenerateSequence`, which works at the 
`PTransform` level and uses `CountingSource` internally. Maybe it would make 
sense to also expose the `source` level from it, but perhaps there are not 
many sources like this in the tests.
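
   For reference, the `PTransform`-level counterpart in use (a minimal sketch 
against the standard Beam Java API, not code from this PR):

{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class GenerateSequenceExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    // GenerateSequence replaces direct use of CountingSource: it emits the
    // longs [0, 1000) as a bounded PCollection.
    PCollection<Long> numbers = p.apply(GenerateSequence.from(0).to(1000));
    p.run().waitUntilFinish();
  }
}
{code}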
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251744)
Time Spent: 1.5h  (was: 1h 20m)

> Bounded Reads for Flink Runner fails with OOM
> -
>
> Key: BEAM-7442
> URL: https://issues.apache.org/jira/browse/BEAM-7442
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Akshay Iyangar
>Assignee: Akshay Iyangar
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of
> files is huge, FlinkRunner throws an OOM error. This happens because the
> current implementation doesn't read the files sequentially but
> simultaneously, causing all of them to be held in memory at once, which
> quickly breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class so that, when the
> stream is a bounded source, it is read sequentially using a queue.
>  
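
A minimal sketch of the sequential-read idea described above (an illustration,
not the actual UnboundedSourceWrapper code): drain one reader completely before
opening the next, so only one split's data is buffered at a time. It assumes
converted bounded sources, where advance() returning false means the split is
exhausted; for a genuinely unbounded source, false only means "no data yet".

{code:java}
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import org.apache.beam.sdk.io.UnboundedSource;

public class SequentialReadSketch {
  static <T> void readSequentially(
      List<? extends UnboundedSource.UnboundedReader<T>> readers,
      Consumer<T> emit) throws IOException {
    // Readers are processed one at a time; the queue holds the splits that
    // have not been opened yet.
    Queue<UnboundedSource.UnboundedReader<T>> pending = new ArrayDeque<>(readers);
    while (!pending.isEmpty()) {
      UnboundedSource.UnboundedReader<T> reader = pending.poll();
      boolean available = reader.start();
      while (available) {
        emit.accept(reader.getCurrent());
        available = reader.advance();
      }
      reader.close();  // release this split's resources before the next one
    }
  }
}
{code}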



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7442) Bounded Reads for Flink Runner fails with OOM

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7442?focusedWorklogId=251742&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251742
 ]

ASF GitHub Bot logged work on BEAM-7442:


Author: ASF GitHub Bot
Created on: 31/May/19 12:06
Start Date: 31/May/19 12:06
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8715: 
[BEAM-7442][BEAM-5650] Read sequentially from bounded sources in 
UnboundedSourceWrapper
URL: https://github.com/apache/beam/pull/8715#discussion_r289361329
 
 

 ##
 File path: 
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
 ##
 @@ -690,6 +701,68 @@ private static void testSourceDoesNotShutdown(boolean 
shouldHaveReaders) throws
   }
   assertThat(thread.isAlive(), is(false));
 }
+
+@Test
+public void testSequentialReadingFromBoundedSource() throws Exception {
+  UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<Integer> source =
+  new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(
+  new BoundedIntegerSource(0, 1000));
+
+  FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+  options.setShutdownSourcesOnFinalWatermark(true);
+
+  UnboundedSourceWrapper<Integer, UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint<Integer>> sourceWrapper =
+  new UnboundedSourceWrapper<>("sequentialRead", options, source, 4);
+  StreamingRuntimeContext runtimeContextMock = 
Mockito.mock(StreamingRuntimeContext.class);
+
+  Mockito.when(runtimeContextMock.getIndexOfThisSubtask()).thenReturn(0);
+  when(runtimeContextMock.getNumberOfParallelSubtasks()).thenReturn(2);
+  when(runtimeContextMock.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
+
+  TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
+  processingTimeService.setCurrentTime(0);
+  
when(runtimeContextMock.getProcessingTimeService()).thenReturn(processingTimeService);
+
+  when(runtimeContextMock.getMetricGroup()).thenReturn(new 
UnregisteredMetricsGroup());
+
+  sourceWrapper.setRuntimeContext(runtimeContextMock);
+
+  sourceWrapper.open(new Configuration());
+  assertThat(sourceWrapper.getLocalReaders().size(), is(2));
+
+  List<Integer> integers = new ArrayList<>();
+  sourceWrapper.run(
+      new SourceFunction.SourceContext<WindowedValue<ValueWithRecordId<Integer>>>() {
+
+@Override
+        public void collect(WindowedValue<ValueWithRecordId<Integer>> element) {
+  integers.add(element.getValue().getValue());
+}
+
+@Override
+public void collectWithTimestamp(
+            WindowedValue<ValueWithRecordId<Integer>> element, long timestamp) {
+  throw new IllegalStateException("Should not collect with 
timestamp");
+}
+
+@Override
+public void emitWatermark(Watermark mark) {}
+
+@Override
+public void markAsTemporarilyIdle() {}
+
+@Override
+public Object getCheckpointLock() {
+  return new Object();
+}
+
+@Override
+public void close() {}
+  });
+
+  // the source is effectively split into two parts
+  assertThat(integers.size(), is(500));
 
 Review comment:
   Aha, I missed 
`Mockito.when(runtimeContextMock.getIndexOfThisSubtask()).thenReturn(0)`. I 
thought it was going to process both partitions. Thanks
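
   For context, the split-to-subtask assignment the test relies on, sketched as 
a standalone method (my reading of the behavior, not the wrapper's verbatim 
code): with 4 splits and 2 parallel subtasks, subtask 0 owns splits 0 and 2, so 
it only ever observes 500 of the 1000 integers.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class SplitAssignmentSketch {
  // Split i goes to subtask (i % numSubtasks), e.g.
  // assignSplits(fourSplits, 0, 2) returns splits {0, 2}.
  static <SourceT> List<SourceT> assignSplits(
      List<SourceT> splitSources, int subtaskIndex, int numSubtasks) {
    List<SourceT> local = new ArrayList<>();
    for (int i = 0; i < splitSources.size(); i++) {
      if (i % numSubtasks == subtaskIndex) {
        local.add(splitSources.get(i));
      }
    }
    return local;
  }
}
{code}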
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251742)
Time Spent: 1h 20m  (was: 1h 10m)

> Bounded Reads for Flink Runner fails with OOM
> -
>
> Key: BEAM-7442
> URL: https://issues.apache.org/jira/browse/BEAM-7442
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Akshay Iyangar
>Assignee: Akshay Iyangar
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of
> files is huge, FlinkRunner throws an OOM error. This happens because the
> current implementation doesn't read the files sequentially but
> simultaneously, causing all of them to be held in memory at once, which
> quickly breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class so that, when the
> stream is a bounded source, it is read sequentially using a queue.
>  



--
This message was sent by 

[jira] [Work logged] (BEAM-7442) Bounded Reads for Flink Runner fails with OOM

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7442?focusedWorklogId=251743&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251743
 ]

ASF GitHub Bot logged work on BEAM-7442:


Author: ASF GitHub Bot
Created on: 31/May/19 12:06
Start Date: 31/May/19 12:06
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8715: 
[BEAM-7442][BEAM-5650] Read sequentially from bounded sources in 
UnboundedSourceWrapper
URL: https://github.com/apache/beam/pull/8715#discussion_r289360958
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 ##
 @@ -217,34 +227,33 @@ public void run(SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx) th
   // through to idle this executor.
   LOG.info("Number of readers is 0 for this task executor, idle");
   // Do nothing here but still execute the rest of the source logic
-} else if (localReaders.size() == 1) {
-  // the easy case, we just read from one reader
-  UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0);
-
-  synchronized (ctx.getCheckpointLock()) {
-boolean dataAvailable = readerInvoker.invokeStart(reader);
-if (dataAvailable) {
-  emitElement(ctx, reader);
-}
-  }
-
+} else if (isConvertedBoundedSource) {
   setNextWatermarkTimer(this.runtimeContext);
 
-  while (isRunning) {
-boolean dataAvailable;
-synchronized (ctx.getCheckpointLock()) {
-  dataAvailable = readerInvoker.invokeAdvance(reader);
+  // We read sequentially from all bounded sources
 
 Review comment:
   Cool. Thanks for the reference
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251743)
Time Spent: 1.5h  (was: 1h 20m)

> Bounded Reads for Flink Runner fails with OOM
> -
>
> Key: BEAM-7442
> URL: https://issues.apache.org/jira/browse/BEAM-7442
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Akshay Iyangar
>Assignee: Akshay Iyangar
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of
> files is huge, FlinkRunner throws an OOM error. This happens because the
> current implementation doesn't read the files sequentially but
> simultaneously, causing all of them to be held in memory at once, which
> quickly breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class so that, when the
> stream is a bounded source, it is read sequentially using a queue.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-7018) Regex transform for Python SDK

2019-05-31 Thread Shoaib Zafar (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852916#comment-16852916
 ] 

Shoaib Zafar commented on BEAM-7018:


[~shehzaadn]. This task is in progress.

> Regex transform for Python SDK
> --
>
> Key: BEAM-7018
> URL: https://issues.apache.org/jira/browse/BEAM-7018
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Rose Nguyen
>Assignee: Shehzaad Nakhoda
>Priority: Minor
>
> PTransforms that use regular expressions to process elements in a PCollection.
> It should offer the same API as its Java counterpart: 
> [https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java]
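
For reference, the Java API in use (a minimal sketch against the standard Beam
Java SDK; `lines` is assumed to be an existing PCollection<String>):

{code:java}
import org.apache.beam.sdk.transforms.Regex;
import org.apache.beam.sdk.values.PCollection;

// Keeps only the elements of `lines` that consist entirely of letters;
// Regex.find(...) and Regex.replaceAll(...) cover the other common cases.
PCollection<String> words = lines.apply(Regex.matches("[A-Za-z]+"));
{code}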



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-7442) Bounded Reads for Flink Runner fails with OOM

2019-05-31 Thread Maximilian Michels (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16853077#comment-16853077
 ] 

Maximilian Michels commented on BEAM-7442:
--

I didn't know you wanted to work on this. Let me know if the fix works for you 
or if you have any difficulties. You should be able to assign yourself to other 
issues now.

> Bounded Reads for Flink Runner fails with OOM
> -
>
> Key: BEAM-7442
> URL: https://issues.apache.org/jira/browse/BEAM-7442
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Akshay Iyangar
>Assignee: Akshay Iyangar
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of
> files is huge, FlinkRunner throws an OOM error. This happens because the
> current implementation doesn't read the files sequentially but
> simultaneously, causing all of them to be held in memory at once, which
> quickly breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class so that, when the
> stream is a bounded source, it is read sequentially using a queue.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7357) Kinesis IO.write throws LimitExceededException

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7357?focusedWorklogId=251817&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251817
 ]

ASF GitHub Bot logged work on BEAM-7357:


Author: ASF GitHub Bot
Created on: 31/May/19 14:42
Start Date: 31/May/19 14:42
Worklog Time Spent: 10m 
  Work Description: aromanenko-dev commented on pull request #8730: 
[BEAM-7357] KinesisIO: fix too many checks that writing stream exists.
URL: https://github.com/apache/beam/pull/8730
 
 
   The operation `DescribeStream` has a limit of 10 transactions per second per 
account. So, `streamExists()` was moved to `Write.expand()`.
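
   For illustration, the shape of the change (a sketch only; helper names such 
as `streamExists` are approximations rather than the exact KinesisIO code):

{code:java}
@Override
public PDone expand(PCollection<byte[]> input) {
  // Validate once at pipeline-construction time instead of in every
  // KinesisWriterFn.setup(), so DescribeStream is called once per pipeline
  // rather than once per worker instance.
  checkArgument(streamExists(getStreamName()),
      "Stream %s does not exist", getStreamName());
  input.apply(ParDo.of(new KinesisWriterFn(this)));
  return PDone.in(input.getPipeline());
}
{code}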
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [x] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   Post-Commit Tests Status (on master branch)
   

   
   
   Pre-Commit Tests Status (on master branch)
   

   

[jira] [Work logged] (BEAM-7305) Add first version of Hazelcast Jet Runner

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7305?focusedWorklogId=251704&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251704
 ]

ASF GitHub Bot logged work on BEAM-7305:


Author: ASF GitHub Bot
Created on: 31/May/19 11:12
Start Date: 31/May/19 11:12
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #8699: [BEAM-7305] Fix issues in 
and extend documentation for Hazelcast Jet Runner 
URL: https://github.com/apache/beam/pull/8699#issuecomment-497672150
 
 
   Could you squash at least the spotless fixup commits? I'll merge then.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251704)
Time Spent: 8h  (was: 7h 50m)

> Add first version of Hazelcast Jet Runner
> -
>
> Key: BEAM-7305
> URL: https://issues.apache.org/jira/browse/BEAM-7305
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-jet
>Reporter: Maximilian Michels
>Assignee: Jozsef Bartok
>Priority: Major
> Fix For: 2.14.0
>
>  Time Spent: 8h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (BEAM-7357) Kinesis IO.write throws LimitExceededException

2019-05-31 Thread Alexey Romanenko (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852940#comment-16852940
 ] 

Alexey Romanenko edited comment on BEAM-7357 at 5/31/19 12:49 PM:
--

Yes, I'll fix this. Thanks for the report and for chasing this issue!


was (Author: aromanenko):
Yes, I'll fix this. Thanks for the report and chasing this issue!

> Kinesis IO.write throws LimitExceededException
> --
>
> Key: BEAM-7357
> URL: https://issues.apache.org/jira/browse/BEAM-7357
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kinesis
>Affects Versions: 2.11.0
>Reporter: Brachi Packter
>Assignee: Alexey Romanenko
>Priority: Major
>
> I used KinesisIO to write to Kinesis. I very quickly get many exceptions 
> like:
> [shard_map.cc:150] Shard map update for stream "***" failed. Code: 
> LimitExceededException Message: Rate exceeded for stream *** under account 
> ***; retrying in ..
> Also, I see many exceptions like:
> Caused by: java.lang.IllegalArgumentException: Stream ** does not exist at 
> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>  at 
> org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
> I'm sure this stream exists because I can see some data from my pipeline that 
> was successfully ingested to it.
>  
> Here is my code:
>  
>  
> {code:java}
> .apply(KinesisIO.write()
>     .withStreamName("**")
>     .withPartitioner(new KinesisPartitioner() {
>       @Override
>       public String getPartitionKey(byte[] value) {
>         return UUID.randomUUID().toString();
>       }
>
>       @Override
>       public String getExplicitHashKey(byte[] value) {
>         return null;
>       }
>     })
>     .withAWSClientsProvider("**", "***", Regions.US_EAST_1));{code}
>  
> I tried not using KinesisIO, and everything works well; I can't figure out
> what went wrong.
> I tried using the same API the library does.
>  
> {code:java}
> .apply(
>  ParDo.of(new DoFn() {
>  private transient IKinesisProducer inlineProducer;
>  @Setup
>  public void setup(){
>  KinesisProducerConfiguration config =   
> KinesisProducerConfiguration.fromProperties(new Properties());
>  config.setRegion(Regions.US_EAST_1.getName());
>  config.setCredentialsProvider(new AWSStaticCredentialsProvider(new 
> BasicAWSCredentials("***", "***")));
>  inlineProducer = new KinesisProducer(config);
>  }
>  @ProcessElement
>  public void processElement(ProcessContext c) throws Exception {
> ByteBuffer data = ByteBuffer.wrap(c.element());
> String partitionKey =UUID.randomUUID().toString();
> ListenableFuture f =
> getProducer().addUserRecord("***", partitionKey, data);
>Futures.addCallback(f, new UserRecordResultFutureCallback());
> }
>  class UserRecordResultFutureCallback implements 
> FutureCallback {
>  @Override
>  public void onFailure(Throwable cause) {
>throw new RuntimeException("failed produce:"+cause);
>  }
>  @Override
>  public void onSuccess(UserRecordResult result) {
>  }
>  }
>  })
>  );
>  
> {code}
>  
> Any idea what I did wrong, or where the error is in KinesisIO?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (BEAM-7357) Kinesis IO.write throws LimitExceededException

2019-05-31 Thread Alexey Romanenko (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852940#comment-16852940
 ] 

Alexey Romanenko edited comment on BEAM-7357 at 5/31/19 12:48 PM:
--

Yes, I'll fix this. Thanks for the report and chasing this issue!


was (Author: aromanenko):
Yes, I'll fix this. Thanks for the report!

> Kinesis IO.write throws LimitExceededException
> --
>
> Key: BEAM-7357
> URL: https://issues.apache.org/jira/browse/BEAM-7357
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kinesis
>Affects Versions: 2.11.0
>Reporter: Brachi Packter
>Assignee: Alexey Romanenko
>Priority: Major
>
> I used KinesisIO to write to Kinesis. I very quickly get many exceptions 
> like:
> [shard_map.cc:150] Shard map update for stream "***" failed. Code: 
> LimitExceededException Message: Rate exceeded for stream *** under account 
> ***; retrying in ..
> Also, I see many exceptions like:
> Caused by: java.lang.IllegalArgumentException: Stream ** does not exist at 
> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>  at 
> org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
> I'm sure this stream exists because I can see some data from my pipeline that 
> was successfully ingested to it.
>  
> Here is my code:
>  
>  
> {code:java}
> .apply(KinesisIO.write()
>     .withStreamName("**")
>     .withPartitioner(new KinesisPartitioner() {
>       @Override
>       public String getPartitionKey(byte[] value) {
>         return UUID.randomUUID().toString();
>       }
>
>       @Override
>       public String getExplicitHashKey(byte[] value) {
>         return null;
>       }
>     })
>     .withAWSClientsProvider("**", "***", Regions.US_EAST_1));{code}
>  
> I tried not using KinesisIO, and everything works well; I can't figure out
> what went wrong.
> I tried using the same API the library does.
>  
> {code:java}
> .apply(
>  ParDo.of(new DoFn() {
>  private transient IKinesisProducer inlineProducer;
>  @Setup
>  public void setup(){
>  KinesisProducerConfiguration config =   
> KinesisProducerConfiguration.fromProperties(new Properties());
>  config.setRegion(Regions.US_EAST_1.getName());
>  config.setCredentialsProvider(new AWSStaticCredentialsProvider(new 
> BasicAWSCredentials("***", "***")));
>  inlineProducer = new KinesisProducer(config);
>  }
>  @ProcessElement
>  public void processElement(ProcessContext c) throws Exception {
> ByteBuffer data = ByteBuffer.wrap(c.element());
> String partitionKey =UUID.randomUUID().toString();
> ListenableFuture f =
> getProducer().addUserRecord("***", partitionKey, data);
>Futures.addCallback(f, new UserRecordResultFutureCallback());
> }
>  class UserRecordResultFutureCallback implements 
> FutureCallback {
>  @Override
>  public void onFailure(Throwable cause) {
>throw new RuntimeException("failed produce:"+cause);
>  }
>  @Override
>  public void onSuccess(UserRecordResult result) {
>  }
>  }
>  })
>  );
>  
> {code}
>  
> Any idea what I did wrong, or where the error is in KinesisIO?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5995) Create Jenkins jobs to run the load tests

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5995?focusedWorklogId=251815&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251815
 ]

ASF GitHub Bot logged work on BEAM-5995:


Author: ASF GitHub Bot
Created on: 31/May/19 14:40
Start Date: 31/May/19 14:40
Worklog Time Spent: 10m 
  Work Description: kkucharc commented on pull request #8664: [BEAM-5995] 
Add hot key to Python Synthetic Sources and use it in Load Tests
URL: https://github.com/apache/beam/pull/8664#discussion_r289418456
 
 

 ##
 File path: sdks/python/apache_beam/testing/synthetic_pipeline.py
 ##
 @@ -238,13 +241,25 @@ def get_range_tracker(self, start_position, 
stop_position):
   tracker = range_trackers.UnsplittableRangeTracker(tracker)
 return tracker
 
+  def _gen_kv_pair(self, index):
+    r = np.random.RandomState(index)
+    rand = r.random_sample()
+
+    # Determines whether to generate hot key or not.
+    if rand < self._hot_key_fraction:
+      # Generate hot key.
+      # An integer is randomly selected from the range [0, numHotKeys-1]
+      # with equal probability.
+      r_hot = np.random.RandomState(self._num_hot_keys)
 
 Review comment:
   That totally makes sense, thanks a lot! And it also answers my second 
comment here, I guess :) I'll change it this way.
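
   For anyone following along, the scheme under discussion transliterated to 
Java (a sketch of the intended behavior, not the actual synthetic_pipeline.py 
change):

{code:java}
import java.util.Random;

public class HotKeySketch {
  // A record first decides, with probability hotKeyFraction, whether it is
  // hot. Hot records draw an index uniformly from [0, numHotKeys) and seed a
  // fresh Random with that index alone, so every record that picks index k
  // regenerates the identical key.
  static long keyFor(long recordIndex, double hotKeyFraction, int numHotKeys) {
    Random r = new Random(recordIndex);        // deterministic per record
    if (r.nextDouble() < hotKeyFraction) {
      int hotIndex = r.nextInt(numHotKeys);    // uniform over the hot pool
      return new Random(hotIndex).nextLong();  // same seed -> same hot key
    }
    return r.nextLong();                       // non-hot keys vary per record
  }
}
{code}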
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251815)
Time Spent: 50.5h  (was: 50h 20m)

> Create Jenkins jobs to run the load tests
> -
>
> Key: BEAM-5995
> URL: https://issues.apache.org/jira/browse/BEAM-5995
> Project: Beam
>  Issue Type: Sub-task
>  Components: testing
>Reporter: Kasia Kucharczyk
>Assignee: Kasia Kucharczyk
>Priority: Major
>  Time Spent: 50.5h
>  Remaining Estimate: 0h
>
> (/) Add SMOKE test 
>  Add GBK load tests.
> Add CoGBK load tests.
> Add Pardo load tests.
> Add SideInput tests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (BEAM-7021) ToString transform for Python SDK

2019-05-31 Thread Ahmet Altay (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmet Altay resolved BEAM-7021.
---
   Resolution: Fixed
Fix Version/s: 2.14.0

> ToString transform for Python SDK
> -
>
> Key: BEAM-7021
> URL: https://issues.apache.org/jira/browse/BEAM-7021
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Rose Nguyen
>Assignee: Shehzaad Nakhoda
>Priority: Minor
>  Labels: starter
> Fix For: 2.14.0
>
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> PTransforms for converting a PCollection<?> or a PCollection<Iterable<?>>
> to a PCollection<String>.
> It should offer the same API as its Java counterpart: 
> [https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java]
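
For reference, the Java counterpart in use (a minimal sketch; `numbers` is
assumed to be an existing PCollection<Integer>):

{code:java}
import org.apache.beam.sdk.transforms.ToString;
import org.apache.beam.sdk.values.PCollection;

// Each element is converted with its toString(), here Integer -> String.
PCollection<String> strings = numbers.apply(ToString.elements());
{code}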



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-7462) Add Sampled Byte Count Metric to the Java SDK

2019-05-31 Thread Alex Amato (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16853197#comment-16853197
 ] 

Alex Amato commented on BEAM-7462:
--

Here is a dump of the bundle descriptor, showing the code and pcollectionId 
which triggers the issue.

[^bundle_descriptor_dump.txt]

 

} transforms {
 key: "generatedPtransform-26" value {
 spec {
 *urn: "beam:transform:pardo:v1" payload:* 
"\n\356\004\n\327\004\n\025beam:dofn:javasdk:0.1\032\275\004\202SNAPPY\000\000\000\000\001\000\000\000\001\000\000\002)\347\006\360a\254\355\000\005sr\000?org.apache.beam.sdk.util.AutoValue_DoFnWithExecutionInformationo_\336e?\346.{
\002\000\003L\000\004doFnt\000%Lorg/a\005U\000/\001U8\000\024;xr\0005o^\002\001\001\247^\370\000D9\003c\264\241\274\276\220\002\000\000xpsr\000CoJF\000\031\363\360<.Reshuffle$ViaRandomKey$AssignShardFn\200\363Y\322\244\270\274\224\002\000\001I\000\005shardxr\000#ovZ\DoFn\254D#\006\316\223\275\235\002\001\216\020\000\000\000\000sZ8\000)M\000.1M\330\263\030yf[\300z\265\002\000\002Z\000\tgeneratedL\000\002idt\000\022Ljava/lang/String;xp\001t\000BN\354\000\rZ\250PCollection.:402#b70c45c110743c2bsr\000>NF\000=2\004Au.\200\002\000S>\273\001\210\3425\313\323\267`\232\254\002\000\001L\000\021elementConverterst\000\020L\005\2764util/List;xr\0004zt\000!LBj\000\034\257\325QB\327
}\001W%]\fsr\000\037\001Xi$\031\355\030s$Empty\001i0z\270\027\264<\247\236\336\002\000\000xp\022\022beam:env:docker:v1"
 
} inputs {
 key: "generatedInput-27" value: "generatedPcollection-19" 
} outputs {
 *key: "org.apache.beam.sdk.values.PCollection.:402#b70c45c110743c2b" 
value: "generatedPcollection-23"* 
}

 

} transforms {
 key: "-14" value {
 spec {
 *urn: "beam:sink:runner:0.1" payload: "\n\021\022\017localhost:12371"* 
} inputs {
 *key: "-13" value: "generatedPcollection-23"* 
}

 

 

sample with pCollectionId: *generatedPcollection-203* pColl.getName(): 
generatedPcollection-203 Coder: 
*org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder*

 

{
 insertId: "2018788043244039965:565032:0:164342" 
 jsonPayload: {
 exception: "java.lang.ClassCastException: [B cannot be cast to 
org.apache.beam.sdk.util.WindowedValue at 
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480)
 at 
org.apache.beam.fn.harness.data.ElementCountFnDataReceiver.accept(ElementCountFnDataReceiver.java:113)
 at 
org.apache.beam.fn.harness.data.ElementCountFnDataReceiver.accept(ElementCountFnDataReceiver.java:45)
 at 
org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:108)
 at 
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:332)
 at 
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:160)
 at 
org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:144)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) " 
 job: "2019-05-30_14_57_35-14659722017552460993" 
 logger: "org.apache.beam.fn.harness.control.BeamFnControlClient" 
 message: "Exception while trying to handle InstructionRequest -212" 
 portability_worker_id: "1" 
 thread: "17" 
 worker: "testpipeline-jenkins-0530-05301457-7v4d-harness-bbc4" }
 labels: {…} 
 logName: "projects/apache-beam-testing/logs/dataflow.googleapis.com%2Fworker" 
 receiveTimestamp: "2019-05-30T22:01:04.911873820Z" 
 resource: {…} 
 severity: "ERROR" 
 timestamp: "2019-05-30T22:00:55.189Z" 
}

> Add Sampled Byte Count Metric to the Java SDK
> -
>
> Key: BEAM-7462
> URL: https://issues.apache.org/jira/browse/BEAM-7462
> Project: Beam
>  Issue Type: New Feature
>  Components: java-fn-execution
>Reporter: Alex Amato
>Priority: Major
> Attachments: bundle_descriptor_dump.txt
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (BEAM-7462) Add Sampled Byte Count Metric to the Java SDK

2019-05-31 Thread Alex Amato (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16853197#comment-16853197
 ] 

Alex Amato edited comment on BEAM-7462 at 5/31/19 4:43 PM:
---

Here is a dump of the bundle descriptor, showing the code and pcollectionId 
which triggers the issue.

[^bundle_descriptor_dump.txt]

 

} transforms {
 key: "generatedPtransform-26" value {
 spec {
 *urn: "beam:transform:pardo:v1" payload:* 
"\n\356\004\n\327\004\n\025beam:dofn:javasdk:0.1\032\275\004\202SNAPPY\000\000\000\000\001\000\000\000\001\000\000\002)\347\006\360a\254\355\000\005sr\000?org.apache.beam.sdk.util.AutoValue_DoFnWithExecutionInformationo_\336e?\346.

{ 
\002\000\003L\000\004doFnt\000%Lorg/a\005U\000/\001U8\000\024;xr\0005o^\002\001\001\247^\370\000D9\003c\264\241\274\276\220\002\000\000xpsr\000CoJF\000\031\363\360<.Reshuffle$ViaRandomKey$AssignShardFn\200\363Y\322\244\270\274\224\002\000\001I\000\005shardxr\000#ovZ\DoFn\254D#\006\316\223\275\235\002\001\216\020\000\000\000\000sZ8\000)M\000.1M\330\263\030yf[\300z\265\002\000\002Z\000\tgeneratedL\000\002idt\000\022Ljava/lang/String;xp\001t\000BN\354\000\rZ\250PCollection.:402#b70c45c110743c2bsr\000>NF\000=2\004Au.\200\002\000S>\273\001\210\3425\313\323\267`\232\254\002\000\001L\000\021elementConverterst\000\020L\005\2764util/List;xr\0004zt\000!LBj\000\034\257\325QB\327
 }

\001W%]\fsr\000\037\001Xi$\031\355\030s$Empty\001i0z\270\027\264<\247\236\336\002\000\000xp\022\022beam:env:docker:v1"
 
 } inputs

{ key: "generatedInput-27" value: "generatedPcollection-19" }

outputs

{ *key: "org.apache.beam.sdk.values.PCollection.:402#b70c45c110743c2b" 
value: "*generatedPcollection-23*"* }

 

} transforms {
 key: "-14" value {
 spec

{ *urn: "beam:sink:runner:0.1" payload: "\n\021\022\017localhost:12371"* }

inputs

{ *key: "-13" value: "*generatedPcollection-23*"* }

 

} pcollections {
 key: "*generatedPcollection-23*" value {
 coder_id: "*generatedCoder-22*" is_bounded: BOUNDED windowing_strategy_id: 
"fakeWindowingStrategy-17" 
}

 

} 
} coders {
 *key: "generatedCoder-22"* value {
 spec {
 *urn: "beam:coder:windowed_value:v1"* 
} component_coder_ids: "KvCoder14" component_coder_ids: "Coder1" 
}

 

 

sample with pCollectionId: *generatedPcollection-203* pColl.getName(): 
generatedPcollection-203 Coder: 
*org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder*

 

{
 insertId: "2018788043244039965:565032:0:164342" 
 jsonPayload: 

{ exception: "java.lang.ClassCastException: [B cannot be cast to 
org.apache.beam.sdk.util.WindowedValue at 
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480)
 at 
org.apache.beam.fn.harness.data.ElementCountFnDataReceiver.accept(ElementCountFnDataReceiver.java:113)
 at 
org.apache.beam.fn.harness.data.ElementCountFnDataReceiver.accept(ElementCountFnDataReceiver.java:45)
 at 
org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:108)
 at 
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:332)
 at 
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:160)
 at 
org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:144)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) "  job: 
"2019-05-30_14_57_35-14659722017552460993"  logger: 
"org.apache.beam.fn.harness.control.BeamFnControlClient"  message: "Exception 
while trying to handle InstructionRequest -212"  portability_worker_id: "1"  
thread: "17"  worker: "testpipeline-jenkins-0530-05301457-7v4d-harness-bbc4" }

labels: 

{…} 
 logName: "projects/apache-beam-testing/logs/dataflow.googleapis.com%2Fworker" 
 receiveTimestamp: "2019-05-30T22:01:04.911873820Z" 
 resource: \{…}

 
 severity: "ERROR" 
 timestamp: "2019-05-30T22:00:55.189Z" 
 }


was (Author: ajam...@google.com):
Here is a dump of the bundle descriptor, showing the code and pcollectionId 
which trigger the issue.

[^bundle_descriptor_dump.txt][^bundle_descriptor_dump.txt]

 

} transforms {
 key: "generatedPtransform-26" value {
 spec {
 *urn: "beam:transform:pardo:v1" payload:* 
"\n\356\004\n\327\004\n\025beam:dofn:javasdk:0.1\032\275\004\202SNAPPY\000\000\000\000\001\000\000\000\001\000\000\002)\347\006\360a\254\355\000\005sr\000?org.apache.beam.sdk.util.AutoValue_DoFnWithExecutionInformationo_\336e?\346.{

[jira] [Updated] (BEAM-7466) Remove the enable_health_checker flag after Python SDK 2.13 becomes unsupported

2019-05-31 Thread Yueyang Qiu (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yueyang Qiu updated BEAM-7466:
--
Description: [PR8681|https://github.com/apache/beam/pull/8681] introduced a 
new flag enable_health_checker to allow Python streaming jobs running SDK 
versions <= 2.13 to run on Dataflow. Once Python SDK 2.13 becomes unsupported, 
this flag can be cleaned up, and Dataflow will always enable health checker by 
default.  (was: [PR8681|https://github.com/apache/beam/pull/8681] introduced a 
new flag `enable_health_checker` to allow Python streaming jobs running SDK 
versions <= 2.13 to run on Dataflow. Once Python SDK 2.13 becomes unsupported, 
this flag can be cleaned up, and Dataflow will always enable health checker by 
default.)

> Remove the enable_health_checker flag after Python SDK 2.13 becomes 
> unsupported
> ---
>
> Key: BEAM-7466
> URL: https://issues.apache.org/jira/browse/BEAM-7466
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Yueyang Qiu
>Assignee: Yueyang Qiu
>Priority: Minor
>
> [PR8681|https://github.com/apache/beam/pull/8681] introduced a new flag 
> enable_health_checker to allow Python streaming jobs running SDK versions <= 
> 2.13 to run on Dataflow. Once Python SDK 2.13 becomes unsupported, this flag 
> can be cleaned up, and Dataflow will always enable health checker by default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-7464) Python transform catalog

2019-05-31 Thread Rose Nguyen (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rose Nguyen closed BEAM-7464.
-
   Resolution: Fixed
Fix Version/s: Not applicable

> Python transform catalog
> 
>
> Key: BEAM-7464
> URL: https://issues.apache.org/jira/browse/BEAM-7464
> Project: Beam
>  Issue Type: Improvement
>  Components: website
>Reporter: Rose Nguyen
>Assignee: Rose Nguyen
>Priority: Minor
> Fix For: Not applicable
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Create catalog of core transforms (Python)
> -Links to pydocs
> -Brief description
> -Related transforms
> -Links to programming guide
> -Examples section to integrate Colab notebooks



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6356) Python FileBasedCacheManager does not respect PCoder for PCollection being cached

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6356?focusedWorklogId=251903&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251903
 ]

ASF GitHub Bot logged work on BEAM-6356:


Author: ASF GitHub Bot
Created on: 31/May/19 17:42
Start Date: 31/May/19 17:42
Worklog Time Spent: 10m 
  Work Description: ostrokach commented on issue #8687: [BEAM-6356] Add the 
option to use TFRecord to store cache results using PCollection's PCoder
URL: https://github.com/apache/beam/pull/8687#issuecomment-497799053
 
 
   retest this please
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251903)
Time Spent: 2h 50m  (was: 2h 40m)
Remaining Estimate: 165h 10m  (was: 165h 20m)

> Python  FileBasedCacheManager does not respect PCoder for PCollection being 
> cached
> --
>
> Key: BEAM-6356
> URL: https://issues.apache.org/jira/browse/BEAM-6356
> Project: Beam
>  Issue Type: Improvement
>  Components: examples-python
>Reporter: Hennadiy Leontyev
>Assignee: Qinye Li
>Priority: Minor
>   Original Estimate: 168h
>  Time Spent: 2h 50m
>  Remaining Estimate: 165h 10m
>
> FileBasedCacheManager used by Python's InteractiveRunner does not preserve 
> PCoder for elements of a PCollection being cached on disk. I suggest that the
> cache on-disk format be changed to TFRecords (which are supported by Beam) and
> that FileBasedCacheManager store the desired PCoder for cached collections.
> Currently, it is not possible to work with dynamically-generated protocol 
> buffer messages in interactive runner mode because of pickling errors.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7402) Add a performance test for BigQueryIO read

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7402?focusedWorklogId=251837&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251837
 ]

ASF GitHub Bot logged work on BEAM-7402:


Author: ASF GitHub Bot
Created on: 31/May/19 15:25
Start Date: 31/May/19 15:25
Worklog Time Spent: 10m 
  Work Description: kkucharc commented on pull request #8675: [BEAM-7402] 
BigQuery IO read performance tests 
URL: https://github.com/apache/beam/pull/8675#discussion_r289374033
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigquery_io_read_pipeline.py
 ##
 @@ -14,66 +14,135 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-"""A Dataflow job that counts the number of rows in a BQ table.
-
-   Can be configured to simulate slow reading for a given number of rows.
+"""
+A pipeline that reads data from a BigQuery table and counts the number of
+rows.
+
+Besides the standard options, there are options with special meaning:
+* input_dataset - BQ dataset id.
+* input_table - BQ table id.
+The table will be created and populated with data from Synthetic Source if it
+does not exist.
+* input_options - options for Synthetic Source:
+num_records - number of rows to be inserted,
+value_size - the length of a single row,
+key_size - required option, but its value has no meaning,
+* num_slow - an integer in range [0,100] used to customize slow reading
+simulation.
+
+Example test run on DataflowRunner:
+
+python setup.py nosetests \
+--test-pipeline-options="
+--runner=TestDataflowRunner
+--project=...
+--staging_location=gs://...
+--temp_location=gs://...
+--sdk_location=.../dist/apache-beam-x.x.x.dev0.tar.gz
+--input_dataset=...
+--input_table=...
+--input_options='{
+\"num_records\": 1024,
+\"key_size\": 1,
+\"value_size\": 1024,
+}'" \
+--tests apache_beam.io.gcp.tests.bigquery_io_read_pipeline
 """
 
 from __future__ import absolute_import
 
-import argparse
+import base64
 import logging
 import random
 import time
-
-import apache_beam as beam
-from apache_beam.options.pipeline_options import PipelineOptions
+import unittest
+
+from apache_beam import DoFn
+from apache_beam import Map
+from apache_beam import ParDo
+from apache_beam.io import BigQueryDisposition
+from apache_beam.io import BigQuerySource
+from apache_beam.io import Read
+from apache_beam.io import WriteToBigQuery
+from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.synthetic_pipeline import SyntheticSource
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
-
-
-class RowToStringWithSlowDown(beam.DoFn):
 
 Review comment:
   Why did you decide to change a non-test pipeline into a test? Did you check 
whether this has any unintended consequences?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251837)
Time Spent: 50m  (was: 40m)

> Add a performance test for BigQueryIO read
> --
>
> Key: BEAM-7402
> URL: https://issues.apache.org/jira/browse/BEAM-7402
> Project: Beam
>  Issue Type: Test
>  Components: testing
>Reporter: Kamil Wasilewski
>Assignee: Kamil Wasilewski
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> The task is to add a performance test for BigQuery IO read with configurable 
> data size.
> The plan is:
> 1) Setup BQ table with input data if needed,
> 2) Read data from that table and do some final assertions
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7402) Add a performance test for BigQueryIO read

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7402?focusedWorklogId=251836&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251836
 ]

ASF GitHub Bot logged work on BEAM-7402:


Author: ASF GitHub Bot
Created on: 31/May/19 15:25
Start Date: 31/May/19 15:25
Worklog Time Spent: 10m 
  Work Description: kkucharc commented on pull request #8675: [BEAM-7402] 
BigQuery IO read performance tests 
URL: https://github.com/apache/beam/pull/8675#discussion_r289436792
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigquery_io_read_pipeline.py
 ##
 @@ -14,66 +14,135 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-"""A Dataflow job that counts the number of rows in a BQ table.
-
-   Can be configured to simulate slow reading for a given number of rows.
+"""
+A pipeline that reads data from a BigQuery table and counts the number of
+rows.
+
+Besides the standard options, there are options with special meaning:
+* input_dataset - BQ dataset id.
+* input_table - BQ table id.
+The table will be created and populated with data from Synthetic Source if it
+does not exist.
+* input_options - options for Synthetic Source:
+num_records - number of rows to be inserted,
+value_size - the length of a single row,
+key_size - required option, but its value has no meaning,
+* num_slow - an integer in range [0,100] used to customize slow reading
+simulation.
+
+Example test run on DataflowRunner:
+
+python setup.py nosetests \
+--test-pipeline-options="
+--runner=TestDataflowRunner
+--project=...
+--staging_location=gs://...
+--temp_location=gs://...
+--sdk_location=.../dist/apache-beam-x.x.x.dev0.tar.gz
+--input_dataset=...
+--input_table=...
+--input_options='{
+\"num_records\": 1024,
+\"key_size\": 1,
+\"value_size\": 1024,
+}'" \
+--tests apache_beam.io.gcp.tests.bigquery_io_read_pipeline
 """
 
 from __future__ import absolute_import
 
-import argparse
+import base64
 import logging
 import random
 import time
-
-import apache_beam as beam
-from apache_beam.options.pipeline_options import PipelineOptions
+import unittest
+
+from apache_beam import DoFn
+from apache_beam import Map
+from apache_beam import ParDo
+from apache_beam.io import BigQueryDisposition
+from apache_beam.io import BigQuerySource
+from apache_beam.io import Read
+from apache_beam.io import WriteToBigQuery
+from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.synthetic_pipeline import SyntheticSource
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
-
-
-class RowToStringWithSlowDown(beam.DoFn):
-
-  def process(self, element, num_slow=0, *args, **kwargs):
-
-    if num_slow == 0:
-      yield ['row']
-    else:
-      rand = random.random() * 100
-      if rand < num_slow:
-        time.sleep(0.01)
-        yield ['slow_row']
-      else:
+from apache_beam.transforms.combiners import Count
+
+
+class BigQueryIOReadTest(LoadTest):
+  def setUp(self):
+    super(BigQueryIOReadTest, self).setUp()
+    self.num_slow = self.pipeline.get_option('num_slow') or 0
+    self.input_dataset = self.pipeline.get_option('input_dataset')
+    self.input_table = self.pipeline.get_option('input_table')
+    self._check_for_input_data()
+
+  def _check_for_input_data(self):
+    """Checks if a BQ table with input data exists and creates it if not."""
+    wrapper = BigQueryWrapper()
+    try:
+      wrapper.get_table(self.project_id, self.input_dataset, self.input_table)
+    except Exception:
+      self._create_input_data()
+
+  def _create_input_data(self):
+    """
+    Runs an additional pipeline which creates test data and waits for its
+    completion.
+    """
+    SCHEMA = parse_table_schema_from_json(
+        '{"fields": [{"name": "data", "type": "BYTES"}]}')
+
+    def format_record(record):
+      # Synthetic Source returns each record as a (key, value) pair; only
+      # the value part is written to BigQuery.
+      return {'data': base64.b64encode(record[1])}
+
+    p = TestPipeline()
+    # pylint: disable=expression-not-assigned
+    (p
+     | 'Produce rows' >> Read(SyntheticSource(self.parseTestPipelineOptions()))
+     | 'Format' >> Map(format_record)
+     | 'Write to BigQuery' >> WriteToBigQuery(
+         self.input_dataset + '.' + self.input_table,
+         schema=SCHEMA,
+         create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
+         write_disposition=BigQueryDisposition.WRITE_EMPTY))
+    p.run().wait_until_finish()
+
+  class RowToStringWithSlowDown(DoFn):
+    def process(self, element, num_slow=0, *args, **kwargs):
+

[jira] [Closed] (BEAM-7442) Bounded Reads for Flink Runner fails with OOM

2019-05-31 Thread Maximilian Michels (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels closed BEAM-7442.

   Resolution: Fixed
Fix Version/s: 2.14.0

> Bounded Reads for Flink Runner fails with OOM
> -
>
> Key: BEAM-7442
> URL: https://issues.apache.org/jira/browse/BEAM-7442
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Akshay Iyangar
>Assignee: Akshay Iyangar
>Priority: Major
> Fix For: 2.14.0
>
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of
> files is huge, FlinkRunner throws an OOM error. This happens because the
> current implementation doesn't read the files sequentially but
> simultaneously, causing all of them to be held in memory at once, which
> quickly breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class so that, when the
> stream is a bounded source, it is read sequentially using a queue.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7442) Bounded Reads for Flink Runner fails with OOM

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7442?focusedWorklogId=251846&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251846
 ]

ASF GitHub Bot logged work on BEAM-7442:


Author: ASF GitHub Bot
Created on: 31/May/19 15:35
Start Date: 31/May/19 15:35
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #8715: 
[BEAM-7442][BEAM-5650] Read sequentially from bounded sources in 
UnboundedSourceWrapper
URL: https://github.com/apache/beam/pull/8715
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251846)
Time Spent: 2h 10m  (was: 2h)

> Bounded Reads for Flink Runner fails with OOM
> -
>
> Key: BEAM-7442
> URL: https://issues.apache.org/jira/browse/BEAM-7442
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Akshay Iyangar
>Assignee: Akshay Iyangar
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of
> files is huge, FlinkRunner throws an OOM error. This happens because the
> current implementation doesn't read the files sequentially but
> simultaneously, causing all of them to be held in memory at once, which
> quickly breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class so that, when the
> stream is a bounded source, it is read sequentially using a queue.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7459) Add smoke integration tests to Precommit test suites on Python 3

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7459?focusedWorklogId=251859&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251859
 ]

ASF GitHub Bot logged work on BEAM-7459:


Author: ASF GitHub Bot
Created on: 31/May/19 16:20
Start Date: 31/May/19 16:20
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on pull request #8728: [BEAM-7459] 
Add Precommit IT test suites for Python 3.
URL: https://github.com/apache/beam/pull/8728
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251859)
Time Spent: 1h  (was: 50m)

> Add smoke integration tests to Precommit  test suites on Python 3 
> --
>
> Key: BEAM-7459
> URL: https://issues.apache.org/jira/browse/BEAM-7459
> Project: Beam
>  Issue Type: Sub-task
>  Components: testing
>Reporter: Valentyn Tymofieiev
>Assignee: Valentyn Tymofieiev
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (BEAM-7462) Add Sampled Byte Count Metric to the Java SDK

2019-05-31 Thread Alex Amato (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16853197#comment-16853197
 ] 

Alex Amato edited comment on BEAM-7462 at 5/31/19 4:48 PM:
---

Here is a dump of the bundle descriptor, showing the code and pcollectionId 
which triggers the issue.

[^bundle_descriptor_dump.txt]

 At first I confused this with generatedPcollection-23, which is real, but 
generatedPcollection-203 does not appear anywhere in the bundle descriptor. 
Is it produced elsewhere?

 

And here is the error that occurred.

sample with pCollectionId: *generatedPcollection-203* pColl.getName(): 
generatedPcollection-203 Coder: 
*org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder* 

{
  insertId: "2018788043244039965:565032:0:164342"
  jsonPayload: {
    exception: "java.lang.ClassCastException: [B cannot be cast to org.apache.beam.sdk.util.WindowedValue
      at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480)
      at org.apache.beam.fn.harness.data.ElementCountFnDataReceiver.accept(ElementCountFnDataReceiver.java:113)
      at org.apache.beam.fn.harness.data.ElementCountFnDataReceiver.accept(ElementCountFnDataReceiver.java:45)
      at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:108)
      at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:332)
      at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:160)
      at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:144)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)"
    job: "2019-05-30_14_57_35-14659722017552460993"
    logger: "org.apache.beam.fn.harness.control.BeamFnControlClient"
    message: "Exception while trying to handle InstructionRequest -212"
    portability_worker_id: "1"
    thread: "17"
    worker: "testpipeline-jenkins-0530-05301457-7v4d-harness-bbc4"
  }
  labels: {…}
  logName: "projects/apache-beam-testing/logs/dataflow.googleapis.com%2Fworker"
  receiveTimestamp: "2019-05-30T22:01:04.911873820Z"
  resource: {…}
  severity: "ERROR"
  timestamp: "2019-05-30T22:00:55.189Z"
}
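
As a self-contained illustration of this failure mode (stand-in classes, not
Beam's actual coder): a size observer typed for WindowedValue that is handed a
raw byte[] fails at the unchecked cast, which is exactly the "[B cannot be
cast to ..." error above.
{code:java}
// Stand-in types; the real classes live in org.apache.beam.sdk.util.
class WindowedValue<T> {
  T value;
}

class FullWindowedValueCoder<T> {
  @SuppressWarnings("unchecked")
  void registerByteSizeObserver(Object element) {
    // The unchecked cast is where the ClassCastException surfaces when
    // element is still an undecoded byte[].
    WindowedValue<T> windowed = (WindowedValue<T>) element;
    System.out.println("observed value: " + windowed.value);
  }
}

class Repro {
  public static void main(String[] args) {
    // Throws java.lang.ClassCastException: [B cannot be cast to WindowedValue
    new FullWindowedValueCoder<byte[]>().registerByteSizeObserver(new byte[] {1, 2, 3});
  }
}
{code}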


was (Author: ajam...@google.com):
Here is a dump of the bundle descriptor, showing the code and pcollectionId 
that trigger the issue.

[^bundle_descriptor_dump.txt]

 

} transforms {
  key: "generatedPtransform-26"
  value {
    spec {
      urn: "beam:transform:pardo:v1"
      payload: "\n\356\004\n\327\004\n\025beam:dofn:javasdk:0.1\032\275\004\202SNAPPY\000\000\000\000\001\000\000\000\001\000\000\002)\347\006\360a\254\355\000\005sr\000?org.apache.beam.sdk.util.AutoValue_DoFnWithExecutionInformationo_\336e?\346.\002\000\003L\000\004doFnt\000%Lorg/a\005U\000/\001U8\000\024;xr\0005o^\002\001\001\247^\370\000D9\003c\264\241\274\276\220\002\000\000xpsr\000CoJF\000\031\363\360<.Reshuffle$ViaRandomKey$AssignShardFn\200\363Y\322\244\270\274\224\002\000\001I\000\005shardxr\000#ovZ\DoFn\254D#\006\316\223\275\235\002\001\216\020\000\000\000\000sZ8\000)M\000.1M\330\263\030yf[\300z\265\002\000\002Z\000\tgeneratedL\000\002idt\000\022Ljava/lang/String;xp\001t\000BN\354\000\rZ\250PCollection.:402#b70c45c110743c2bsr\000>NF\000=2\004Au.\200\002\000S>\273\001\210\3425\313\323\267`\232\254\002\000\001L\000\021elementConverterst\000\020L\005\2764util/List;xr\0004zt\000!LBj\000\034\257\325QB\327\001W%]\fsr\000\037\001Xi$\031\355\030s$Empty\001i0z\270\027\264<\247\236\336\002\000\000xp\022\022beam:env:docker:v1"
    }
    inputs {
      key: "generatedInput-27"
      value: "generatedPcollection-19"
    }
    outputs {
      key: "org.apache.beam.sdk.values.PCollection.:402#b70c45c110743c2b"
      value: "generatedPcollection-23"
    }
  }
}
transforms {
  key: "-14"
  value {
    spec {
      urn: "beam:sink:runner:0.1"
      payload: "\n\021\022\017localhost:12371"
    }
    inputs {
      key: "-13"
      value: "generatedPcollection-23"
    }
  }
}
pcollections {
  key: "generatedPcollection-23"
  value {
    coder_id: "generatedCoder-22"
    is_bounded: BOUNDED
    windowing_strategy_id: "fakeWindowingStrategy-17"
  }
}
coders {
  key: "generatedCoder-22"
  value {
    spec {
      urn: "beam:coder:windowed_value:v1"
    }
    component_coder_ids: "KvCoder14"
    component_coder_ids: "Coder1"
  }
}

sample with pCollectionId: generatedPcollection-203 
pColl.getName(): generatedPcollection-203 
Coder: org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder

 

{
 insertId: "2018788043244039965:565032:0:164342" 
 jsonPayload: 

{ exception: "java.lang.ClassCastException: [B cannot be 

[jira] [Updated] (BEAM-7467) Gearpump Quickstart fails, java.lang.NoClassDefFoundError: com/gs/collections/api/block/procedure/Procedure

2019-05-31 Thread Luke Cwik (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Cwik updated BEAM-7467:

Description: 
After generating the archetype for the 2.13.0 RC2, the following quick start 
command fails:
{code:java}
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount 
-Dexec.args="--inputFile=pom.xml --output=counts --runner=GearpumpRunner" 
-Pgearpump-runner{code}
I also tried:
{code:java}
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount 
-Dexec.args="--inputFile=pom.xml --output=counts --runner=GearpumpRunner" 
-Pgearpump-runner{code}
Log:
{code:java}
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/usr/local/google/home/lcwik/.m2/repository/org/slf4j/slf4j-jdk14/1.7.25/slf4j-jdk14-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/usr/local/google/home/lcwik/.m2/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.JDK14LoggerFactory]
May 31, 2019 9:38:26 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 
applyOrElse
INFO: Slf4jLogger started
May 31, 2019 9:38:26 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 
$anonfun$applyOrElse$3
INFO: Starting remoting
May 31, 2019 9:38:26 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 
$anonfun$applyOrElse$3
INFO: Remoting started; listening on addresses 
:[akka.tcp://client1820912745@127.0.0.1:38849]
May 31, 2019 9:38:26 AM io.gearpump.metrics.Metrics$ createExtension
INFO: Metrics is enabled..., false
May 31, 2019 9:38:26 AM io.gearpump.cluster.master.MasterProxy 
INFO: Master Proxy is started...
[WARNING] 
java.lang.NoClassDefFoundError: com/gs/collections/api/block/procedure/Procedure
 at io.gearpump.streaming.dsl.plan.WindowOp.chain (OP.scala:273)
 at io.gearpump.streaming.dsl.plan.Planner.merge (Planner.scala:86)
 at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$2 
(Planner.scala:71)
 at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$2$adapted 
(Planner.scala:70)
 at scala.collection.mutable.HashSet.foreach (HashSet.scala:77)
 at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$1 
(Planner.scala:70)
 at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$1$adapted 
(Planner.scala:68)
 at scala.collection.immutable.List.foreach (List.scala:388)
 at io.gearpump.streaming.dsl.plan.Planner.optimize (Planner.scala:68)
 at io.gearpump.streaming.dsl.plan.Planner.plan (Planner.scala:48)
 at io.gearpump.streaming.dsl.scalaapi.StreamApp.plan (StreamApp.scala:59)
 at io.gearpump.streaming.dsl.scalaapi.StreamApp$.streamAppToApplication 
(StreamApp.scala:82)
 at io.gearpump.streaming.dsl.javaapi.JavaStreamApp.submit 
(JavaStreamApp.scala:44)
 at org.apache.beam.runners.gearpump.GearpumpRunner.run (GearpumpRunner.java:83)
 at org.apache.beam.runners.gearpump.GearpumpRunner.run (GearpumpRunner.java:44)
 at org.apache.beam.sdk.Pipeline.run (Pipeline.java:313)
 at org.apache.beam.sdk.Pipeline.run (Pipeline.java:299)
 at org.apache.beam.examples.WordCount.runWordCount (WordCount.java:185)
 at org.apache.beam.examples.WordCount.main (WordCount.java:192)
 at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke 
(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke 
(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke (Method.java:498)
 at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
 at java.lang.Thread.run (Thread.java:748)
Caused by: java.lang.ClassNotFoundException: 
com.gs.collections.api.block.procedure.Procedure
 at java.net.URLClassLoader.findClass (URLClassLoader.java:381)
 at java.lang.ClassLoader.loadClass (ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass (ClassLoader.java:357)
 at io.gearpump.streaming.dsl.plan.WindowOp.chain (OP.scala:273)
 at io.gearpump.streaming.dsl.plan.Planner.merge (Planner.scala:86)
 at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$2 
(Planner.scala:71)
 at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$2$adapted 
(Planner.scala:70)
 at scala.collection.mutable.HashSet.foreach (HashSet.scala:77)
 at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$1 
(Planner.scala:70)
 at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$1$adapted 
(Planner.scala:68)
 at scala.collection.immutable.List.foreach (List.scala:388)
 at io.gearpump.streaming.dsl.plan.Planner.optimize (Planner.scala:68)
 at io.gearpump.streaming.dsl.plan.Planner.plan (Planner.scala:48)
 at io.gearpump.streaming.dsl.scalaapi.StreamApp.plan (StreamApp.scala:59)
 at io.gearpump.streaming.dsl.scalaapi.StreamApp$.streamAppToApplication 
(StreamApp.scala:82)
 at 

[jira] [Work logged] (BEAM-6777) SDK Harness Resilience

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6777?focusedWorklogId=251884=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251884
 ]

ASF GitHub Bot logged work on BEAM-6777:


Author: ASF GitHub Bot
Created on: 31/May/19 16:58
Start Date: 31/May/19 16:58
Worklog Time Spent: 10m 
  Work Description: robinyqiu commented on pull request #8681: [BEAM-6777] 
Add enable_health_checker flag for Dataflow FnAPI worker (Python)
URL: https://github.com/apache/beam/pull/8681#discussion_r289470174
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
 ##
 @@ -177,20 +177,27 @@ def __init__(self, packages, options, 
environment_version, pipeline_url):
 key='major', value=to_json_value(environment_version))])
 # TODO: Use enumerated type instead of strings for job types.
 if job_type.startswith('FNAPI_'):
+  self.debug_options.experiments = self.debug_options.experiments or []
+  debug_options_experiments = self.debug_options.experiments
   runner_harness_override = (
   get_runner_harness_container_image())
-  self.debug_options.experiments = self.debug_options.experiments or []
   if runner_harness_override:
-self.debug_options.experiments.append(
+debug_options_experiments.append(
 'runner_harness_container_image=' + runner_harness_override)
-  # Add use_multiple_sdk_containers flag if its not already present. Do not
+  # Add use_multiple_sdk_containers flag if it's not already present. Do not
   # add the flag if 'no_use_multiple_sdk_containers' is present.
   # TODO: Cleanup use_multiple_sdk_containers once we deprecate Python SDK
   # till version 2.4.
-  debug_options_experiments = self.debug_options.experiments
   if ('use_multiple_sdk_containers' not in debug_options_experiments and
   'no_use_multiple_sdk_containers' not in debug_options_experiments):
-self.debug_options.experiments.append('use_multiple_sdk_containers')
+debug_options_experiments.append('use_multiple_sdk_containers')
+  # Add enable_health_checker flag if it's not already present. Do not
+  # add the flag if 'disable_health_checker' is present.
+  # TODO: Cleanup enable_health_checker once we deprecate Python SDK
+  # till version 2.14.
 
 Review comment:
   Good suggestion! Thanks.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251884)
Time Spent: 3h 20m  (was: 3h 10m)

> SDK Harness Resilience
> --
>
> Key: BEAM-6777
> URL: https://issues.apache.org/jira/browse/BEAM-6777
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Sam Rohde
>Assignee: Yueyang Qiu
>Priority: Major
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> If the Python SDK Harness crashes in any way (user code exception, OOM, etc) 
> the job will hang and waste resources. The fix is to add a daemon in the SDK 
> Harness and Runner Harness to communicate with Dataflow to restart the VM 
> when stuckness is detected.
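
As a sketch of the watchdog idea only (Dataflow's health checker is internal,
and every name below is invented):
{code:java}
import java.util.concurrent.atomic.AtomicLong;

class StucknessWatchdog {
  private final AtomicLong lastProgressMillis = new AtomicLong(System.currentTimeMillis());

  // Called from the processing loop whenever an element makes progress.
  void reportProgress() {
    lastProgressMillis.set(System.currentTimeMillis());
  }

  void start(long timeoutMillis, Runnable onStuck) {
    Thread daemon = new Thread(() -> {
      while (!Thread.currentThread().isInterrupted()) {
        try {
          Thread.sleep(timeoutMillis / 4);
        } catch (InterruptedException e) {
          return;
        }
        if (System.currentTimeMillis() - lastProgressMillis.get() > timeoutMillis) {
          onStuck.run(); // e.g. ask the service to restart the VM
          return;
        }
      }
    });
    daemon.setDaemon(true); // must not keep a dead worker process alive
    daemon.start();
  }
}
{code}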



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6777) SDK Harness Resilience

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6777?focusedWorklogId=251886=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251886
 ]

ASF GitHub Bot logged work on BEAM-6777:


Author: ASF GitHub Bot
Created on: 31/May/19 16:58
Start Date: 31/May/19 16:58
Worklog Time Spent: 10m 
  Work Description: robinyqiu commented on pull request #8681: [BEAM-6777] 
Add enable_health_checker flag for Dataflow FnAPI worker (Python)
URL: https://github.com/apache/beam/pull/8681#discussion_r289470484
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
 ##
 @@ -500,6 +500,34 @@ def test_experiment_use_multiple_sdk_containers(self):
 self.assertNotIn(
 "use_multiple_sdk_containers", environment.proto.experiments)
 
+  def test_experiment_enable_health_checker(self):
+pipeline_options = PipelineOptions(
+['--project', 'test_project', '--job_name', 'test_job_name',
+ '--temp_location', 'gs://test-location/temp',
+ '--experiments', 'beam_fn_api'])
+environment = apiclient.Environment(
+[], pipeline_options, 1, FAKE_PIPELINE_URL)
+self.assertIn("enable_health_checker", environment.proto.experiments)
 
 Review comment:
  Done. Thanks for paying attention to the style issue.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251886)
Time Spent: 3h 40m  (was: 3.5h)

> SDK Harness Resilience
> --
>
> Key: BEAM-6777
> URL: https://issues.apache.org/jira/browse/BEAM-6777
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Sam Rohde
>Assignee: Yueyang Qiu
>Priority: Major
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> If the Python SDK Harness crashes in any way (user code exception, OOM, etc) 
> the job will hang and waste resources. The fix is to add a daemon in the SDK 
> Harness and Runner Harness to communicate with Dataflow to restart the VM 
> when stuckness is detected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6777) SDK Harness Resilience

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6777?focusedWorklogId=251885=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251885
 ]

ASF GitHub Bot logged work on BEAM-6777:


Author: ASF GitHub Bot
Created on: 31/May/19 16:58
Start Date: 31/May/19 16:58
Worklog Time Spent: 10m 
  Work Description: robinyqiu commented on pull request #8681: [BEAM-6777] 
Add enable_health_checker flag for Dataflow FnAPI worker (Python)
URL: https://github.com/apache/beam/pull/8681#discussion_r289470521
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
 ##
 @@ -500,6 +500,34 @@ def test_experiment_use_multiple_sdk_containers(self):
 self.assertNotIn(
 "use_multiple_sdk_containers", environment.proto.experiments)
 
+  def test_experiment_enable_health_checker(self):
+pipeline_options = PipelineOptions(
+['--project', 'test_project', '--job_name', 'test_job_name',
+ '--temp_location', 'gs://test-location/temp',
+ '--experiments', 'beam_fn_api'])
+environment = apiclient.Environment(
+[], pipeline_options, 1, FAKE_PIPELINE_URL)
+self.assertIn("enable_health_checker", environment.proto.experiments)
+
+pipeline_options = PipelineOptions(
+['--project', 'test_project', '--job_name', 'test_job_name',
+ '--temp_location', 'gs://test-location/temp',
+ '--experiments', 'beam_fn_api',
+ '--experiments', 'enable_health_checker'])
+environment = apiclient.Environment(
+[], pipeline_options, 1, FAKE_PIPELINE_URL)
+self.assertIn("enable_health_checker", environment.proto.experiments)
+
+pipeline_options = PipelineOptions(
+['--project', 'test_project', '--job_name', 'test_job_name',
+ '--temp_location', 'gs://test-location/temp',
+ '--experiments', 'beam_fn_api',
+ '--experiments', 'disable_health_checker'])
+environment = apiclient.Environment(
+[], pipeline_options, 1, FAKE_PIPELINE_URL)
+self.assertNotIn(
+"enable_health_checker", environment.proto.experiments)
 
 Review comment:
   Done.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251885)
Time Spent: 3.5h  (was: 3h 20m)

> SDK Harness Resilience
> --
>
> Key: BEAM-6777
> URL: https://issues.apache.org/jira/browse/BEAM-6777
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Sam Rohde
>Assignee: Yueyang Qiu
>Priority: Major
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> If the Python SDK Harness crashes in any way (user code exception, OOM, etc) 
> the job will hang and waste resources. The fix is to add a daemon in the SDK 
> Harness and Runner Harness to communicate with Dataflow to restart the VM 
> when stuckness is detected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7464) Python transform catalog

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7464?focusedWorklogId=251890=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251890
 ]

ASF GitHub Bot logged work on BEAM-7464:


Author: ASF GitHub Bot
Created on: 31/May/19 17:11
Start Date: 31/May/19 17:11
Worklog Time Spent: 10m 
  Work Description: melap commented on pull request #8534: [BEAM-7464] 
Transform catalog for Python
URL: https://github.com/apache/beam/pull/8534
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251890)
Time Spent: 20m  (was: 10m)

> Python transform catalog
> 
>
> Key: BEAM-7464
> URL: https://issues.apache.org/jira/browse/BEAM-7464
> Project: Beam
>  Issue Type: Improvement
>  Components: website
>Reporter: Rose Nguyen
>Assignee: Rose Nguyen
>Priority: Minor
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Create catalog of core transforms (Python)
> -Links to pydocs
> -Brief description
> -Related transforms
> -Links to programming guide
> -Examples section to integrate Colab notebooks



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-7467) Gearpump Quickstart fails, java.lang.NoClassDefFoundError: com/gs/collections/api/block/procedure/Procedure

2019-05-31 Thread Luke Cwik (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16853236#comment-16853236
 ] 

Luke Cwik commented on BEAM-7467:
-

I tried the Gearpump quickstart for the other Apache Beam versions:

2.8.0: failed due to timeout

2.9.0: failed due to timeout

2.10.0: failed due to missing gs-collections dep

2.11.0: failed due to missing gs-collections dep

2.12.0: failed due to missing gs-collections dep

> Gearpump Quickstart fails, java.lang.NoClassDefFoundError: 
> com/gs/collections/api/block/procedure/Procedure
> ---
>
> Key: BEAM-7467
> URL: https://issues.apache.org/jira/browse/BEAM-7467
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-gearpump
>Affects Versions: 2.13.0
>Reporter: Luke Cwik
>Assignee: Manu Zhang
>Priority: Blocker
>
> After generating the archetype for the 2.13.0 RC2, the following quick start 
> command fails:
> {code:java}
> mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount 
> -Dexec.args="--inputFile=pom.xml --output=counts --runner=GearpumpRunner" 
> -Pgearpump-runner{code}
> I also tried:
> {code:java}
> mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount 
> -Dexec.args="--inputFile=pom.xml --output=counts --runner=GearpumpRunner" 
> -Pgearpump-runner{code}
> Log:
> {code:java}
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/usr/local/google/home/lcwik/.m2/repository/org/slf4j/slf4j-jdk14/1.7.25/slf4j-jdk14-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/local/google/home/lcwik/.m2/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.JDK14LoggerFactory]
> May 31, 2019 9:38:26 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 
> applyOrElse
> INFO: Slf4jLogger started
> May 31, 2019 9:38:26 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 
> $anonfun$applyOrElse$3
> INFO: Starting remoting
> May 31, 2019 9:38:26 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 
> $anonfun$applyOrElse$3
> INFO: Remoting started; listening on addresses 
> :[akka.tcp://client1820912745@127.0.0.1:38849]
> May 31, 2019 9:38:26 AM io.gearpump.metrics.Metrics$ createExtension
> INFO: Metrics is enabled..., false
> May 31, 2019 9:38:26 AM io.gearpump.cluster.master.MasterProxy 
> INFO: Master Proxy is started...
> [WARNING] 
> java.lang.NoClassDefFoundError: 
> com/gs/collections/api/block/procedure/Procedure
>  at io.gearpump.streaming.dsl.plan.WindowOp.chain (OP.scala:273)
>  at io.gearpump.streaming.dsl.plan.Planner.merge (Planner.scala:86)
>  at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$2 
> (Planner.scala:71)
>  at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$2$adapted 
> (Planner.scala:70)
>  at scala.collection.mutable.HashSet.foreach (HashSet.scala:77)
>  at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$1 
> (Planner.scala:70)
>  at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$1$adapted 
> (Planner.scala:68)
>  at scala.collection.immutable.List.foreach (List.scala:388)
>  at io.gearpump.streaming.dsl.plan.Planner.optimize (Planner.scala:68)
>  at io.gearpump.streaming.dsl.plan.Planner.plan (Planner.scala:48)
>  at io.gearpump.streaming.dsl.scalaapi.StreamApp.plan (StreamApp.scala:59)
>  at io.gearpump.streaming.dsl.scalaapi.StreamApp$.streamAppToApplication 
> (StreamApp.scala:82)
>  at io.gearpump.streaming.dsl.javaapi.JavaStreamApp.submit 
> (JavaStreamApp.scala:44)
>  at org.apache.beam.runners.gearpump.GearpumpRunner.run 
> (GearpumpRunner.java:83)
>  at org.apache.beam.runners.gearpump.GearpumpRunner.run 
> (GearpumpRunner.java:44)
>  at org.apache.beam.sdk.Pipeline.run (Pipeline.java:313)
>  at org.apache.beam.sdk.Pipeline.run (Pipeline.java:299)
>  at org.apache.beam.examples.WordCount.runWordCount (WordCount.java:185)
>  at org.apache.beam.examples.WordCount.main (WordCount.java:192)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>  at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>  at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke (Method.java:498)
>  at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
>  at java.lang.Thread.run (Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: 
> com.gs.collections.api.block.procedure.Procedure
>  at java.net.URLClassLoader.findClass (URLClassLoader.java:381)
>  at 

[jira] [Created] (BEAM-7468) Add Python global window side input pattern

2019-05-31 Thread Cyrus Maden (JIRA)
Cyrus Maden created BEAM-7468:
-

 Summary: Add Python global window side input pattern
 Key: BEAM-7468
 URL: https://issues.apache.org/jira/browse/BEAM-7468
 Project: Beam
  Issue Type: Improvement
  Components: website
Reporter: Cyrus Maden
Assignee: Reza ardeshir rokni


Tracker for documenting a Python version of the side input pattern "Using 
global window side inputs in non-global windows"

https://beam.apache.org/documentation/patterns/side-input-patterns/#using-global-window-side-inputs-in-non-global-windows
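
For reference, a rough Java sketch of the pattern to be ported (adapted from
the Java side of the documentation; the rate, trigger, and variable names are
illustrative, not prescriptive):
{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

class GlobalWindowSideInput {
  static PCollection<String> expand(Pipeline p, PCollection<String> mainInput) {
    // Slowly changing side input: kept in the global window and re-fired
    // on every periodic refresh.
    PCollectionView<Long> sideView =
        p.apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
            .apply(Window.<Long>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())
            .apply(Latest.globally())
            .apply(View.asSingleton());

    // The main input lives in non-global (fixed) windows, yet can read the
    // global-window side input: Beam maps every main window onto it.
    return mainInput
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            c.output(c.element() + ":" + c.sideInput(sideView));
          }
        }).withSideInputs(sideView));
  }
}
{code}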



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7416) MetricsReader in Python reports all available metrics.

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7416?focusedWorklogId=251825=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251825
 ]

ASF GitHub Bot logged work on BEAM-7416:


Author: ASF GitHub Bot
Created on: 31/May/19 15:07
Start Date: 31/May/19 15:07
Worklog Time Spent: 10m 
  Work Description: kkucharc commented on pull request #8731: [BEAM-7416] 
MetricsReader in Python should save only supported metrics
URL: https://github.com/apache/beam/pull/8731
 
 
   Fixes MetricsReader so that it saves only the metrics required for load 
tests, rather than all collected metrics (which matters if the user happens 
to collect other metrics as well).
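
The change itself is in the Python MetricsReader; the idea, sketched here with
hypothetical names, is just a whitelist filter over the collected metric names:
{code:java}
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class SupportedMetricsFilter {
  // Drop any collected metric whose name the load-test reader doesn't know.
  static List<String> supportedOnly(List<String> collected, Set<String> supported) {
    return collected.stream().filter(supported::contains).collect(Collectors.toList());
  }
}
{code}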
   
   
   

[jira] [Resolved] (BEAM-5650) Timeout exceptions while reading a lot of files from a bounded source like S3 with Flink runner

2019-05-31 Thread Maximilian Michels (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved BEAM-5650.
--
   Resolution: Duplicate
Fix Version/s: 2.14.0

> Timeout exceptions while reading a lot of files from a bounded source like S3 
> with Flink runner
> ---
>
> Key: BEAM-5650
> URL: https://issues.apache.org/jira/browse/BEAM-5650
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-flink
>Reporter: Ankit Jhalaria
>Assignee: Ankit Jhalaria
>Priority: Major
> Fix For: 2.14.0
>
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> * Using TextIO, I was trying to read around 850 files.
>  * Getting this exception while using FlinkRunner
>  
> {code:java}
> //Caused by: org.apache.flink.runtime.client.JobExecutionException: 
> java.io.IOException: com.amazonaws.SdkClientException: Unable to execute HTTP 
> request: Timeout waiting for connection from pool at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
>  at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>  at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:175)
>  at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:115) ... 
> 28 more Caused by: java.io.IOException: com.amazonaws.SdkClientException: 
> Unable to execute HTTP request: Timeout waiting for connection from pool at 
> org.apache.beam.sdk.io.aws.s3.S3ReadableSeekableByteChannel.read(S3ReadableSeekableByteChannel.java:91)
>  at 
> org.apache.beam.sdk.io.CompressedSource$CompressedReader$CountingChannel.read(CompressedSource.java:382)
>  at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65) at 
> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109) at 
> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) at 
> java.io.FilterInputStream.read(FilterInputStream.java:133) at 
> java.io.PushbackInputStream.read(PushbackInputStream.java:186) at 
> org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.io.ByteStreams.read(ByteStreams.java:859)
>  at 
> org.apache.beam.sdk.io.Compression$3.readDecompressed(Compression.java:81) at 
> org.apache.beam.sdk.io.CompressedSource$CompressionMode.createDecompressingChannel(CompressedSource.java:110)
>  at 
> org.apache.beam.sdk.io.CompressedSource$CompressedReader.startReading(CompressedSource.java:417)
>  at 
> org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:476)
>  at 
> org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249)
>  at 
> org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.advance(UnboundedReadFromBoundedSource.java:456)
>  at 
> org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.access$300(UnboundedReadFromBoundedSource.java:434)
>  at 
> org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.advance(UnboundedReadFromBoundedSource.java:286)
>  at 
> org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.start(UnboundedReadFromBoundedSource.java:279)
>  at 
> org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51)
>  at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:250)
>  at 
> org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$UnboundedSourceWrapperNoValueWithRecordId.run(FlinkStreamingTransformTranslators.java:1299)
>  at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>  at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>  at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>  at 
> org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout 
> waiting for connection from pool at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1116)
>  at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1066)
>  
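
The merged fix reads the sources sequentially instead; as a stopgap sketch
under AWS SDK v1 (not part of the fix), one can also raise the S3 client's
connection pool limit so that many concurrently open readers do not starve
each other:
{code:java}
import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

class S3ClientFactory {
  static AmazonS3 buildWithLargerPool() {
    ClientConfiguration config = new ClientConfiguration();
    config.setMaxConnections(1000); // the SDK default is 50; ~850 open files need more
    return AmazonS3ClientBuilder.standard()
        .withClientConfiguration(config)
        .build();
  }
}
{code}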

[jira] [Work logged] (BEAM-7464) Python transform catalog

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7464?focusedWorklogId=251852=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251852
 ]

ASF GitHub Bot logged work on BEAM-7464:


Author: ASF GitHub Bot
Created on: 31/May/19 16:08
Start Date: 31/May/19 16:08
Worklog Time Spent: 10m 
  Work Description: rosetn commented on issue #8534: [BEAM-7464] Transform 
catalog for Python
URL: https://github.com/apache/beam/pull/8534#issuecomment-497767241
 
 
   Done, thank you!
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251852)
Time Spent: 10m
Remaining Estimate: 0h

> Python transform catalog
> 
>
> Key: BEAM-7464
> URL: https://issues.apache.org/jira/browse/BEAM-7464
> Project: Beam
>  Issue Type: Improvement
>  Components: website
>Reporter: Rose Nguyen
>Assignee: Rose Nguyen
>Priority: Minor
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Create catalog of core transforms (Python)
> -Links to pydocs
> -Brief description
> -Related transforms
> -Links to programming guide
> -Examples section to integrate Colab notebooks



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-7467) Gearpump Quickstart fails, java.lang.NoClassDefFoundError: com/gs/collections/api/block/procedure/Procedure

2019-05-31 Thread Luke Cwik (JIRA)
Luke Cwik created BEAM-7467:
---

 Summary: Gearpump Quickstart fails, 
java.lang.NoClassDefFoundError: com/gs/collections/api/block/procedure/Procedure
 Key: BEAM-7467
 URL: https://issues.apache.org/jira/browse/BEAM-7467
 Project: Beam
  Issue Type: Improvement
  Components: runner-gearpump
Affects Versions: 2.13.0
Reporter: Luke Cwik
Assignee: Ankur Goenka


After generating the archetype for the 2.13.0 RC2, the following quick start 
command fails:
{code:java}
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount 
-Dexec.args="--inputFile=pom.xml --output=counts --runner=GearpumpRunner" 
-Pgearpump-runner{code}
Log:
{code:java}
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/usr/local/google/home/lcwik/.m2/repository/org/slf4j/slf4j-jdk14/1.7.25/slf4j-jdk14-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/usr/local/google/home/lcwik/.m2/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.JDK14LoggerFactory]
May 31, 2019 9:38:26 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 
applyOrElse
INFO: Slf4jLogger started
May 31, 2019 9:38:26 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 
$anonfun$applyOrElse$3
INFO: Starting remoting
May 31, 2019 9:38:26 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 
$anonfun$applyOrElse$3
INFO: Remoting started; listening on addresses 
:[akka.tcp://client1820912745@127.0.0.1:38849]
May 31, 2019 9:38:26 AM io.gearpump.metrics.Metrics$ createExtension
INFO: Metrics is enabled..., false
May 31, 2019 9:38:26 AM io.gearpump.cluster.master.MasterProxy 
INFO: Master Proxy is started...
[WARNING] 
java.lang.NoClassDefFoundError: com/gs/collections/api/block/procedure/Procedure
 at io.gearpump.streaming.dsl.plan.WindowOp.chain (OP.scala:273)
 at io.gearpump.streaming.dsl.plan.Planner.merge (Planner.scala:86)
 at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$2 
(Planner.scala:71)
 at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$2$adapted 
(Planner.scala:70)
 at scala.collection.mutable.HashSet.foreach (HashSet.scala:77)
 at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$1 
(Planner.scala:70)
 at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$1$adapted 
(Planner.scala:68)
 at scala.collection.immutable.List.foreach (List.scala:388)
 at io.gearpump.streaming.dsl.plan.Planner.optimize (Planner.scala:68)
 at io.gearpump.streaming.dsl.plan.Planner.plan (Planner.scala:48)
 at io.gearpump.streaming.dsl.scalaapi.StreamApp.plan (StreamApp.scala:59)
 at io.gearpump.streaming.dsl.scalaapi.StreamApp$.streamAppToApplication 
(StreamApp.scala:82)
 at io.gearpump.streaming.dsl.javaapi.JavaStreamApp.submit 
(JavaStreamApp.scala:44)
 at org.apache.beam.runners.gearpump.GearpumpRunner.run (GearpumpRunner.java:83)
 at org.apache.beam.runners.gearpump.GearpumpRunner.run (GearpumpRunner.java:44)
 at org.apache.beam.sdk.Pipeline.run (Pipeline.java:313)
 at org.apache.beam.sdk.Pipeline.run (Pipeline.java:299)
 at org.apache.beam.examples.WordCount.runWordCount (WordCount.java:185)
 at org.apache.beam.examples.WordCount.main (WordCount.java:192)
 at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke 
(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke 
(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke (Method.java:498)
 at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
 at java.lang.Thread.run (Thread.java:748)
Caused by: java.lang.ClassNotFoundException: 
com.gs.collections.api.block.procedure.Procedure
 at java.net.URLClassLoader.findClass (URLClassLoader.java:381)
 at java.lang.ClassLoader.loadClass (ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass (ClassLoader.java:357)
 at io.gearpump.streaming.dsl.plan.WindowOp.chain (OP.scala:273)
 at io.gearpump.streaming.dsl.plan.Planner.merge (Planner.scala:86)
 at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$2 
(Planner.scala:71)
 at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$2$adapted 
(Planner.scala:70)
 at scala.collection.mutable.HashSet.foreach (HashSet.scala:77)
 at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$1 
(Planner.scala:70)
 at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$1$adapted 
(Planner.scala:68)
 at scala.collection.immutable.List.foreach (List.scala:388)
 at io.gearpump.streaming.dsl.plan.Planner.optimize (Planner.scala:68)
 at io.gearpump.streaming.dsl.plan.Planner.plan (Planner.scala:48)
 at io.gearpump.streaming.dsl.scalaapi.StreamApp.plan (StreamApp.scala:59)
 at 

[jira] [Commented] (BEAM-7467) Gearpump Quickstart fails, java.lang.NoClassDefFoundError: com/gs/collections/api/block/procedure/Procedure

2019-05-31 Thread Luke Cwik (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16853221#comment-16853221
 ] 

Luke Cwik commented on BEAM-7467:
-

Based upon 
https://github.com/gearpump/gearpump/blob/ddd926f374d5b3eb5c5a67eac4501998d0b45b9a/project/Dependencies.scala#L37, 
I tried adding
{code:java}

<dependency>
  <groupId>com.goldmansachs</groupId>
  <artifactId>gs-collections</artifactId>
  <version>6.2.0</version>
  <scope>runtime</scope>
</dependency>

{code}
to the gearpump profile and then the pipeline fails due to:
{code:java}
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/usr/local/google/home/lcwik/.m2/repository/org/slf4j/slf4j-jdk14/1.7.25/slf4j-jdk14-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/usr/local/google/home/lcwik/.m2/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.JDK14LoggerFactory]
May 31, 2019 10:02:16 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 
applyOrElse
INFO: Slf4jLogger started
May 31, 2019 10:02:16 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 
$anonfun$applyOrElse$3
INFO: Starting remoting
May 31, 2019 10:02:16 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 
$anonfun$applyOrElse$3
INFO: Remoting started; listening on addresses 
:[akka.tcp://client1356788836@127.0.0.1:44201]
May 31, 2019 10:02:17 AM io.gearpump.metrics.Metrics$ createExtension
INFO: Metrics is enabled..., false
May 31, 2019 10:02:17 AM io.gearpump.cluster.master.MasterProxy 
INFO: Master Proxy is started...
May 31, 2019 10:02:17 AM io.gearpump.cluster.client.ClientContext 
master$lzycompute
INFO: Creating master proxy 
Actor[akka://client1356788836/user/masterproxyclient1356788836#2058731731] for 
master list: ArrayBuffer()
May 31, 2019 10:02:17 AM io.gearpump.cluster.master.MasterProxy 
INFO: Master Proxy is started...
May 31, 2019 10:02:17 AM 
io.gearpump.cluster.master.MasterProxy$$anonfun$establishing$1 applyOrElse
INFO: Stashing SubmitApplication
[WARNING] 
akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://client1356788836/user/masterproxyclient1356788836#2058731731]] 
after [9 ms]. Message of type 
[io.gearpump.cluster.ClientToMaster$SubmitApplication]. A typical reason for 
`AskTimeoutException` is that the recipient actor didn't send a reply.
 at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1 
(AskSupport.scala:635)
 at akka.pattern.PromiseActorRef$.$anonfun$apply$1 (AskSupport.scala:650)
 at akka.actor.Scheduler$$anon$4.run (Scheduler.scala:205)
 at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute 
(Future.scala:870)
 at scala.concurrent.BatchingExecutor.execute (BatchingExecutor.scala:109)
 at scala.concurrent.BatchingExecutor.execute$ (BatchingExecutor.scala:103)
 at scala.concurrent.Future$InternalCallbackExecutor$.execute (Future.scala:868)
 at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask 
(LightArrayRevolverScheduler.scala:328)
 at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1 
(LightArrayRevolverScheduler.scala:279)
 at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick 
(LightArrayRevolverScheduler.scala:283)
 at akka.actor.LightArrayRevolverScheduler$$anon$4.run 
(LightArrayRevolverScheduler.scala:235)
 at java.lang.Thread.run (Thread.java:748){code}

> Gearpump Quickstart fails, java.lang.NoClassDefFoundError: 
> com/gs/collections/api/block/procedure/Procedure
> ---
>
> Key: BEAM-7467
> URL: https://issues.apache.org/jira/browse/BEAM-7467
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-gearpump
>Affects Versions: 2.13.0
>Reporter: Luke Cwik
>Assignee: Ankur Goenka
>Priority: Blocker
>
> After generating the archetype for the 2.13.0 RC2, the following quick start 
> command fails:
> {code:java}
> mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount 
> -Dexec.args="--inputFile=pom.xml --output=counts --runner=GearpumpRunner" 
> -Pgearpump-runner{code}
> I also tried:
> {code:java}
> mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount 
> -Dexec.args="--inputFile=pom.xml --output=counts --runner=GearpumpRunner" 
> -Pgearpump-runner{code}
> Log:
> {code:java}
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/usr/local/google/home/lcwik/.m2/repository/org/slf4j/slf4j-jdk14/1.7.25/slf4j-jdk14-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/local/google/home/lcwik/.m2/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See 

[jira] [Resolved] (BEAM-7357) Kinesis IO.write throws LimitExceededException

2019-05-31 Thread Alexey Romanenko (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Romanenko resolved BEAM-7357.

   Resolution: Fixed
Fix Version/s: 2.14.0

> Kinesis IO.write throws LimitExceededException
> --
>
> Key: BEAM-7357
> URL: https://issues.apache.org/jira/browse/BEAM-7357
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kinesis
>Affects Versions: 2.11.0
>Reporter: Brachi Packter
>Assignee: Alexey Romanenko
>Priority: Major
> Fix For: 2.14.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> I used KinesisIO to write to Kinesis and very quickly get many exceptions 
> like:
> [shard_map.cc:150] Shard map update for stream "***" failed. Code: 
> LimitExceededException Message: Rate exceeded for stream *** under account 
> ***; retrying in ..
> Also, I see many exceptions like:
> Caused by: java.lang.IllegalArgumentException: Stream ** does not exist at 
> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>  at 
> org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
> I'm sure this stream exists because I can see some data from my pipeline that 
> was successfully ingested to it.
>  
> Here is my code:
>  
>  
> {code:java}
> .apply(KinesisIO.write()
>     .withStreamName("**")
>     .withPartitioner(new KinesisPartitioner() {
>       @Override
>       public String getPartitionKey(byte[] value) {
>         return UUID.randomUUID().toString();
>       }
>       @Override
>       public String getExplicitHashKey(byte[] value) {
>         return null;
>       }
>     })
>     .withAWSClientsProvider("**", "***", Regions.US_EAST_1));{code}
>  
> I tried not using KinesisIO, and everything works well; I can't figure out 
> what went wrong.
> I tried using the same API as the library does:
>  
> {code:java}
> .apply(
>     ParDo.of(new DoFn<byte[], Void>() {
>       private transient IKinesisProducer inlineProducer;
>       @Setup
>       public void setup() {
>         KinesisProducerConfiguration config =
>             KinesisProducerConfiguration.fromProperties(new Properties());
>         config.setRegion(Regions.US_EAST_1.getName());
>         config.setCredentialsProvider(new AWSStaticCredentialsProvider(
>             new BasicAWSCredentials("***", "***")));
>         inlineProducer = new KinesisProducer(config);
>       }
>       @ProcessElement
>       public void processElement(ProcessContext c) throws Exception {
>         ByteBuffer data = ByteBuffer.wrap(c.element());
>         String partitionKey = UUID.randomUUID().toString();
>         ListenableFuture<UserRecordResult> f =
>             getProducer().addUserRecord("***", partitionKey, data);
>         Futures.addCallback(f, new UserRecordResultFutureCallback());
>       }
>       class UserRecordResultFutureCallback implements FutureCallback<UserRecordResult> {
>         @Override
>         public void onFailure(Throwable cause) {
>           throw new RuntimeException("failed produce:" + cause);
>         }
>         @Override
>         public void onSuccess(UserRecordResult result) {}
>       }
>     })
> );
>  
> {code}
>  
> Any idea what I did wrong, or what the error in KinesisIO is?
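
The fix (see the PR "KinesisIO: fix too many checks that writing stream
exists" below) reduces repeated stream-existence checks; purely as an
illustration of that idea, with hypothetical names rather than the actual
KinesisIO change:
{code:java}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;

class StreamExistenceCache {
  private static final ConcurrentMap<String, Boolean> KNOWN = new ConcurrentHashMap<>();

  // describeStream stands in for the rate-limited AWS DescribeStream call;
  // caching the answer keeps repeated DoFn setups from exhausting the quota.
  static boolean exists(String streamName, Predicate<String> describeStream) {
    return KNOWN.computeIfAbsent(streamName, describeStream::test);
  }
}
{code}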



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-7462) Add Sampled Byte Count Metric to the Java SDK

2019-05-31 Thread Alex Amato (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex Amato updated BEAM-7462:
-
Attachment: bundle_descriptor_dump.txt

> Add Sampled Byte Count Metric to the Java SDK
> -
>
> Key: BEAM-7462
> URL: https://issues.apache.org/jira/browse/BEAM-7462
> Project: Beam
>  Issue Type: New Feature
>  Components: java-fn-execution
>Reporter: Alex Amato
>Priority: Major
> Attachments: bundle_descriptor_dump.txt
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3759) Add support for PaneInfo descriptor in Python SDK

2019-05-31 Thread Tanay Tummalapalli (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16853152#comment-16853152
 ] 

Tanay Tummalapalli commented on BEAM-3759:
--

Thanks [~ccy]! I'd love to work on adding full support for PaneInfo after my 
GSoC project. 

> Add support for PaneInfo descriptor in Python SDK
> -
>
> Key: BEAM-3759
> URL: https://issues.apache.org/jira/browse/BEAM-3759
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.3.0
>Reporter: Charles Chen
>Priority: Major
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> The PaneInfo descriptor allows a user to determine which particular 
> triggering emitted a value.  This allows the user to differentiate between 
> speculative (early), on-time (at end of window) and late value emissions 
> coming out of a GroupByKey.  We should add support for this feature in the 
> Python SDK.
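
For comparison, the Java SDK already exposes this; a minimal DoFn that logs
which pane fired each grouped element:
{code:java}
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;

class LogPaneTiming<K, V> extends DoFn<KV<K, Iterable<V>>, String> {
  @ProcessElement
  public void process(ProcessContext c) {
    PaneInfo pane = c.pane();
    // EARLY = speculative, ON_TIME = at the end of the window, LATE = after it
    c.output(c.element().getKey() + " fired " + pane.getTiming()
        + ", pane index " + pane.getIndex());
  }
}
{code}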



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-7357) Kinesis IO.write throws LimitExceededException

2019-05-31 Thread Alexey Romanenko (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16853173#comment-16853173
 ] 

Alexey Romanenko commented on BEAM-7357:


[~brachi_packter] The fix was merged into master. Would you be able to test it 
against the next snapshot version of the jar?

> Kinesis IO.write throws LimitExceededException
> --
>
> Key: BEAM-7357
> URL: https://issues.apache.org/jira/browse/BEAM-7357
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kinesis
>Affects Versions: 2.11.0
>Reporter: Brachi Packter
>Assignee: Alexey Romanenko
>Priority: Major
> Fix For: 2.14.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> I used KinesisIO to write to Kinesis and very quickly get many exceptions 
> like:
> [shard_map.cc:150] Shard map update for stream "***" failed. Code: 
> LimitExceededException Message: Rate exceeded for stream *** under account 
> ***; retrying in ..
> Also, I see many exceptions like:
> Caused by: java.lang.IllegalArgumentException: Stream ** does not exist at 
> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>  at 
> org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
> I'm sure this stream exists because I can see some data from my pipeline that 
> was successfully ingested to it.
>  
> Here is my code:
>  
>  
> {code:java}
> .apply(KinesisIO.write()
>     .withStreamName("**")
>     .withPartitioner(new KinesisPartitioner() {
>       @Override
>       public String getPartitionKey(byte[] value) {
>         return UUID.randomUUID().toString();
>       }
>       @Override
>       public String getExplicitHashKey(byte[] value) {
>         return null;
>       }
>     })
>     .withAWSClientsProvider("**", "***", Regions.US_EAST_1));{code}
>  
> I tried not using KinesisIO, and everything works well; I can't figure out 
> what went wrong.
> I tried using the same API as the library does:
>  
> {code:java}
> .apply(
>     ParDo.of(new DoFn<byte[], Void>() {
>       private transient IKinesisProducer inlineProducer;
>       @Setup
>       public void setup() {
>         KinesisProducerConfiguration config =
>             KinesisProducerConfiguration.fromProperties(new Properties());
>         config.setRegion(Regions.US_EAST_1.getName());
>         config.setCredentialsProvider(new AWSStaticCredentialsProvider(
>             new BasicAWSCredentials("***", "***")));
>         inlineProducer = new KinesisProducer(config);
>       }
>       @ProcessElement
>       public void processElement(ProcessContext c) throws Exception {
>         ByteBuffer data = ByteBuffer.wrap(c.element());
>         String partitionKey = UUID.randomUUID().toString();
>         ListenableFuture<UserRecordResult> f =
>             getProducer().addUserRecord("***", partitionKey, data);
>         Futures.addCallback(f, new UserRecordResultFutureCallback());
>       }
>       class UserRecordResultFutureCallback implements FutureCallback<UserRecordResult> {
>         @Override
>         public void onFailure(Throwable cause) {
>           throw new RuntimeException("failed produce:" + cause);
>         }
>         @Override
>         public void onSuccess(UserRecordResult result) {}
>       }
>     })
> );
>  
> {code}
>  
> Any idea what I did wrong, or what the error in KinesisIO is?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-7466) Remove the enable_health_checker flag after Python SDK 2.13 becomes unsupported

2019-05-31 Thread Yueyang Qiu (JIRA)
Yueyang Qiu created BEAM-7466:
-

 Summary: Remove the enable_health_checker flag after Python SDK 
2.13 becomes unsupported
 Key: BEAM-7466
 URL: https://issues.apache.org/jira/browse/BEAM-7466
 Project: Beam
  Issue Type: Improvement
  Components: runner-dataflow
Reporter: Yueyang Qiu
Assignee: Yueyang Qiu


[PR 8681|https://github.com/apache/beam/pull/8681] introduced a new flag 
`enable_health_checker` to allow Python streaming jobs running SDK versions <= 
2.13 to run on Dataflow. Once Python SDK 2.13 becomes unsupported, this flag 
can be cleaned up, and Dataflow will always enable the health checker by 
default.
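
The defaulting rule that will be removed is the same shape as the existing
use_multiple_sdk_containers one; a sketch of it (the real logic lives in the
Python apiclient, and the names here are illustrative):
{code:java}
import java.util.List;

class HealthCheckerFlag {
  // Enable by default, but respect an explicit opt-out.
  static void applyDefault(List<String> experiments) {
    if (!experiments.contains("enable_health_checker")
        && !experiments.contains("disable_health_checker")) {
      experiments.add("enable_health_checker");
    }
  }
}
{code}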



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-7453) [beam_PostCommit_Py_ValCont] [test_wordcount_fnapi_it] TimedOutException

2019-05-31 Thread Andrew Pilloud (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16853279#comment-16853279
 ] 

Andrew Pilloud commented on BEAM-7453:
--

Still hard failing: 
[https://builds.apache.org/job/beam_PostCommit_Py_ValCont/3408/console]

> [beam_PostCommit_Py_ValCont] [test_wordcount_fnapi_it] TimedOutException
> 
>
> Key: BEAM-7453
> URL: https://issues.apache.org/jira/browse/BEAM-7453
> Project: Beam
>  Issue Type: Bug
>  Components: examples-python, test-failures
>Reporter: Andrew Pilloud
>Assignee: Luke Cwik
>Priority: Major
>  Labels: currently-failing
>
> _These tests are hard failing; is it possible that the timeout is too low?_
>  * Jenkins Job: https://builds.apache.org/job/beam_PostCommit_Py_ValCont/3385/console
> Initial investigation:
> 14:57:04 TimedOutException: 'test_wordcount_fnapi_it 
> (apache_beam.examples.wordcount_it_test.WordCountIT)'
> ...
> 14:57:04 TimedOutException: 'test_metrics_fnapi_it 
> (apache_beam.runners.dataflow.dataflow_exercise_metrics_pipeline_test.ExerciseMetricsPipelineTest)'
> 
> _After you've filled out the above details, please [assign the issue to an 
> individual|https://beam.apache.org/contribute/postcommits-guides/index.html#find_specialist].
>  Assignee should [treat test failures as 
> high-priority|https://beam.apache.org/contribute/postcommits-policies/#assigned-failing-test],
>  helping to fix the issue or find a more appropriate owner. See [Apache Beam 
> Post-Commit 
> Policies|https://beam.apache.org/contribute/postcommits-policies]._



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7442) Bounded Reads for Flink Runner fails with OOM

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7442?focusedWorklogId=251843=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251843
 ]

ASF GitHub Bot logged work on BEAM-7442:


Author: ASF GitHub Bot
Created on: 31/May/19 15:34
Start Date: 31/May/19 15:34
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #8715: [BEAM-7442][BEAM-5650] 
Read sequentially from bounded sources in UnboundedSourceWrapper
URL: https://github.com/apache/beam/pull/8715#issuecomment-497754414
 
 
   Thanks for the review!
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251843)
Time Spent: 1h 50m  (was: 1h 40m)

> Bounded Reads for Flink Runner fails with OOM
> -
>
> Key: BEAM-7442
> URL: https://issues.apache.org/jira/browse/BEAM-7442
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Akshay Iyangar
>Assignee: Akshay Iyangar
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of 
> files is very large, FlinkRunner throws an OOM error. This happens because 
> the current implementation reads the files simultaneously rather than 
> sequentially, causing all of them to be in memory at once, which quickly 
> breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class so that, when the 
> source is bounded, its splits are read sequentially from a queue.
>  
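For context, a minimal sketch of the queue-based sequential read (the `SimpleReader` contract and the wiring are illustrative assumptions, not Beam's actual `UnboundedSourceWrapper` internals):

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Readers are drained one at a time, so only the active split's
// buffers are resident in memory at any point.
class SequentialReaderRunner<T> {
  interface SimpleReader<T> {           // assumed minimal reader contract
    boolean start() throws Exception;   // true if a record is available
    boolean advance() throws Exception; // true if another record follows
    T getCurrent();
    void close() throws Exception;
  }

  private final Queue<SimpleReader<T>> pending = new ArrayDeque<>();

  SequentialReaderRunner(Iterable<SimpleReader<T>> readers) {
    readers.forEach(pending::add);
  }

  void run(Consumer<T> emit) throws Exception {
    while (!pending.isEmpty()) {
      SimpleReader<T> reader = pending.poll(); // next bounded split
      try {
        for (boolean more = reader.start(); more; more = reader.advance()) {
          emit.accept(reader.getCurrent());
        }
      } finally {
        reader.close(); // free the split's buffers before opening the next
      }
    }
  }
}
{code}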



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7357) Kinesis IO.write throws LimitExceededException

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7357?focusedWorklogId=251844=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251844
 ]

ASF GitHub Bot logged work on BEAM-7357:


Author: ASF GitHub Bot
Created on: 31/May/19 15:34
Start Date: 31/May/19 15:34
Worklog Time Spent: 10m 
  Work Description: aromanenko-dev commented on issue #8730: [BEAM-7357] 
KinesisIO: fix too many checks that writing stream exists.
URL: https://github.com/apache/beam/pull/8730#issuecomment-497754551
 
 
   Run Java PreCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251844)
Time Spent: 20m  (was: 10m)

> Kinesis IO.write throws LimitExceededException
> --
>
> Key: BEAM-7357
> URL: https://issues.apache.org/jira/browse/BEAM-7357
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kinesis
>Affects Versions: 2.11.0
>Reporter: Brachi Packter
>Assignee: Alexey Romanenko
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> I used KinesisIO to write to Kinesis. Very quickly, I get many exceptions 
> like:
> [shard_map.cc:150] Shard map update for stream "***" failed. Code: 
> LimitExceededException Message: Rate exceeded for stream *** under account 
> ***; retrying in ..
> Also, I see many exceptions like:
> Caused by: java.lang.IllegalArgumentException: Stream ** does not exist at 
> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>  at 
> org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
> I'm sure this stream exists because I can see some data from my pipeline that 
> was successfully ingested to it.
>  
> Here is my code:
>  
>  
> {code:java}
> .apply(KinesisIO.write()
>        .withStreamName("**")
>        .withPartitioner(new KinesisPartitioner() {
>                        @Override
>                        public String getPartitionKey(byte[] value) {
>                                        return UUID.randomUUID().toString();
>                        }
>                        @Override
>                        public String getExplicitHashKey(byte[] value) {
>                                        return null;
>                        }
>        })
>        .withAWSClientsProvider("**", "***", Regions.US_EAST_1));{code}
>  
> I tried not using KinesisIO, and everything works well; I can't figure 
> out what went wrong.
> I tried using the same API the library uses.
>  
> {code:java}
> .apply(
>  ParDo.of(new DoFn<byte[], Void>() {
>  private transient IKinesisProducer inlineProducer;
>  @Setup
>  public void setup() {
>  KinesisProducerConfiguration config =
>  KinesisProducerConfiguration.fromProperties(new Properties());
>  config.setRegion(Regions.US_EAST_1.getName());
>  config.setCredentialsProvider(new AWSStaticCredentialsProvider(new
>  BasicAWSCredentials("***", "***")));
>  inlineProducer = new KinesisProducer(config);
>  }
>  // trivial getter, implied by the call in processElement below
>  private IKinesisProducer getProducer() {
>  return inlineProducer;
>  }
>  @ProcessElement
>  public void processElement(ProcessContext c) throws Exception {
>  ByteBuffer data = ByteBuffer.wrap(c.element());
>  String partitionKey = UUID.randomUUID().toString();
>  ListenableFuture<UserRecordResult> f =
>  getProducer().addUserRecord("***", partitionKey, data);
>  Futures.addCallback(f, new UserRecordResultFutureCallback());
>  }
>  class UserRecordResultFutureCallback implements
>  FutureCallback<UserRecordResult> {
>  @Override
>  public void onFailure(Throwable cause) {
>  throw new RuntimeException("failed produce: " + cause);
>  }
>  @Override
>  public void onSuccess(UserRecordResult result) {
>  }
>  }
>  })
>  );
>  
> {code}
>  
> Any idea what I did wrong, or what the error in KinesisIO is?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7442) Bounded Reads for Flink Runner fails with OOM

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7442?focusedWorklogId=251845=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251845
 ]

ASF GitHub Bot logged work on BEAM-7442:


Author: ASF GitHub Bot
Created on: 31/May/19 15:34
Start Date: 31/May/19 15:34
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #8715: [BEAM-7442][BEAM-5650] 
Read sequentially from bounded sources in UnboundedSourceWrapper
URL: https://github.com/apache/beam/pull/8715#issuecomment-497754559
 
 
   Unrelated failure in Java PreCommit: 
```org.apache.beam.runners.dataflow.worker.fn.control.TimerReceiverTest.testSingleTimerScheduling```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251845)
Time Spent: 2h  (was: 1h 50m)

> Bounded Reads for Flink Runner fails with OOM
> -
>
> Key: BEAM-7442
> URL: https://issues.apache.org/jira/browse/BEAM-7442
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Akshay Iyangar
>Assignee: Akshay Iyangar
>Priority: Major
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of 
> files is very large, FlinkRunner throws an OOM error. This happens because 
> the current implementation reads the files simultaneously rather than 
> sequentially, causing all of them to be in memory at once, which quickly 
> breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class so that, when the 
> source is bounded, its splits are read sequentially from a queue.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2888) Runner Comparison / Capability Matrix revamp

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2888?focusedWorklogId=251887=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251887
 ]

ASF GitHub Bot logged work on BEAM-2888:


Author: ASF GitHub Bot
Created on: 31/May/19 17:02
Start Date: 31/May/19 17:02
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #8576: [BEAM-2888] Add 
the not-yet-fully-designed drain and checkpoint to runner comparison
URL: https://github.com/apache/beam/pull/8576#issuecomment-497785713
 
 
   Ping. Any more to add?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251887)
Time Spent: 2h 10m  (was: 2h)

> Runner Comparison / Capability Matrix revamp
> 
>
> Key: BEAM-2888
> URL: https://issues.apache.org/jira/browse/BEAM-2888
> Project: Beam
>  Issue Type: Improvement
>  Components: website
>Reporter: Kenneth Knowles
>Assignee: Griselda Cuevas Zambrano
>Priority: Major
>  Labels: gsod, gsod2019
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Discussion: 
> https://lists.apache.org/thread.html/8aff7d70c254356f2dae3109fb605e0b60763602225a877d3dadf8b7@%3Cdev.beam.apache.org%3E
> Summarizing that discussion, we have a lot of issues/wishes. Some can be 
> addressed as one-off and some need a unified reorganization of the runner 
> comparison.
> Basic corrections:
>  - Remove rows that are impossible not to support (ParDo)
>  - Remove rows where "support" doesn't really make sense (Composite 
> transforms)
>  - Deduplicate rows that are actually the same model feature (all non-merging 
> windowing / all merging windowing)
>  - Clearly separate rows that represent optimizations (Combine)
>  - Correct rows in the wrong place (Timers are actually a "what...?" row)
>  - Separate or remove rows that have not been designed ([Meta]Data driven 
> triggers, retractions)
>  - Rename rows with names that appear nowhere else (Timestamp control, which 
> is called a TimestampCombiner in Java)
>  - Switch to a more distinct color scheme for full/partial support (currently 
> just solid/faded colors)
>  - Switch to something clearer than "~" for partial support, versus ✘ and ✓ 
> for none and full.
>  - Correct Gearpump support for merging windows (see BEAM-2759)
>  - Correct Spark support for non-merging and merging windows (see BEAM-2499)
> Minor rewrites:
>  - Lump all the basic stuff (ParDo, GroupByKey, Read, Window) into one row
>  - Make sections as users see them, like "ParDo" / "side Inputs" not "What?" 
> / "side inputs"
>  - Add rows for non-model things, like portability framework support, metrics 
> backends, etc
> Bigger rewrites:
>  - Add versioning to the comparison, as in BEAM-166
>  - Find a way to fit in a plain-English summary of each runner's support in Beam. 
> It should come first, as it is what new users need before getting to details.
>  - Find a way to describe production readiness of runners and/or testimonials 
> of who is using it in production.
>  - Have a place to compare non-model differences between runners
> Changes requiring engineering efforts:
>  - Gather and add quantitative runner metrics, perhaps Nexmark results for 
> mid-level, smaller benchmarks for measuring aspects of specific features, and 
> larger end-to-end benchmarks to get an idea how it might actually perform on 
> a use case
>  - Tighter coupling of the matrix portion of the comparison with tags on 
> ValidatesRunner tests
> If you care to address some aspect of this, please reach out and/or just file 
> a subtask and address it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-7467) Gearpump Quickstart fails, java.lang.NoClassDefFoundError: com/gs/collections/api/block/procedure/Procedure

2019-05-31 Thread Luke Cwik (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Cwik reassigned BEAM-7467:
---

Assignee: Manu Zhang  (was: Ankur Goenka)

> Gearpump Quickstart fails, java.lang.NoClassDefFoundError: 
> com/gs/collections/api/block/procedure/Procedure
> ---
>
> Key: BEAM-7467
> URL: https://issues.apache.org/jira/browse/BEAM-7467
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-gearpump
>Affects Versions: 2.13.0
>Reporter: Luke Cwik
>Assignee: Manu Zhang
>Priority: Blocker
>
> After generating the archetype for the 2.13.0 RC2, the following quick start 
> command fails:
> {code:java}
> mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount 
> -Dexec.args="--inputFile=pom.xml --output=counts --runner=GearpumpRunner" 
> -Pgearpump-runner{code}
> I also tried:
> {code:java}
> mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount 
> -Dexec.args="--inputFile=pom.xml --output=counts --runner=GearpumpRunner" 
> -Pgearpump-runner{code}
> Log:
> {code:java}
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/usr/local/google/home/lcwik/.m2/repository/org/slf4j/slf4j-jdk14/1.7.25/slf4j-jdk14-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/local/google/home/lcwik/.m2/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.JDK14LoggerFactory]
> May 31, 2019 9:38:26 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 
> applyOrElse
> INFO: Slf4jLogger started
> May 31, 2019 9:38:26 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 
> $anonfun$applyOrElse$3
> INFO: Starting remoting
> May 31, 2019 9:38:26 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 
> $anonfun$applyOrElse$3
> INFO: Remoting started; listening on addresses 
> :[akka.tcp://client1820912745@127.0.0.1:38849]
> May 31, 2019 9:38:26 AM io.gearpump.metrics.Metrics$ createExtension
> INFO: Metrics is enabled..., false
> May 31, 2019 9:38:26 AM io.gearpump.cluster.master.MasterProxy 
> INFO: Master Proxy is started...
> [WARNING] 
> java.lang.NoClassDefFoundError: 
> com/gs/collections/api/block/procedure/Procedure
>  at io.gearpump.streaming.dsl.plan.WindowOp.chain (OP.scala:273)
>  at io.gearpump.streaming.dsl.plan.Planner.merge (Planner.scala:86)
>  at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$2 
> (Planner.scala:71)
>  at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$2$adapted 
> (Planner.scala:70)
>  at scala.collection.mutable.HashSet.foreach (HashSet.scala:77)
>  at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$1 
> (Planner.scala:70)
>  at io.gearpump.streaming.dsl.plan.Planner.$anonfun$optimize$1$adapted 
> (Planner.scala:68)
>  at scala.collection.immutable.List.foreach (List.scala:388)
>  at io.gearpump.streaming.dsl.plan.Planner.optimize (Planner.scala:68)
>  at io.gearpump.streaming.dsl.plan.Planner.plan (Planner.scala:48)
>  at io.gearpump.streaming.dsl.scalaapi.StreamApp.plan (StreamApp.scala:59)
>  at io.gearpump.streaming.dsl.scalaapi.StreamApp$.streamAppToApplication 
> (StreamApp.scala:82)
>  at io.gearpump.streaming.dsl.javaapi.JavaStreamApp.submit 
> (JavaStreamApp.scala:44)
>  at org.apache.beam.runners.gearpump.GearpumpRunner.run 
> (GearpumpRunner.java:83)
>  at org.apache.beam.runners.gearpump.GearpumpRunner.run 
> (GearpumpRunner.java:44)
>  at org.apache.beam.sdk.Pipeline.run (Pipeline.java:313)
>  at org.apache.beam.sdk.Pipeline.run (Pipeline.java:299)
>  at org.apache.beam.examples.WordCount.runWordCount (WordCount.java:185)
>  at org.apache.beam.examples.WordCount.main (WordCount.java:192)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>  at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
>  at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke (Method.java:498)
>  at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
>  at java.lang.Thread.run (Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: 
> com.gs.collections.api.block.procedure.Procedure
>  at java.net.URLClassLoader.findClass (URLClassLoader.java:381)
>  at java.lang.ClassLoader.loadClass (ClassLoader.java:424)
>  at java.lang.ClassLoader.loadClass (ClassLoader.java:357)
>  at io.gearpump.streaming.dsl.plan.WindowOp.chain (OP.scala:273)
>  at io.gearpump.streaming.dsl.plan.Planner.merge (Planner.scala:86)
>  at 

[jira] [Closed] (BEAM-6623) Dataflow ValidatesRunner test suite should also exercise ValidatesRunner tests under Python 3.

2019-05-31 Thread Valentyn Tymofieiev (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valentyn Tymofieiev closed BEAM-6623.
-
Resolution: Fixed

> Dataflow ValidatesRunner test suite should also exercise ValidatesRunner 
> tests under Python 3.
> --
>
> Key: BEAM-6623
> URL: https://issues.apache.org/jira/browse/BEAM-6623
> Project: Beam
>  Issue Type: Sub-task
>  Components: testing
>Reporter: Valentyn Tymofieiev
>Assignee: Frederik Bode
>Priority: Blocker
> Fix For: 2.14.0
>
>  Time Spent: 14h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-7459) Add smoke integration tests to Precommit test suites on Python 3

2019-05-31 Thread Valentyn Tymofieiev (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valentyn Tymofieiev closed BEAM-7459.
-
   Resolution: Fixed
Fix Version/s: Not applicable

> Add smoke integration tests to Precommit  test suites on Python 3 
> --
>
> Key: BEAM-7459
> URL: https://issues.apache.org/jira/browse/BEAM-7459
> Project: Beam
>  Issue Type: Sub-task
>  Components: testing
>Reporter: Valentyn Tymofieiev
>Assignee: Valentyn Tymofieiev
>Priority: Major
> Fix For: Not applicable
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (BEAM-7332) Blog post announcing Beam Katas

2019-05-31 Thread Henry Suryawirawan (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Suryawirawan resolved BEAM-7332.
--
Resolution: Fixed

> Blog post announcing Beam Katas
> ---
>
> Key: BEAM-7332
> URL: https://issues.apache.org/jira/browse/BEAM-7332
> Project: Beam
>  Issue Type: Task
>  Components: website
>Reporter: Henry Suryawirawan
>Priority: Trivial
> Fix For: Not applicable
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Publishing a quick blog post that lets the users know about Beam Katas.
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-7332) Blog post announcing Beam Katas

2019-05-31 Thread Henry Suryawirawan (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Suryawirawan closed BEAM-7332.


The blog has been posted.

> Blog post announcing Beam Katas
> ---
>
> Key: BEAM-7332
> URL: https://issues.apache.org/jira/browse/BEAM-7332
> Project: Beam
>  Issue Type: Task
>  Components: website
>Reporter: Henry Suryawirawan
>Priority: Trivial
> Fix For: Not applicable
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Publishing a quick blog post that lets the users know about Beam Katas.
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7342) Extend SyntheticPipeline map steps to be able to be splittable (Beam Python SDK)

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7342?focusedWorklogId=251996=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251996
 ]

ASF GitHub Bot logged work on BEAM-7342:


Author: ASF GitHub Bot
Created on: 31/May/19 18:57
Start Date: 31/May/19 18:57
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #8607: [BEAM-7342] 
Extend SyntheticPipeline map steps to be splittable.
URL: https://github.com/apache/beam/pull/8607#discussion_r289514063
 
 

 ##
 File path: sdks/python/apache_beam/testing/synthetic_pipeline.py
 ##
 @@ -116,6 +118,149 @@ def process(self, element):
 yield element
 
 
+class NonLiquidShardingOffsetRangeTracker(OffsetRestrictionTracker):
+  """ A OffsetRangeTracker that doesn't allow splitting. """
+
+  def try_split(self, split_offset):
+return  # Don't split.
+
+  def checkpoint(self):
+return # Don't split.
+
+
+class SyntheticSDFStepRestrictionProvider(RestrictionProvider):
+  """A `RestrictionProvider` for SyntheticSDFStep.
+
+  An initial_restriction and split that operate on num_records and ignore
+  source description (element). Splits into initial_splitting_num_bundles.
+  Returns size_estimate_override as restriction size, if set. Otherwise uses
+  element size.
+
+  If initial_splitting_uneven_chunks, produces uneven chunks.
+
+  """
+
+  def __init__(self, num_records, initial_splitting_num_bundles,
+   initial_splitting_uneven_chunks, disable_liquid_sharding,
+   size_estimate_override):
+self._num_records = num_records
+self._initial_splitting_num_bundles = initial_splitting_num_bundles
+self._initial_splitting_uneven_chunks = initial_splitting_uneven_chunks
+self._disable_liquid_sharding = disable_liquid_sharding
+self._size_estimate_override = size_estimate_override
+
+  def initial_restriction(self, element):
+return (0, self._num_records)
+
+  def create_tracker(self, restriction):
+if self._disable_liquid_sharding:
+  return NonLiquidShardingOffsetRangeTracker(restriction[0],
+ restriction[1])
+else:
+  # OffsetRange.new_tracker returns a RangeTracker - not 
RestrictionTracker.
+  return OffsetRestrictionTracker(restriction[0], restriction[1])
+
+  def split_randomly(self, restriction):
+''' Randomly split the restriction into the right number of bundles.'''
+elems = restriction[1] - restriction[0]
+bundles = self._initial_splitting_num_bundles
+randomNums = [np.random.randint(0, elems - 1) for _ in
+  range(0, bundles - 1)]
+print(randomNums)
+randomNums.append(0)
+randomNums.append(elems)
+randomNums.sort()
+for i in range(1, len(randomNums)):
+  yield (restriction[0] + randomNums[i - 1],
+ restriction[0] + randomNums[i])
+
+  def split(self, element, restriction):
+elems = restriction[1] - restriction[0]
 
 Review comment:
   You can also factor out the offsets_per_split logic that you have below to 
reuse 
https://github.com/apache/beam/blob/2471e3e4656806f4b1aefa2d677ef0e3256b41f1/sdks/python/apache_beam/testing/synthetic_pipeline.py#L214
   I'm just trying to ensure that the two behaviors don't diverge in how the 
splits are computed.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251996)
Time Spent: 6h 20m  (was: 6h 10m)

> Extend SyntheticPipeline map steps to be able to be splittable (Beam Python 
> SDK)
> 
>
> Key: BEAM-7342
> URL: https://issues.apache.org/jira/browse/BEAM-7342
> Project: Beam
>  Issue Type: New Feature
>  Components: testing
> Environment: Beam Python
>Reporter: Lara Schmidt
>Assignee: Lara Schmidt
>Priority: Minor
>   Original Estimate: 1m
>  Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> Add the ability for map steps to be configured to be splittable. 
> Possible configuration options:
>  - uneven bundle sizes
>  - possible incorrect sizing returned



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7443) BoundedSource->SDF needs a wrapper in Python SDK

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7443?focusedWorklogId=251997=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251997
 ]

ASF GitHub Bot logged work on BEAM-7443:


Author: ASF GitHub Bot
Created on: 31/May/19 18:57
Start Date: 31/May/19 18:57
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #8641: [BEAM-7443] 
Create a BoundedSource -> SDF wrapper in Python SDK
URL: https://github.com/apache/beam/pull/8641#discussion_r289514154
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -478,6 +479,22 @@ def process(
   self.assertEqual(1, len(counters))
   self.assertEqual(counters[0].committed, len(''.join(data)))
 
+  def test_sdf_wrap_range_source(self):
+with self.create_pipeline() as p:
+  from apache_beam.options.pipeline_options import DebugOptions
+  experiments = (p._options.view_as(DebugOptions).experiments or [])
+
+  # Setup experiment option to enable using SDFBoundedSourceWrapper
+  if 'use_sdf_bounded_source' not in experiments:
+experiments.append('use_sdf_bounded_source')
+
+  p._options.view_as(DebugOptions).experiments = experiments
+
+  actual = (
+  p | beam.io.Read(RangeSource(0, 4))
+  )
+  assert_that(actual, equal_to([0, 1, 2, 3]))
 
 Review comment:
   Adding one more test case to verify overrides: 
https://github.com/apache/beam/pull/8641/commits/2392ae94101c71135466837547838d4b8bc1e090
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251997)
Time Spent: 2h 50m  (was: 2h 40m)

>  BoundedSource->SDF needs a wrapper in Python SDK
> -
>
> Key: BEAM-7443
> URL: https://issues.apache.org/jira/browse/BEAM-7443
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7305) Add first version of Hazelcast Jet Runner

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7305?focusedWorklogId=252064=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-252064
 ]

ASF GitHub Bot logged work on BEAM-7305:


Author: ASF GitHub Bot
Created on: 31/May/19 21:13
Start Date: 31/May/19 21:13
Worklog Time Spent: 10m 
  Work Description: viliam-durina commented on pull request #8699: 
[BEAM-7305] Fix issues in and extend documentation for Hazelcast Jet Runner 
URL: https://github.com/apache/beam/pull/8699#discussion_r289552490
 
 

 ##
 File path: 
runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/ImpulseP.java
 ##
 @@ -19,28 +19,89 @@
 
 import com.hazelcast.jet.core.AbstractProcessor;
 import com.hazelcast.jet.core.Processor;
-import com.hazelcast.jet.function.SupplierEx;
+import com.hazelcast.jet.core.ProcessorMetaSupplier;
+import com.hazelcast.jet.core.ProcessorSupplier;
+import com.hazelcast.nio.Address;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+import javax.annotation.Nonnull;
+import org.apache.beam.runners.jet.Utils;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.WindowedValue;
 
 /**
  * Jet {@link com.hazelcast.jet.core.Processor} implementation for Beam's
  * Impulse primitive.
  */
 public class ImpulseP extends AbstractProcessor {
 
+  private final boolean active;
+  private final Coder outputCoder;
   private final String ownerId; // do not remove it, very useful for debugging
 
-  private ImpulseP(String ownerId) {
+  private ImpulseP(boolean active, Coder outputCoder, String ownerId) {
+this.active = active;
+this.outputCoder = outputCoder;
 this.ownerId = ownerId;
   }
 
-  public static SupplierEx supplier(String ownerId) {
-return () -> new ImpulseP(ownerId);
-  }
-
   @Override
   public boolean complete() {
-return tryEmit(
-WindowedValue.valueInGlobalWindow(
-new byte[0])); // todo: should EACH processor emit this byte[] or 
just a SINGLE one?
+if (active) {
+  return tryEmit(Utils.encode(WindowedValue.valueInGlobalWindow(new 
byte[0]), outputCoder));
+} else {
+  return true;
+}
+  }
+
+  public static ProcessorMetaSupplier supplier(Coder outputCoder, String 
ownerId) {
+return new ImpulseMetaProcessorSupplier(outputCoder, ownerId);
+  }
+
+  private static class ImpulseMetaProcessorSupplier implements 
ProcessorMetaSupplier {
+
+private final Coder outputCoder;
+private final String ownerId;
+
+private ImpulseMetaProcessorSupplier(Coder outputCoder, String ownerId) {
+  this.outputCoder = outputCoder;
+  this.ownerId = ownerId;
+}
+
+@SuppressWarnings("unchecked")
+@Nonnull
+@Override
+public Function<? super Address, ? extends ProcessorSupplier> get(
+@Nonnull List<Address> addresses) {
+  return address -> new ImpulseProcessorSupplier(outputCoder, ownerId);
+}
+  }
+
+  private static class ImpulseProcessorSupplier implements 
ProcessorSupplier {
+private final Coder outputCoder;
+private final String ownerId;
+private transient ProcessorSupplier.Context context;
+
+private ImpulseProcessorSupplier(Coder outputCoder, String ownerId) {
+  this.outputCoder = outputCoder;
+  this.ownerId = ownerId;
+}
+
+@Override
+public void init(@Nonnull Context context) {
+  this.context = context;
+}
+
+@Nonnull
+@Override
+public Collection<? extends Processor> get(int count) {
+  int indexBase = context.memberIndex() * context.localParallelism();
+  List<Processor> res = new ArrayList<>(count);
+  for (int i = 0; i < count; i++, indexBase++) {
+res.add(new ImpulseP(indexBase == 0, outputCoder, ownerId));
 
 Review comment:
   You don't need PMS/PS for this, `Processor.Context` has 
`globalProcessorIndex()`.
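
A rough sketch of the suggested simplification (abbreviated; imports and `Utils.encode` as in the diff above, with the constructor trimmed for illustration):

```java
public class ImpulseP extends AbstractProcessor {
  private final Coder outputCoder;
  private boolean active;

  public ImpulseP(Coder outputCoder) {
    this.outputCoder = outputCoder;
  }

  @Override
  protected void init(@Nonnull Context context) {
    // Only the globally first processor instance emits the impulse, so the
    // custom ProcessorMetaSupplier/ProcessorSupplier pair becomes unnecessary.
    active = context.globalProcessorIndex() == 0;
  }

  @Override
  public boolean complete() {
    if (!active) {
      return true;
    }
    return tryEmit(Utils.encode(WindowedValue.valueInGlobalWindow(new byte[0]), outputCoder));
  }
}
```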
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 252064)
Time Spent: 8.5h  (was: 8h 20m)

> Add first version of Hazelcast Jet Runner
> -
>
> Key: BEAM-7305
> URL: https://issues.apache.org/jira/browse/BEAM-7305
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-jet
>Reporter: Maximilian Michels
>Assignee: Jozsef Bartok
>Priority: Major
> Fix For: 2.14.0
>
>  Time Spent: 8.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7305) Add first version of Hazelcast Jet Runner

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7305?focusedWorklogId=252065=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-252065
 ]

ASF GitHub Bot logged work on BEAM-7305:


Author: ASF GitHub Bot
Created on: 31/May/19 21:13
Start Date: 31/May/19 21:13
Worklog Time Spent: 10m 
  Work Description: viliam-durina commented on pull request #8699: 
[BEAM-7305] Fix issues in and extend documentation for Hazelcast Jet Runner 
URL: https://github.com/apache/beam/pull/8699#discussion_r289551291
 
 

 ##
 File path: 
runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/AbstractParDoP.java
 ##
 @@ -247,7 +255,12 @@ private void processBufferedRegularItems(Inbox inbox) {
 
   @Override
   public boolean tryProcess() {
-return outputManager.tryFlush();
+boolean successful = outputManager.tryFlush();
+if (successful && System.currentTimeMillis() > lastMetricsFlushTime + 
metricsFlushPeriod) {
 
 Review comment:
   Should use `System.nanoTime`, real-time clock can move back and forth. Also 
you can use `nextMetricsFlushTime` and save one add operation for most calls.
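
A rough sketch of that suggestion (field and method names are illustrative, not the actual AbstractParDoP members):

```java
// Track the *next* flush deadline with the monotonic clock: nanoTime()
// does not jump with wall-clock adjustments, and storing the deadline
// avoids re-computing lastFlush + period on every call.
private long nextMetricsFlushTime = System.nanoTime() + metricsFlushPeriodNanos;

@Override
public boolean tryProcess() {
  boolean successful = outputManager.tryFlush();
  if (successful && System.nanoTime() >= nextMetricsFlushTime) {
    flushMetrics(); // assumed stand-in for the actual metrics flush
    nextMetricsFlushTime = System.nanoTime() + metricsFlushPeriodNanos;
  }
  return successful;
}
```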
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 252065)
Time Spent: 8h 40m  (was: 8.5h)

> Add first version of Hazelcast Jet Runner
> -
>
> Key: BEAM-7305
> URL: https://issues.apache.org/jira/browse/BEAM-7305
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-jet
>Reporter: Maximilian Michels
>Assignee: Jozsef Bartok
>Priority: Major
> Fix For: 2.14.0
>
>  Time Spent: 8h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7397) Avoid String.format in state namespace construction

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7397?focusedWorklogId=251947=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251947
 ]

ASF GitHub Bot logged work on BEAM-7397:


Author: ASF GitHub Bot
Created on: 31/May/19 18:27
Start Date: 31/May/19 18:27
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #8662: [BEAM-7397] Avoid 
String.format in state namespace key construction
URL: https://github.com/apache/beam/pull/8662#issuecomment-497814046
 
 
   Run Java PreCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251947)
Time Spent: 0.5h  (was: 20m)

> Avoid String.format in state namespace construction
> ---
>
> Key: BEAM-7397
> URL: https://issues.apache.org/jira/browse/BEAM-7397
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core
>Reporter: Steve Niemitz
>Assignee: Steve Niemitz
>Priority: Minor
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> I've noticed that for jobs that access state frequently, a percent or two of 
> overall CPU time is actually spent calculating the state namespace name.
> StateNamespaces.java uses a String.format for the window namespaces, but 
> could simply use a string concatenation to do the same thing with much less 
> overhead. 
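For illustration, the kind of change being proposed (simplified; not the exact StateNamespaces code):

{code:java}
// Before: String.format re-parses the format string on every call.
String formatted = String.format("/%s/", windowString);

// After: plain concatenation builds the same key with much less overhead.
String concatenated = "/" + windowString + "/";
{code}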



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7397) Avoid String.format in state namespace construction

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7397?focusedWorklogId=251948=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251948
 ]

ASF GitHub Bot logged work on BEAM-7397:


Author: ASF GitHub Bot
Created on: 31/May/19 18:27
Start Date: 31/May/19 18:27
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #8662: [BEAM-7397] Avoid 
String.format in state namespace key construction
URL: https://github.com/apache/beam/pull/8662#issuecomment-497814120
 
 
   Will merge on green
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251948)
Time Spent: 40m  (was: 0.5h)

> Avoid String.format in state namespace construction
> ---
>
> Key: BEAM-7397
> URL: https://issues.apache.org/jira/browse/BEAM-7397
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core
>Reporter: Steve Niemitz
>Assignee: Steve Niemitz
>Priority: Minor
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> I've noticed that for jobs that access state frequently, a percent or two of 
> overall CPU time is actually spent calculating the state namespace name.
> StateNamespaces.java uses a String.format for the window namespaces, but 
> could simply use a string concatenation to do the same thing with much less 
> overhead. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3489) Expose the message id of received messages within PubsubMessage

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3489?focusedWorklogId=251981=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251981
 ]

ASF GitHub Bot logged work on BEAM-3489:


Author: ASF GitHub Bot
Created on: 31/May/19 18:38
Start Date: 31/May/19 18:38
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #8370: [BEAM-3489] add 
PubSub messageId in PubsubMessage
URL: https://github.com/apache/beam/pull/8370#issuecomment-497818076
 
 
   I kicked off the postcommit to validate the IT you added.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251981)
Time Spent: 2h  (was: 1h 50m)

> Expose the message id of received messages within PubsubMessage
> ---
>
> Key: BEAM-3489
> URL: https://issues.apache.org/jira/browse/BEAM-3489
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-gcp
>Reporter: Luke Cwik
>Assignee: Thinh Ha
>Priority: Minor
>  Labels: newbie, starter
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> This task is about passing forward the message id from the pubsub proto to 
> the java PubsubMessage.
> Add a message id field to PubsubMessage.
> Update the coder for PubsubMessage to encode the message id.
> Update the translation from the Pubsub proto message to the Dataflow message:
> https://github.com/apache/beam/blob/2e275264b21db45787833502e5e42907b05e28b8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L976
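As a rough illustration of the first two steps (a simplified stand-in, not the real PubsubMessage or its coder):

{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

// Simplified message type carrying the new field.
class SimplePubsubMessage {
  final byte[] payload;
  final String messageId; // the newly exposed field

  SimplePubsubMessage(byte[] payload, String messageId) {
    this.payload = payload;
    this.messageId = messageId;
  }
}

// Coder update: encode/decode the extra field alongside the payload.
class SimplePubsubMessageCoder extends CustomCoder<SimplePubsubMessage> {
  private static final ByteArrayCoder PAYLOAD_CODER = ByteArrayCoder.of();
  private static final StringUtf8Coder ID_CODER = StringUtf8Coder.of();

  @Override
  public void encode(SimplePubsubMessage value, OutputStream out) throws IOException {
    PAYLOAD_CODER.encode(value.payload, out);
    ID_CODER.encode(value.messageId, out);
  }

  @Override
  public SimplePubsubMessage decode(InputStream in) throws IOException {
    return new SimplePubsubMessage(PAYLOAD_CODER.decode(in), ID_CODER.decode(in));
  }
}
{code}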



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3489) Expose the message id of received messages within PubsubMessage

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3489?focusedWorklogId=251980=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251980
 ]

ASF GitHub Bot logged work on BEAM-3489:


Author: ASF GitHub Bot
Created on: 31/May/19 18:38
Start Date: 31/May/19 18:38
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #8370: [BEAM-3489] add 
PubSub messageId in PubsubMessage
URL: https://github.com/apache/beam/pull/8370#issuecomment-497817941
 
 
   Run Java PostCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251980)
Time Spent: 1h 50m  (was: 1h 40m)

> Expose the message id of received messages within PubsubMessage
> ---
>
> Key: BEAM-3489
> URL: https://issues.apache.org/jira/browse/BEAM-3489
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-gcp
>Reporter: Luke Cwik
>Assignee: Thinh Ha
>Priority: Minor
>  Labels: newbie, starter
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> This task is about passing forward the message id from the pubsub proto to 
> the java PubsubMessage.
> Add a message id field to PubsubMessage.
> Update the coder for PubsubMessage to encode the message id.
> Update the translation from the Pubsub proto message to the Dataflow message:
> https://github.com/apache/beam/blob/2e275264b21db45787833502e5e42907b05e28b8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L976



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6674) The JdbcIO source should produce schemas

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6674?focusedWorklogId=251951=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251951
 ]

ASF GitHub Bot logged work on BEAM-6674:


Author: ASF GitHub Bot
Created on: 31/May/19 18:28
Start Date: 31/May/19 18:28
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #8725: [BEAM-6674] 
Add schema support to JdbcIO read
URL: https://github.com/apache/beam/pull/8725#discussion_r289493336
 
 

 ##
 File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/SchemaUtil.java
 ##
 @@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import java.io.Serializable;
+import java.sql.Array;
+import java.sql.Date;
+import java.sql.JDBCType;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.schemas.LogicalTypes;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.joda.time.DateTime;
+import org.joda.time.chrono.ISOChronology;
+
+/** Provides utility functions for working with Beam {@link Schema} types. */
+class SchemaUtil {
+  private static final String SQL_DATE = "SqlDateType";
+  private static final String SQL_TIME = "SqlTimeType";
+  private static final String SQL_TIMESTAMP_WITH_LOCAL_TZ = 
"SqlTimestampWithLocalTzType";
+
+  @VisibleForTesting
+  static final Schema.FieldType SQL_DATE_LOGICAL_TYPE =
+  Schema.FieldType.logicalType(
+  new LogicalTypes.PassThroughLogicalType(
+  SQL_DATE, "", Schema.FieldType.DATETIME) {});
+
+  @VisibleForTesting
+  static final Schema.FieldType SQL_TIME_LOGICAL_TYPE =
+  Schema.FieldType.logicalType(
+  new LogicalTypes.PassThroughLogicalType(
+  SQL_TIME, "", Schema.FieldType.DATETIME) {});
+
+  @VisibleForTesting
+  static final Schema.FieldType SQL_TIMESTAMP_WITH_LOCAL_TZ_LOGICAL_TYPE =
+  Schema.FieldType.logicalType(
+  new LogicalTypes.PassThroughLogicalType(
+  SQL_TIMESTAMP_WITH_LOCAL_TZ, "", Schema.FieldType.DATETIME) {});
+
+  /**
+   * Interface implemented by functions that extract values of different types 
from a JDBC
+   * ResultSet.
+   */
+  @FunctionalInterface
+  interface ResultSetFieldExtractor extends Serializable {
+Object extract(ResultSet rs, Integer index) throws SQLException;
+  }
+
+  // ResultSetExtractors for primitive schema types (excluding arrays, structs 
and logical types).
+  private static final EnumMap<Schema.TypeName, ResultSetFieldExtractor>
+  RESULTSET_FIELD_EXTRACTORS =
+  new EnumMap<>(
+  ImmutableMap.<Schema.TypeName, ResultSetFieldExtractor>builder()
+  .put(Schema.TypeName.BOOLEAN, ResultSet::getBoolean)
+  .put(Schema.TypeName.BYTE, ResultSet::getByte)
+  .put(Schema.TypeName.BYTES, ResultSet::getBytes)
+  .put(Schema.TypeName.DATETIME, ResultSet::getTimestamp)
+  .put(Schema.TypeName.DECIMAL, ResultSet::getBigDecimal)
+  .put(Schema.TypeName.DOUBLE, ResultSet::getDouble)
+  .put(Schema.TypeName.FLOAT, ResultSet::getFloat)
+  .put(Schema.TypeName.INT16, ResultSet::getShort)
+  .put(Schema.TypeName.INT32, ResultSet::getInt)
+  .put(Schema.TypeName.INT64, ResultSet::getLong)
+  .put(Schema.TypeName.STRING, ResultSet::getString)
+  .build());
+
+  private static final ResultSetFieldExtractor DATE_EXTRACTOR = 
createDateExtractor();
+  private static final 

[jira] [Work logged] (BEAM-6674) The JdbcIO source should produce schemas

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6674?focusedWorklogId=251954=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251954
 ]

ASF GitHub Bot logged work on BEAM-6674:


Author: ASF GitHub Bot
Created on: 31/May/19 18:28
Start Date: 31/May/19 18:28
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #8725: [BEAM-6674] 
Add schema support to JdbcIO read
URL: https://github.com/apache/beam/pull/8725#discussion_r289435182
 
 

 ##
 File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/SchemaUtil.java
 ##
 @@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import java.io.Serializable;
+import java.sql.Array;
+import java.sql.Date;
+import java.sql.JDBCType;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.schemas.LogicalTypes;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.joda.time.DateTime;
+import org.joda.time.chrono.ISOChronology;
+
+/** Provides utility functions for working with Beam {@link Schema} types. */
+class SchemaUtil {
+  private static final String SQL_DATE = "SqlDateType";
+  private static final String SQL_TIME = "SqlTimeType";
+  private static final String SQL_TIMESTAMP_WITH_LOCAL_TZ = 
"SqlTimestampWithLocalTzType";
+
+  @VisibleForTesting
+  static final Schema.FieldType SQL_DATE_LOGICAL_TYPE =
+  Schema.FieldType.logicalType(
+  new LogicalTypes.PassThroughLogicalType(
+  SQL_DATE, "", Schema.FieldType.DATETIME) {});
+
+  @VisibleForTesting
+  static final Schema.FieldType SQL_TIME_LOGICAL_TYPE =
+  Schema.FieldType.logicalType(
+  new LogicalTypes.PassThroughLogicalType(
+  SQL_TIME, "", Schema.FieldType.DATETIME) {});
+
+  @VisibleForTesting
+  static final Schema.FieldType SQL_TIMESTAMP_WITH_LOCAL_TZ_LOGICAL_TYPE =
+  Schema.FieldType.logicalType(
+  new LogicalTypes.PassThroughLogicalType(
+  SQL_TIMESTAMP_WITH_LOCAL_TZ, "", Schema.FieldType.DATETIME) {});
+
+  /**
+   * Interface implemented by functions that extract values of different types 
from a JDBC
+   * ResultSet.
+   */
+  @FunctionalInterface
+  interface ResultSetFieldExtractor extends Serializable {
+Object extract(ResultSet rs, Integer index) throws SQLException;
+  }
+
+  // ResultSetExtractors for primitive schema types (excluding arrays, structs 
and logical types).
+  private static final EnumMap<Schema.TypeName, ResultSetFieldExtractor>
+  RESULTSET_FIELD_EXTRACTORS =
+  new EnumMap<>(
+  ImmutableMap.<Schema.TypeName, ResultSetFieldExtractor>builder()
+  .put(Schema.TypeName.BOOLEAN, ResultSet::getBoolean)
+  .put(Schema.TypeName.BYTE, ResultSet::getByte)
+  .put(Schema.TypeName.BYTES, ResultSet::getBytes)
+  .put(Schema.TypeName.DATETIME, ResultSet::getTimestamp)
+  .put(Schema.TypeName.DECIMAL, ResultSet::getBigDecimal)
+  .put(Schema.TypeName.DOUBLE, ResultSet::getDouble)
+  .put(Schema.TypeName.FLOAT, ResultSet::getFloat)
+  .put(Schema.TypeName.INT16, ResultSet::getShort)
+  .put(Schema.TypeName.INT32, ResultSet::getInt)
+  .put(Schema.TypeName.INT64, ResultSet::getLong)
+  .put(Schema.TypeName.STRING, ResultSet::getString)
+  .build());
+
+  private static final ResultSetFieldExtractor DATE_EXTRACTOR = 
createDateExtractor();
+  private static final 

[jira] [Work logged] (BEAM-6674) The JdbcIO source should produce schemas

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6674?focusedWorklogId=251952=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251952
 ]

ASF GitHub Bot logged work on BEAM-6674:


Author: ASF GitHub Bot
Created on: 31/May/19 18:28
Start Date: 31/May/19 18:28
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #8725: [BEAM-6674] 
Add schema support to JdbcIO read
URL: https://github.com/apache/beam/pull/8725#discussion_r289493790
 
 

 ##
 File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/SchemaUtil.java
 ##
 @@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import java.io.Serializable;
+import java.sql.Array;
+import java.sql.Date;
+import java.sql.JDBCType;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.schemas.LogicalTypes;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.joda.time.DateTime;
+import org.joda.time.chrono.ISOChronology;
+
+/** Provides utility functions for working with Beam {@link Schema} types. */
+class SchemaUtil {
+  private static final String SQL_DATE = "SqlDateType";
+  private static final String SQL_TIME = "SqlTimeType";
+  private static final String SQL_TIMESTAMP_WITH_LOCAL_TZ = 
"SqlTimestampWithLocalTzType";
+
+  @VisibleForTesting
+  static final Schema.FieldType SQL_DATE_LOGICAL_TYPE =
+  Schema.FieldType.logicalType(
+  new LogicalTypes.PassThroughLogicalType(
+  SQL_DATE, "", Schema.FieldType.DATETIME) {});
+
+  @VisibleForTesting
+  static final Schema.FieldType SQL_TIME_LOGICAL_TYPE =
+  Schema.FieldType.logicalType(
+  new LogicalTypes.PassThroughLogicalType(
+  SQL_TIME, "", Schema.FieldType.DATETIME) {});
+
+  @VisibleForTesting
+  static final Schema.FieldType SQL_TIMESTAMP_WITH_LOCAL_TZ_LOGICAL_TYPE =
+  Schema.FieldType.logicalType(
+  new LogicalTypes.PassThroughLogicalType(
+  SQL_TIMESTAMP_WITH_LOCAL_TZ, "", Schema.FieldType.DATETIME) {});
+
+  /**
+   * Interface implemented by functions that extract values of different types 
from a JDBC
+   * ResultSet.
+   */
+  @FunctionalInterface
+  interface ResultSetFieldExtractor extends Serializable {
+Object extract(ResultSet rs, Integer index) throws SQLException;
+  }
+
+  // ResultSetExtractors for primitive schema types (excluding arrays, structs 
and logical types).
+  private static final EnumMap<Schema.TypeName, ResultSetFieldExtractor>
+  RESULTSET_FIELD_EXTRACTORS =
+  new EnumMap<>(
+  ImmutableMap.<Schema.TypeName, ResultSetFieldExtractor>builder()
+  .put(Schema.TypeName.BOOLEAN, ResultSet::getBoolean)
+  .put(Schema.TypeName.BYTE, ResultSet::getByte)
+  .put(Schema.TypeName.BYTES, ResultSet::getBytes)
+  .put(Schema.TypeName.DATETIME, ResultSet::getTimestamp)
+  .put(Schema.TypeName.DECIMAL, ResultSet::getBigDecimal)
+  .put(Schema.TypeName.DOUBLE, ResultSet::getDouble)
+  .put(Schema.TypeName.FLOAT, ResultSet::getFloat)
+  .put(Schema.TypeName.INT16, ResultSet::getShort)
+  .put(Schema.TypeName.INT32, ResultSet::getInt)
+  .put(Schema.TypeName.INT64, ResultSet::getLong)
+  .put(Schema.TypeName.STRING, ResultSet::getString)
+  .build());
+
+  private static final ResultSetFieldExtractor DATE_EXTRACTOR = 
createDateExtractor();
+  private static final 

[jira] [Work logged] (BEAM-6674) The JdbcIO source should produce schemas

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6674?focusedWorklogId=251953=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251953
 ]

ASF GitHub Bot logged work on BEAM-6674:


Author: ASF GitHub Bot
Created on: 31/May/19 18:28
Start Date: 31/May/19 18:28
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #8725: [BEAM-6674] 
Add schema support to JdbcIO read
URL: https://github.com/apache/beam/pull/8725#discussion_r289503686
 
 

 ##
 File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
 ##
 @@ -188,6 +191,15 @@
 .build();
   }
 
+  /** Read Beam {@link Row}s from a JDBC data source. */
+  @Experimental(Experimental.Kind.SCHEMAS)
+  public static ReadRows readRows() {
+return new AutoValue_JdbcIO_ReadRows.Builder()
+.setFetchSize(DEFAULT_FETCH_SIZE)
+.setOutputParallelization(true)
+.build();
+  }
 
 Review comment:
   This shouldn't be limited to just reading Row objects, as any type can have 
a schema. So you should be able to have a readWithSchema(Class<T> clazz) and 
get back a PCollection<T> (if the SchemaRegistry contains a schema for clazz, 
and if that schema matches the one inferred at the source). readRows is still 
useful but can be a wrapper around this.
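
A sketch of the shape being suggested (hypothetical signatures, not existing JdbcIO methods):

```java
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Hypothetical entry points illustrating the comment: one generic
// schema-aware read, with readRows() as a thin wrapper over it.
interface SchemaAwareJdbcRead {
  // Generic path: any T for which the SchemaRegistry has a schema.
  <T> PCollection<T> readWithSchema(Class<T> clazz);

  // Row-specific convenience, delegating to the generic path.
  default PCollection<Row> readRows() {
    return readWithSchema(Row.class);
  }
}
```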
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251953)
Time Spent: 50m  (was: 40m)

> The JdbcIO source should produce schemas
> 
>
> Key: BEAM-6674
> URL: https://issues.apache.org/jira/browse/BEAM-6674
> Project: Beam
>  Issue Type: Sub-task
>  Components: io-java-jdbc
>Reporter: Reuven Lax
>Assignee: Shehzaad Nakhoda
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6674) The JdbcIO source should produce schemas

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6674?focusedWorklogId=251955=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251955
 ]

ASF GitHub Bot logged work on BEAM-6674:


Author: ASF GitHub Bot
Created on: 31/May/19 18:28
Start Date: 31/May/19 18:28
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #8725: [BEAM-6674] 
Add schema support to JdbcIO read
URL: https://github.com/apache/beam/pull/8725#discussion_r289504158
 
 

 ##
 File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
 ##
 @@ -391,6 +403,123 @@ DataSource buildDatasource() {
 void setParameters(PreparedStatement preparedStatement) throws Exception;
   }
 
+  /** Implementation of {@link #readRows()}. */
+  @AutoValue
+  @Experimental(Experimental.Kind.SCHEMAS)
+  public abstract static class ReadRows extends PTransform<PBegin, PCollection<Row>> {
 
 Review comment:
   can we share the part that's common with Read? I'm worried that people 
will end up adding features to one code path and not the other.
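
One hedged way to keep a single code path: let ReadRows.expand delegate to the
existing Read transform with a row-producing RowMapper (inferBeamSchema and
toBeamRow are assumed helper names, not necessarily the PR's):

```
@Override
public PCollection<Row> expand(PBegin input) {
  Schema schema = inferBeamSchema();  // e.g. derived from ResultSetMetaData
  PCollection<Row> rows =
      input.apply(
          JdbcIO.<Row>read()
              .withDataSourceProviderFn(getDataSourceProviderFn())
              .withQuery(getQuery())
              .withRowMapper(resultSet -> toBeamRow(schema, resultSet))
              .withCoder(RowCoder.of(schema)));
  rows.setRowSchema(schema);
  return rows;
}
```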
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251955)
Time Spent: 1h 10m  (was: 1h)

> The JdbcIO source should produce schemas
> 
>
> Key: BEAM-6674
> URL: https://issues.apache.org/jira/browse/BEAM-6674
> Project: Beam
>  Issue Type: Sub-task
>  Components: io-java-jdbc
>Reporter: Reuven Lax
>Assignee: Shehzaad Nakhoda
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6674) The JdbcIO source should produce schemas

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6674?focusedWorklogId=251950=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251950
 ]

ASF GitHub Bot logged work on BEAM-6674:


Author: ASF GitHub Bot
Created on: 31/May/19 18:28
Start Date: 31/May/19 18:28
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #8725: [BEAM-6674] 
Add schema support to JdbcIO read
URL: https://github.com/apache/beam/pull/8725#discussion_r289441423
 
 

 ##
 File path: 
sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
 ##
 @@ -272,6 +277,31 @@ public void testReadWithSingleStringParameter() {
 pipeline.run();
   }
 
+  @Test
+  public void testReadRows() {
+SerializableFunction<Void, DataSource> dataSourceProvider = ignored -> 
dataSource;
+PCollection<Row> rows =
+pipeline.apply(
+JdbcIO.readRows()
+.withDataSourceProviderFn(dataSourceProvider)
+.withQuery(String.format("select name,id from %s where name = 
?", readTableName))
+.withStatementPreparator(
+preparedStatement ->
+preparedStatement.setString(1, 
TestRow.getNameForSeed(1))));
 
 Review comment:
   test that rows.getSchema() returns the expected schema?
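
For example (the exact field names, types, and nullability here are
assumptions; Derby typically reports upper-case column names):

```
Schema expectedSchema =
    Schema.builder()
        .addNullableField("NAME", Schema.FieldType.STRING)
        .addNullableField("ID", Schema.FieldType.INT32)
        .build();
assertEquals(expectedSchema, rows.getSchema());
```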
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251950)
Time Spent: 0.5h  (was: 20m)

> The JdbcIO source should produce schemas
> 
>
> Key: BEAM-6674
> URL: https://issues.apache.org/jira/browse/BEAM-6674
> Project: Beam
>  Issue Type: Sub-task
>  Components: io-java-jdbc
>Reporter: Reuven Lax
>Assignee: Shehzaad Nakhoda
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7443) BoundedSource->SDF needs a wrapper in Python SDK

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7443?focusedWorklogId=252056=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-252056
 ]

ASF GitHub Bot logged work on BEAM-7443:


Author: ASF GitHub Bot
Created on: 31/May/19 21:04
Start Date: 31/May/19 21:04
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #8641: [BEAM-7443] 
Create a BoundedSource -> SDF wrapper in Python SDK
URL: https://github.com/apache/beam/pull/8641#discussion_r289549901
 
 

 ##
 File path: sdks/python/apache_beam/io/iobase.py
 ##
 @@ -1180,15 +1180,47 @@ def check_done(self):
 raise NotImplementedError
 
   def try_split(self, fraction_of_remainder):
+"""Splits current restriction based on fraction_of_remainder.
+
+Invoked when SDK receiving ProcessBundleSplitRequest during processing
+bundle.
+
+Args:
+  fraction_of_remainder: a fraction of (cur_pos, stop_pos).
+
+Returns: ``None`` when current restriction has been checkpointed, or
+split_point is out of current restriction range. Otherwise, return
+((start_pos, split_pos), (split_pos, stop_pos)).
+
+** Thread safety **
+Accessing to position and checkpoint status should be guarded by a single
+lock object.
+"""
 raise NotImplementedError
 
   def try_claim(self, position):
+""" Claims position as current_position.
 
 Review comment:
   Try using the javadoc from 
https://github.com/apache/beam/blob/c0de4570b186793ff477f4a5bd86cc27191e2999/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L28
 to write this version.
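
For reference, the linked javadoc describes tryClaim as atomically attempting
to claim a position, with a failed claim meaning the caller must stop
processing and finish the bundle. A hedged Java-flavored sketch of that
contract for an offset range (field names are illustrative, not SDK code):

```
public synchronized boolean tryClaim(long position) {
  if (position <= lastAttemptedOffset) {
    throw new IllegalArgumentException("Claimed positions must be increasing");
  }
  lastAttemptedOffset = position;
  // A claim succeeds only while the position is inside the restriction.
  return position < range.getTo();
}
```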
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 252056)
Time Spent: 3.5h  (was: 3h 20m)

>  BoundedSource->SDF needs a wrapper in Python SDK
> -
>
> Key: BEAM-7443
> URL: https://issues.apache.org/jira/browse/BEAM-7443
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7443) BoundedSource->SDF needs a wrapper in Python SDK

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7443?focusedWorklogId=252057=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-252057
 ]

ASF GitHub Bot logged work on BEAM-7443:


Author: ASF GitHub Bot
Created on: 31/May/19 21:04
Start Date: 31/May/19 21:04
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #8641: [BEAM-7443] 
Create a BoundedSource -> SDF wrapper in Python SDK
URL: https://github.com/apache/beam/pull/8641#discussion_r289549631
 
 

 ##
 File path: sdks/python/apache_beam/io/iobase.py
 ##
 @@ -1180,15 +1180,47 @@ def check_done(self):
 raise NotImplementedError
 
   def try_split(self, fraction_of_remainder):
+"""Splits current restriction based on fraction_of_remainder.
+
+Invoked when SDK receiving ProcessBundleSplitRequest during processing
+bundle.
+
+Args:
+  fraction_of_remainder: a fraction of (cur_pos, stop_pos).
 
 Review comment:
   fraction_of_remainder: A hint as to the fraction of work the primary 
restriction should represent based upon the current known remaining amount of 
work.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 252057)
Time Spent: 3h 40m  (was: 3.5h)

>  BoundedSource->SDF needs a wrapper in Python SDK
> -
>
> Key: BEAM-7443
> URL: https://issues.apache.org/jira/browse/BEAM-7443
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7443) BoundedSource->SDF needs a wrapper in Python SDK

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7443?focusedWorklogId=252054=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-252054
 ]

ASF GitHub Bot logged work on BEAM-7443:


Author: ASF GitHub Bot
Created on: 31/May/19 21:04
Start Date: 31/May/19 21:04
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #8641: [BEAM-7443] 
Create a BoundedSource -> SDF wrapper in Python SDK
URL: https://github.com/apache/beam/pull/8641#discussion_r289541805
 
 

 ##
 File path: sdks/python/apache_beam/io/iobase.py
 ##
 @@ -1180,15 +1180,47 @@ def check_done(self):
 raise NotImplementedError
 
   def try_split(self, fraction_of_remainder):
+"""Splits current restriction based on fraction_of_remainder.
+
+Invoked when SDK receiving ProcessBundleSplitRequest during processing
+bundle.
+
+Args:
+  fraction_of_remainder: a fraction of (cur_pos, stop_pos).
+
+Returns: ``None`` when current restriction has been checkpointed, or
+split_point is out of current restriction range. Otherwise, return
+((start_pos, split_pos), (split_pos, stop_pos)).
+
+** Thread safety **
+Accessing to position and checkpoint status should be guarded by a single
+lock object.
 
 Review comment:
   ```
   Returns: ``(primary_restriction, residual_restriction)`` if a split was 
possible, otherwise returns ``None``.
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 252054)
Time Spent: 3h 10m  (was: 3h)

>  BoundedSource->SDF needs a wrapper in Python SDK
> -
>
> Key: BEAM-7443
> URL: https://issues.apache.org/jira/browse/BEAM-7443
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7443) BoundedSource->SDF needs a wrapper in Python SDK

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7443?focusedWorklogId=252053=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-252053
 ]

ASF GitHub Bot logged work on BEAM-7443:


Author: ASF GitHub Bot
Created on: 31/May/19 21:04
Start Date: 31/May/19 21:04
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #8641: [BEAM-7443] 
Create a BoundedSource -> SDF wrapper in Python SDK
URL: https://github.com/apache/beam/pull/8641#discussion_r289534766
 
 

 ##
 File path: sdks/python/apache_beam/io/iobase.py
 ##
 @@ -1180,15 +1180,47 @@ def check_done(self):
 raise NotImplementedError
 
   def try_split(self, fraction_of_remainder):
+"""Splits current restriction based on fraction_of_remainder.
+
+Invoked when SDK receiving ProcessBundleSplitRequest during processing
+bundle.
+
+Args:
+  fraction_of_remainder: a fraction of (cur_pos, stop_pos).
+
+Returns: ``None`` when current restriction has been checkpointed, or
+split_point is out of current restriction range. Otherwise, return
+((start_pos, split_pos), (split_pos, stop_pos)).
+
+** Thread safety **
 
 Review comment:
   use the same terminology for thread safety from the `check_done` method
   
   ```
   ** Thread safety **
   
   Methods of the class ``RestrictionTracker`` including this method may get
   invoked by different threads, hence must be made thread-safe, e.g. by 
using
   a single lock object.
   ```
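
Concretely, that wording amounts to routing every read and write of tracker
state through one lock object; a minimal illustrative sketch (not SDK code):

```
class GuardedTracker {
  private final Object lock = new Object();
  private long position;
  private long stop;

  boolean tryClaim(long newPosition) {
    synchronized (lock) {  // the same lock guards splits and checkpointing
      if (newPosition >= stop) {
        return false;
      }
      position = newPosition;
      return true;
    }
  }

  long currentPosition() {
    synchronized (lock) {
      return position;
    }
  }
}
```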
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 252053)
Time Spent: 3h  (was: 2h 50m)

>  BoundedSource->SDF needs a wrapper in Python SDK
> -
>
> Key: BEAM-7443
> URL: https://issues.apache.org/jira/browse/BEAM-7443
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 3h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7443) BoundedSource->SDF needs a wrapper in Python SDK

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7443?focusedWorklogId=252055=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-252055
 ]

ASF GitHub Bot logged work on BEAM-7443:


Author: ASF GitHub Bot
Created on: 31/May/19 21:04
Start Date: 31/May/19 21:04
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #8641: [BEAM-7443] 
Create a BoundedSource -> SDF wrapper in Python SDK
URL: https://github.com/apache/beam/pull/8641#discussion_r289549079
 
 

 ##
 File path: sdks/python/apache_beam/io/iobase.py
 ##
 @@ -1180,15 +1180,47 @@ def check_done(self):
 raise NotImplementedError
 
   def try_split(self, fraction_of_remainder):
+"""Splits current restriction based on fraction_of_remainder.
+
+Invoked when SDK receiving ProcessBundleSplitRequest during processing
 
 Review comment:
   ```
   If splitting the current restriction is possible, the current restriction is 
split into a primary and residual restriction pair. This invocation updates the 
``current_restriction()`` to be the primary restriction effectively having the 
current ``DoFn.process()`` execution responsible for performing the work that 
the primary restriction represents. The residual restriction will be executed 
in a separate ``DoFn.process()`` invocation (likely in a different process). 
The work performed by executing the primary and residual restrictions as 
separate ``DoFn.process()`` invocations MUST be equivalent to the work 
performed as if this split never occurred.
   
   The ``fraction_of_remainder`` should be used in a best effort manner to 
choose a primary and residual restriction based upon the fraction of the 
remaining work that the current ``DoFn.process()`` invocation is responsible 
for. For example, if a ``DoFn.process()`` was reading a file with a restriction 
representing the offset range [100, 200) and has processed up to offset 130 
with a fraction_of_remainder of 0.7, the primary and residual restrictions 
returned would be [100, 179), [179, 200) (note: current_offset + 
fraction_of_remainder * remaining_work = 130 + 0.7 * 70 = 179).
   
   It is very important for pipeline scaling and end to end pipeline execution 
that try_split is implemented well.
   ```
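
The arithmetic in that example translates directly into code; a hedged sketch
for an offset restriction with fields from, to, and current (SplitResult and
OffsetRange are illustrative types here, not necessarily the SDK's):

```
public synchronized SplitResult trySplit(double fractionOfRemainder) {
  long splitPos = current + (long) (fractionOfRemainder * (to - current));
  if (splitPos <= current || splitPos >= to) {
    return null;  // nothing left to split off
  }
  long residualEnd = to;
  to = splitPos;  // current_restriction() now describes the primary
  // e.g. [100, 200) at position 130 with fraction 0.7 yields
  // primary [100, 179) and residual [179, 200).
  return new SplitResult(
      new OffsetRange(from, splitPos), new OffsetRange(splitPos, residualEnd));
}
```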
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 252055)
Time Spent: 3h 20m  (was: 3h 10m)

>  BoundedSource->SDF needs a wrapper in Python SDK
> -
>
> Key: BEAM-7443
> URL: https://issues.apache.org/jira/browse/BEAM-7443
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7450) Unbounded HCatalogIO Reader using splittable pardos

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7450?focusedWorklogId=252068=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-252068
 ]

ASF GitHub Bot logged work on BEAM-7450:


Author: ASF GitHub Bot
Created on: 31/May/19 21:29
Start Date: 31/May/19 21:29
Worklog Time Spent: 10m 
  Work Description: apilloud commented on issue #8718: [BEAM-7450] - Add an 
unbounded HcatalogIO reader using splittable pardo
URL: https://github.com/apache/beam/pull/8718#issuecomment-497866247
 
 
   Test failure looks real.
   ```
   
   java.lang.IllegalArgumentException: 
org.apache.beam.sdk.io.hcatalog.PartitionPoller, @ProcessElement 
processElement(ProcessContext, PartitionRangeTracker): Has tracker type 
PartitionRangeTracker, but the DoFn's tracker type must be of type 
RestrictionTracker.
at 
org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.throwIllegalArgument(DoFnSignatures.java:1495)
at 
org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.checkArgument(DoFnSignatures.java:1500)
at 
org.apache.beam.sdk.transforms.reflect.DoFnSignatures.verifySplittableMethods(DoFnSignatures.java:595)
at 
org.apache.beam.sdk.transforms.reflect.DoFnSignatures.parseSignature(DoFnSignatures.java:474)
at 
org.apache.beam.sdk.transforms.reflect.DoFnSignatures.lambda$getSignature$0(DoFnSignatures.java:140)
at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
at 
org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getSignature(DoFnSignatures.java:140)
at org.apache.beam.sdk.transforms.ParDo.validate(ParDo.java:547)
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:397)
at 
org.apache.beam.sdk.io.hcatalog.HCatalogUnboundedReader.expand(HCatalogUnboundedReader.java:110)
at 
org.apache.beam.sdk.io.hcatalog.HCatalogUnboundedReader.expand(HCatalogUnboundedReader.java:77)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)
at 
org.apache.beam.sdk.io.hcatalog.HCatalogIOTest.testWriteThenUnboundedReadSuccess(HCatalogIOTest.java:220)
   ```
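
The failing check lives in DoFnSignatures: the @ProcessElement tracker
parameter's declared type must be a subtype of RestrictionTracker. A
shape-only sketch of a conforming declaration (the type parameters are
assumptions based on the trace; the real class must implement the tracker's
abstract methods):

```
public abstract class PartitionRangeTracker
    extends RestrictionTracker<PartitionRange, Partition> {
  // tryClaim, currentRestriction, checkpoint, checkDone, ... go here
}

@ProcessElement
public void processElement(ProcessContext context, PartitionRangeTracker tracker) {
  // Valid: PartitionRangeTracker is now a RestrictionTracker subtype.
}
```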
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 252068)
Time Spent: 0.5h  (was: 20m)

> Unbounded HCatalogIO Reader using splittable pardos
> ---
>
> Key: BEAM-7450
> URL: https://issues.apache.org/jira/browse/BEAM-7450
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-core
>Reporter: Ankit Jhalaria
>Assignee: Ankit Jhalaria
>Priority: Minor
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> # The current version of HCatalogIO is a bounded source.
>  # While migrating our jobs to AWS, we realized it would be helpful to 
> have an unbounded HCat reader that behaves as an unbounded source and 
> polls for new partitions as they become available.
>  # I have used splittable pardo(s) to do this. There is a flag that can be 
> set to treat this as a bounded source, in which case the read terminates 
> instead of polling indefinitely.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7397) Avoid String.format in state namespace construction

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7397?focusedWorklogId=252011=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-252011
 ]

ASF GitHub Bot logged work on BEAM-7397:


Author: ASF GitHub Bot
Created on: 31/May/19 19:39
Start Date: 31/May/19 19:39
Worklog Time Spent: 10m 
  Work Description: steveniemitz commented on issue #8662: [BEAM-7397] 
Avoid String.format in state namespace key construction
URL: https://github.com/apache/beam/pull/8662#issuecomment-497836220
 
 
   Thanks!
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 252011)
Time Spent: 50m  (was: 40m)

> Avoid String.format in state namespace construction
> ---
>
> Key: BEAM-7397
> URL: https://issues.apache.org/jira/browse/BEAM-7397
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core
>Reporter: Steve Niemitz
>Assignee: Steve Niemitz
>Priority: Minor
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> I've noticed that for jobs that access state frequently, a percent or two of 
> overall CPU time is actually spent calculating the state namespace name.
> StateNamespaces.java uses a String.format for the window namespaces, but 
> could simply use a string concatenation to do the same thing with much less 
> overhead. 
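
For illustration, the change amounts to something like the following
before/after (the exact pattern in StateNamespaces.java may differ):

```
// Before: String.format re-parses the pattern on every namespace lookup.
String key = String.format("/%s/%s/", windowString, triggerIndex);

// After: plain concatenation builds the same string with far less overhead.
String key = "/" + windowString + "/" + triggerIndex + "/";
```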



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-7469) [beam_PostCommit_Java_Nexmark_Dataflow] [:sdks:java:testing:nexmark:run] Job was unexpectedly updated

2019-05-31 Thread Andrew Pilloud (JIRA)
Andrew Pilloud created BEAM-7469:


 Summary: [beam_PostCommit_Java_Nexmark_Dataflow] 
[:sdks:java:testing:nexmark:run] Job was unexpectedly updated
 Key: BEAM-7469
 URL: https://issues.apache.org/jira/browse/BEAM-7469
 Project: Beam
  Issue Type: Bug
  Components: examples-nexmark, test-failures
Reporter: Andrew Pilloud


_Use this form to file an issue for test failure:_
 * [Jenkins 
Job|https://builds.apache.org/job/beam_PostCommit_Java_Nexmark_Dataflow/2880/]
 * [Gradle Build Scan|https://scans.gradle.com/s/zuxqe2j2cjaf6]

Initial investigation:

I've seen this a few times over the last few months. This is the pure Java 
Nexmark.
{code:java}
23:03:32 2019-05-31T06:03:32.895Z FAILED Query4
...
0004 -1.0 -1.0 -1
*** Job was unexpectedly updated ***{code}
 

Looks like we need more logging around what is going on here, but this happened 
right about the same time the first worker VM started. 

_After you've filled out the above details, please [assign the issue to an 
individual|https://beam.apache.org/contribute/postcommits-guides/index.html#find_specialist].
 Assignee should [treat test failures as 
high-priority|https://beam.apache.org/contribute/postcommits-policies/#assigned-failing-test],
 helping to fix the issue or find a more appropriate owner. See [Apache Beam 
Post-Commit Policies|https://beam.apache.org/contribute/postcommits-policies]._



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7342) Extend SyntheticPipeline map steps to be able to be splittable (Beam Python SDK)

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7342?focusedWorklogId=251995=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251995
 ]

ASF GitHub Bot logged work on BEAM-7342:


Author: ASF GitHub Bot
Created on: 31/May/19 18:54
Start Date: 31/May/19 18:54
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #8607: [BEAM-7342] 
Extend SyntheticPipeline map steps to be splittable.
URL: https://github.com/apache/beam/pull/8607#discussion_r289510457
 
 

 ##
 File path: sdks/python/apache_beam/testing/synthetic_pipeline.py
 ##
 @@ -76,8 +78,34 @@ def rotate_key(element):
   return key[-1:] + key[:-1], value
 
 
+def initial_splitting_zipf(start_position, stop_position,
+   desired_num_bundles, distribution_parameter,
+   num_total_records=None):
+  """Split the given range (defined by start_position, stop_position) into
+ desired_num_bundles using zipf with the given distribution_parameter.
+  """
+  if not num_total_records:
+num_total_records = stop_position - start_position
+  samples = np.random.zipf(distribution_parameter, desired_num_bundles)
+  total = sum(samples)
+  relative_bundle_sizes = [(float(sample) / total) for sample in samples]
+  bundle_ranges = []
+  start = start_position
+  index = 0
+  while start < stop_position:
+if index == desired_num_bundles - 1:
+  bundle_ranges.append((start, stop_position))
+  break
+stop = start + int(num_total_records * relative_bundle_sizes[index])
+bundle_ranges.append((start, stop))
+start = stop
+index += 1
+  return bundle_ranges
+
+
 class SyntheticStep(beam.DoFn):
   """A DoFn of which behavior can be controlled through prespecified 
parameters.
+
 
 Review comment:
   ```suggestion
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 251995)
Time Spent: 6h 10m  (was: 6h)

> Extend SyntheticPipeline map steps to be able to be splittable (Beam Python 
> SDK)
> 
>
> Key: BEAM-7342
> URL: https://issues.apache.org/jira/browse/BEAM-7342
> Project: Beam
>  Issue Type: New Feature
>  Components: testing
> Environment: Beam Python
>Reporter: Lara Schmidt
>Assignee: Lara Schmidt
>Priority: Minor
>   Original Estimate: 1m
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Add the ability for map steps to be configured to be splittable. 
> Possible configuration options:
>  - uneven bundle sizes
>  - possible incorrect sizing returned



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-7397) Avoid String.format in state namespace construction

2019-05-31 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7397?focusedWorklogId=252021=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-252021
 ]

ASF GitHub Bot logged work on BEAM-7397:


Author: ASF GitHub Bot
Created on: 31/May/19 20:05
Start Date: 31/May/19 20:05
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #8662: [BEAM-7397] 
Avoid String.format in state namespace key construction
URL: https://github.com/apache/beam/pull/8662
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 252021)
Time Spent: 1h  (was: 50m)

> Avoid String.format in state namespace construction
> ---
>
> Key: BEAM-7397
> URL: https://issues.apache.org/jira/browse/BEAM-7397
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core
>Reporter: Steve Niemitz
>Assignee: Steve Niemitz
>Priority: Minor
> Fix For: 2.14.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> I've noticed that for jobs that access state frequently, a percent or two of 
> overall CPU time is actually spent calculating the state namespace name.
> StateNamespaces.java uses a String.format for the window namespaces, but 
> could simply use a string concatenation to do the same thing with much less 
> overhead. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-7397) Avoid String.format in state namespace construction

2019-05-31 Thread Luke Cwik (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Cwik closed BEAM-7397.
---
   Resolution: Fixed
Fix Version/s: 2.14.0

> Avoid String.format in state namespace construction
> ---
>
> Key: BEAM-7397
> URL: https://issues.apache.org/jira/browse/BEAM-7397
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core
>Reporter: Steve Niemitz
>Assignee: Steve Niemitz
>Priority: Minor
> Fix For: 2.14.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> I've noticed that for jobs that access state frequently, a percent or two of 
> overall CPU time is actually spent calculating the state namespace name.
> StateNamespaces.java uses a String.format for the window namespaces, but 
> could simply use a string concatenation to do the same thing with much less 
> overhead. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

