[jira] [Resolved] (SAMZA-1693) Samza-sql: Adding Serde for rel record

2018-04-30 Thread Aditya (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya resolved SAMZA-1693.
---
Resolution: Fixed

> Samza-sql: Adding Serde for rel record
> --
>
> Key: SAMZA-1693
> URL: https://issues.apache.org/jira/browse/SAMZA-1693
> Project: Samza
>  Issue Type: Bug
>Reporter: Aditya
>Assignee: Aditya
>Priority: Major
>
> Adding a Serde for the rel record, as Calcite expects keys to be in string 
> format. Rel converters are always expected to provide keys as strings. If the 
> key is an Avro record, the rel converter is expected to convert the Avro 
> record to a rel record and serialize it, then deserialize it when converting 
> the rel message back to a Samza message.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (SAMZA-1243) Flaky test: Multiple tests in TestLocalStoreMonitor

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/SAMZA-1243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459323#comment-16459323
 ] 

ASF GitHub Bot commented on SAMZA-1243:
---

GitHub user ahmedahamid opened a pull request:

https://github.com/apache/samza/pull/497

SAMZA-1243: Fix flaky tests in TestLocalStoreMonitor

Attempts to reproduce this issue were unsuccessful, but careful analysis of 
the code indicates that the underlying cause is most likely incomplete cleanup 
of file system directories after test execution, possibly due to incomplete or 
manual test runs. To address this, the fix generates a different local store 
directory for every test, isolating file system side effects across different 
tests and test runs.
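
A minimal sketch of the per-test directory idea described above, using only the JDK. The class and method names (`IsolatedStoreDirs`, `newLocalStoreDir`) are hypothetical and are not taken from the pull request; the actual change to TestLocalStoreMonitor may be structured differently.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not the code from the pull request.
final class IsolatedStoreDirs {
  private IsolatedStoreDirs() {}

  // Creates a fresh, uniquely named directory under the JVM temp dir,
  // so leftovers from an earlier (possibly aborted) run cannot be picked up.
  static Path newLocalStoreDir(String testName) {
    try {
      return Files.createTempDirectory("local-store-" + testName + "-");
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Path first = newLocalStoreDir("testA");
    Path second = newLocalStoreDir("testA");
    // Same test name, but two distinct directories.
    System.out.println(first + " vs " + second);
  }
}
```

Calling the helper from a per-test setup method would give each test its own store root, so a count-based assertion only sees directories that test created.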

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ahmedahamid/samza dev/fix-1243

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/samza/pull/497.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #497


commit 5ad2f47545b6c33fb389ab0d3231ea041d1ca7f5
Author: Ahmed Abdul Hamid 
Date:   2018-04-30T22:15:18Z

SAMZA-1243: Fix flaky tests in TestLocalStoreMonitor

Attempts to reproduce this issue were unsuccessful, but careful analysis of 
the code indicates that the underlying cause is most likely incomplete cleanup 
of file system directories after test execution, possibly due to incomplete 
manual test runs. To address this, the fix generates a different local store 
directory for every test, isolating file system side effects across different 
tests and test runs.




> Flaky test: Multiple tests in TestLocalStoreMonitor
> ---
>
> Key: SAMZA-1243
> URL: https://issues.apache.org/jira/browse/SAMZA-1243
> Project: Samza
>  Issue Type: Bug
>Reporter: Prateek Maheshwari
>Assignee: Shanthoosh Venkataraman
>Priority: Major
> Fix For: 0.15.0
>
>
> Failed locally on my laptop (OS X).
> At commit 35a5cd9ad210a3246eff530b5d74145225cbe5d2.
> {code}
> :samza-rest:test
> shouldDeleteTaskStoreWhenTaskPreferredStoreIsNotLocalHost FAILED
> junit.framework.AssertionFailedError: expected:<1> but was:<2>
> at junit.framework.Assert.fail(Assert.java:57)
> at junit.framework.Assert.failNotEquals(Assert.java:329)
> at junit.framework.Assert.assertEquals(Assert.java:78)
> at junit.framework.Assert.assertEquals(Assert.java:159)
> at junit.framework.Assert.assertEquals(Assert.java:166)
> at junit.framework.TestCase.assertEquals(TestCase.java:324)
> at 
> org.apache.samza.monitor.TestLocalStoreMonitor.shouldDeleteTaskStoreWhenTaskPreferredStoreIsNotLocalHost(TestLocalStoreMonitor.java:159)
> shouldDeleteLocalTaskStoreWhenItHasNoOffsetFile FAILED
> junit.framework.AssertionFailedError: expected:<1> but was:<2>
> at junit.framework.Assert.fail(Assert.java:57)
> at junit.framework.Assert.failNotEquals(Assert.java:329)
> at junit.framework.Assert.assertEquals(Assert.java:78)
> at junit.framework.Assert.assertEquals(Assert.java:159)
> at junit.framework.Assert.assertEquals(Assert.java:166)
> at junit.framework.TestCase.assertEquals(TestCase.java:324)
> at 
> org.apache.samza.monitor.TestLocalStoreMonitor.shouldDeleteLocalTaskStoreWhenItHasNoOffsetFile(TestLocalStoreMonitor.java:105)
> 30 tests completed, 2 failed, 2 skipped
> {code}





[jira] [Commented] (SAMZA-1693) Samza-sql: Adding Serde for rel record

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/SAMZA-1693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459289#comment-16459289
 ] 

ASF GitHub Bot commented on SAMZA-1693:
---

Github user asfgit closed the pull request at:

https://github.com/apache/samza/pull/495


> Samza-sql: Adding Serde for rel record
> --
>
> Key: SAMZA-1693
> URL: https://issues.apache.org/jira/browse/SAMZA-1693
> Project: Samza
>  Issue Type: Bug
>Reporter: Aditya
>Assignee: Aditya
>Priority: Major
>
> Adding a Serde for the rel record, as Calcite expects keys to be in string 
> format. Rel converters are always expected to provide keys as strings. If the 
> key is an Avro record, the rel converter is expected to convert the Avro 
> record to a rel record and serialize it, then deserialize it when converting 
> the rel message back to a Samza message.





samza git commit: SAMZA-1693: Samza-sql - Adding Serde for rel record and few other minor fixes for Avro and Rel conversion.

2018-04-30 Thread jagadish
Repository: samza
Updated Branches:
  refs/heads/master 603af35a5 -> 445d1e26c


SAMZA-1693: Samza-sql - Adding Serde for rel record and few other minor fixes 
for Avro and Rel conversion.

Adding a Serde for the rel record, as Calcite expects keys to be in string 
format. Rel converters are always expected to provide keys as strings. If the 
key is an Avro record, the rel converter is expected to convert the Avro 
record to a rel record and serialize it, then deserialize it when converting 
the rel message back to a Samza message.
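
To illustrate the round trip described above (structured key to string and back), here is a self-contained sketch. It is not the serde added in this commit (that lives in SamzaSqlRelRecordSerdeFactory and is not reproduced here). `KeyRecord` and `KeyRecordStringCodec` are hypothetical names, and Java serialization plus Base64 is an assumption chosen only because it needs no external libraries.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;

final class KeyRecordStringCodec {
  // Hypothetical stand-in for a rel record: parallel lists of field names and values.
  // Field values must themselves be Serializable for this sketch to work.
  static final class KeyRecord implements Serializable {
    private static final long serialVersionUID = 1L;
    final ArrayList<String> fieldNames;
    final ArrayList<Object> fieldValues;

    KeyRecord(List<String> fieldNames, List<Object> fieldValues) {
      this.fieldNames = new ArrayList<>(fieldNames);
      this.fieldValues = new ArrayList<>(fieldValues);
    }
  }

  // Structured key -> string, for a consumer that only accepts string keys.
  static String toKeyString(KeyRecord record) {
    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(record);
      out.flush();
      return Base64.getEncoder().encodeToString(bytes.toByteArray());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // String -> structured key, when converting back to the outgoing message.
  static KeyRecord fromKeyString(String key) {
    byte[] raw = Base64.getDecoder().decode(key);
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
      return (KeyRecord) in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    KeyRecord key = new KeyRecord(Arrays.asList("id", "region"), Arrays.<Object>asList(42, "us"));
    KeyRecord back = fromKeyString(toKeyString(key));
    System.out.println(back.fieldNames + " " + back.fieldValues);
  }
}
```

The point of the sketch is only the shape of the contract: whatever the encoding, the string form must decode back to an equal record.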

Author: Aditya Toomula 

Reviewers: Srini P

Closes #495 from atoomula/rel1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/445d1e26
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/445d1e26
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/445d1e26

Branch: refs/heads/master
Commit: 445d1e26c9a222061b43b9cb5a637358039270b9
Parents: 603af35
Author: Aditya Toomula 
Authored: Mon Apr 30 16:51:37 2018 -0700
Committer: Jagadish 
Committed: Mon Apr 30 16:51:37 2018 -0700

--
 .../apache/samza/sql/avro/AvroRelConverter.java | 65 +--
 .../SamzaSqlRelRecordSerdeFactory.java  | 67 +++
 .../samza/sql/TestSamzaSqlRelMessageSerde.java  |  3 +-
 .../samza/sql/TestSamzaSqlRelRecordSerde.java   | 87 
 4 files changed, 175 insertions(+), 47 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/445d1e26/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
--
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java 
b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index f121983..c9c30cc 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -102,14 +102,7 @@ public class AvroRelConverter implements SamzaRelConverter 
{
   throw new SamzaException(msg);
 }
 
-Object key = samzaMessage.getKey();
-if (key != null && key instanceof IndexedRecord) {
-  IndexedRecord keyRecord = (IndexedRecord) key;
-  Schema keySchema = keyRecord.getSchema();
-  key = convertToJavaObject(samzaMessage.getKey(), keySchema);
-}
-
-return new SamzaSqlRelMessage(key, fieldNames, fieldValues);
+return new SamzaSqlRelMessage(samzaMessage.getKey(), fieldNames, 
fieldValues);
   }
 
   private SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord) {
@@ -169,17 +162,11 @@ public class AvroRelConverter implements 
SamzaRelConverter {
   case RECORD:
 return convertToGenericRecord((SamzaSqlRelRecord) relObj, 
getNonNullUnionSchema(schema));
   case ARRAY:
-if (((List) relObj).size() == 0) {
-  return null;
-}
 List avroList = ((List) relObj).stream()
 .map(o -> convertToAvroObject(o, 
getNonNullUnionSchema(schema).getElementType()))
 .collect(Collectors.toList());
 return avroList;
   case MAP:
-if (((Map) relObj).size() == 0) {
-  return null;
-}
 return ((Map) relObj).entrySet()
 .stream()
 .collect(Collectors.toMap(Map.Entry::getKey, e -> 
convertToAvroObject(e.getValue(),
@@ -198,53 +185,41 @@ public class AvroRelConverter implements 
SamzaRelConverter {
   // Not doing any validations of data types with Avro schema considering the 
resource cost per message.
   // Casting would fail if the data types are not in sync with the schema.
   public Object convertToJavaObject(Object avroObj, Schema schema) {
+if (avroObj == null) {
+  return null;
+}
 switch(schema.getType()) {
   case RECORD:
-if (avroObj == null) {
-  return null;
-}
 return convertToRelRecord((IndexedRecord) avroObj);
   case ARRAY: {
 ArrayList retVal = new ArrayList<>();
-if (avroObj != null) {
-  List avroArray;
-  if (avroObj instanceof GenericData.Array) {
-avroArray = (GenericData.Array) avroObj;
-  } else if (avroObj instanceof List) {
-avroArray = (List) avroObj;
-  } else {
-throw new SamzaException("Unsupported array type " + 
avroObj.getClass().getSimpleName());
-  }
-
-  if (avroArray != null) {
-retVal.addAll(
-avroArray.stream()
-.map(v -> convertToJavaObject(v, 
getNonNullUnionSchema(schema).getElementType()))
-.collect(Collectors.toList()));
-  }
+List avroArray;
+if (avroObj instanceof 

[jira] [Commented] (SAMZA-1371) Some Samza Containers get stuck at "Starting BrokerProxy for hostname:portnum" while others seem to be fine

2018-04-30 Thread Zihao Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SAMZA-1371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459203#comment-16459203
 ] 

Zihao Zhang commented on SAMZA-1371:


Hi [~nickpan47]

I'm a colleague of Hao Song and I'm working on this issue now. According to 
our investigation, some containers get stuck because, in the BrokerProxy 
thread, the SimpleConsumer class (package kafka.consumer) used in 
DefaultFetchSimpleConsumer (package org.apache.samza.system.kafka) cannot 
receive any message for a given partition at a specific offset. The stuck 
offset lies between the first valid offset and the latest valid offset for that 
partition, so the consumer keeps sending requests and trying to receive a 
message in an infinite loop. The stuck offset for a given partition does not 
change across multiple restarts.

My remote debugging indicated that the input stream (read by 
NetworkReceive#readFromReadableChannel()) feeding this consumer contains only 
some metadata with the topic/partition/offset info instead of a valid topic 
message for the given offset.

I've verified that the message at the specific offset is still valid. To do 
this, I wrote a small tool to fetch messages directly from the brokers. When I 
tried to fetch the message at the stuck offset, it led to the same result: a 
timeout before receiving any message. Then I bumped the version of 
kafka.clients to 0.11.0.2 and tried again. This time, the consumer skipped 
the stuck offset and returned the following x messages as requested.

Due to a build issue, I have not had a chance to try the newest version by 
pulling from master. But according to my investigation, there's a good chance 
that a higher version of the Kafka client would help solve this stuck 
containers issue. I'm wondering: 
1. Do you have any other suggestions or ideas on this?
2. Do you have a timeline for releasing a new Samza version with a 
kafka-clients dependency of 0.11?

Thank you.

> Some Samza Containers get stuck at "Starting BrokerProxy for 
> hostname:portnum" while others seem to be fine
> ---
>
> Key: SAMZA-1371
> URL: https://issues.apache.org/jira/browse/SAMZA-1371
> Project: Samza
>  Issue Type: Bug
>  Components: container
>Affects Versions: 0.11.0, 0.12.0
> Environment: Samza version: 0.11, 0.12
> Kafka version: 0.11.0.0
>Reporter: Ak Ka
>Priority: Blocker
> Attachments: stdout.log, thread_dump.txt
>
>
> We have multiple Samza apps using local store that have this issue. Some 
> containers get stuck on "Starting BrokerProxy for hostname:portnum" while 
> others seem to work as expected.  
> Here is the log:
> stuck:
> ```
> [...]
> 2017-07-25 17:11:26.546 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Creating new SimpleConsumer for host hostname:portnum for system kafka
> 2017-07-25 17:11:26.547 [main] org.apache.samza.system.kafka.GetOffset [INFO] 
> Validating offset 0 for topic and partition 
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,2]
> 2017-07-25 17:11:26.648 [main] org.apache.samza.system.kafka.GetOffset [INFO] 
> Able to successfully read from offset 0 for topic and partition 
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,2]. 
> Using it to instantiate consumer.
> 2017-07-25 17:11:26.649 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Starting BrokerProxy for hostname:portnum
> // it's dead, Jim
> ```
> healthy:
> ```
> [...]
> 2017-07-25 17:11:26.920 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Creating new SimpleConsumer for host hostname:portnum for system kafka
> 2017-07-25 17:11:26.921 [main] org.apache.samza.system.kafka.GetOffset [INFO] 
> Validating offset 0 for topic and partition 
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,1]
> 2017-07-25 17:11:27.023 [main] org.apache.samza.system.kafka.GetOffset [INFO] 
> Able to successfully read from offset 0 for topic and partition 
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,1]. 
> Using it to instantiate consumer.
> 2017-07-25 17:11:27.023 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Starting BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.194 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.194 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] closing simple consumer...
> 2017-07-25 17:11:29.239 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.DefaultFetchSimpleConsumer [INFO] Reconnect due 
> to socket error: 

[jira] [Commented] (SAMZA-1563) Make rocksDB local store directory configurable.

2018-04-30 Thread Xinyu Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/SAMZA-1563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458910#comment-16458910
 ] 

Xinyu Liu commented on SAMZA-1563:
--

pr: https://github.com/apache/samza/pull/491

> Make rocksDB local store directory configurable.
> 
>
> Key: SAMZA-1563
> URL: https://issues.apache.org/jira/browse/SAMZA-1563
> Project: Samza
>  Issue Type: Improvement
>Reporter: Shanthoosh Venkataraman
>Assignee: Bharath Kumarasubramanian
>Priority: Major
> Fix For: 0.14.1
>
>
> With the existing setup in YARN, the local store directory is set through a 
> system-defined environment variable.
> However, for scenarios such as standalone and beam-runner, it's essential to 
> expose a configuration for this and make it configurable.





[jira] [Commented] (SAMZA-1692) Standalone stability fixes.

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/SAMZA-1692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458883#comment-16458883
 ] 

ASF GitHub Bot commented on SAMZA-1692:
---

GitHub user shanthoosh opened a pull request:

https://github.com/apache/samza/pull/496

SAMZA-1692: Standalone stability fixes.

- Currently, on session expiration, a processorListener with an incorrect 
generationId is registered with ZooKeeper (the ZkUtils generationId is 
incremented on reconnect, but the generationId in processorListener is always 
zero). When a session reconnect happens on the processor that immediately 
succeeds the leader, the leader expiration event will be skipped. This 
prevents leader re-election when the current leader dies and stalls the 
processor group. The fix is to reinstantiate and then register 
processorChangeListener on session expiration.
- Add processorId to the debounce thread name (this can aid debugging when 
multiple processors are running within a JVM).
- After the ScheduleAfterDebounceTime queue is shut down, don't accept new 
schedule requests. The current ZkJobCoordinator shutdown sequence consists of 
the following steps:
 - Shut down the ScheduleAfterDebounceTime queue.
 - Stop the zkClient and relinquish its resources.

After ScheduleAfterDebounceTime is shut down and before zkClient is stopped, 
new operations can still be submitted to the ScheduleAfterDebounceTime queue. 
This results in a RejectedExecutionException, since the executorService is 
stopped.

```
Caused by: java.util.concurrent.RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@23f962a8 
rejected from java.util.concurrent.ScheduledThreadPoolExecutor@43408be8
```
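
The rejection shown above is standard `ScheduledExecutorService` behavior after shutdown. The following sketch shows one way to refuse new work once a scheduler is stopped. `GuardedDebounceScheduler` is a hypothetical class, not Samza's ScheduleAfterDebounceTime, and the flag-based guard is an assumption about how such a fix could look.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical scheduler wrapper; not the ScheduleAfterDebounceTime class from Samza.
final class GuardedDebounceScheduler {
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();
  private volatile boolean stopped = false;

  // Returns true if the action was accepted, false if the scheduler is already stopped.
  boolean schedule(Runnable action, long delayMs) {
    if (stopped) {
      return false; // drop the request instead of hitting a stopped executor
    }
    try {
      executor.schedule(action, delayMs, TimeUnit.MILLISECONDS);
      return true;
    } catch (RejectedExecutionException e) {
      // stop() raced with this call between the flag check and schedule()
      return false;
    }
  }

  void stop() {
    stopped = true;
    executor.shutdownNow();
  }

  public static void main(String[] args) {
    GuardedDebounceScheduler scheduler = new GuardedDebounceScheduler();
    System.out.println("before stop: " + scheduler.schedule(() -> { }, 1));
    scheduler.stop();
    System.out.println("after stop: " + scheduler.schedule(() -> { }, 1));
  }
}
```

The flag alone is not enough because a caller can pass the check just before `stop()` runs, which is why the sketch also catches the exception.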

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shanthoosh/samza master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/samza/pull/496.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #496


commit 5f9e5bdd4adc9ba14ef06b3de536787cb36cd0cc
Author: Shanthoosh Venkataraman 
Date:   2018-04-30T04:28:14Z

SAMZA-1692: Standalone stability fixes.

- Currently, on session expiration, a processorListener with an incorrect 
generationId is registered with ZooKeeper (the ZkUtils generationId is 
incremented on reconnect, but the generationId in processorListener is always 
zero). When a session reconnect happens on the processor that immediately 
succeeds the leader, the leader expiration event will be skipped. This 
prevents leader re-election when the current leader dies and stalls the 
processor group. The fix is to reinstantiate and then register 
processorChangeListener on session expiration.
- Add processorId to the debounce thread name (this can aid debugging when 
multiple processors are running within a JVM).
- After the ScheduleAfterDebounceTime queue is shut down, don't accept new 
schedule requests. The current ZkJobCoordinator shutdown sequence consists of 
the following steps:
 - Shut down the ScheduleAfterDebounceTime queue.
 - Stop the zkClient and relinquish its resources.

After ScheduleAfterDebounceTime is shut down and before zkClient is stopped, 
new operations can still be submitted to the ScheduleAfterDebounceTime queue. 
This results in a RejectedExecutionException, since the executorService is 
stopped.

```
Caused by: java.util.concurrent.RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@23f962a8 
rejected from java.util.concurrent.ScheduledThreadPoolExecutor@43408be8
```
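
The first item in the commit message (a listener holding a stale generationId) can be pictured with the sketch below. All names are hypothetical, and the comparison of the listener's generation against the session's current generation is an assumption about the mechanism. The real ZkUtils and listener classes are not reproduced here.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class GenerationAwareListeners {
  // Hypothetical session tracker: the generation is bumped on every reconnect.
  static final class Session {
    private final AtomicInteger generation = new AtomicInteger(0);

    int currentGeneration() {
      return generation.get();
    }

    void reconnect() {
      generation.incrementAndGet();
    }
  }

  // Hypothetical listener that ignores events once its generation is stale.
  static final class ProcessorListener {
    private final Session session;
    private final int createdInGeneration;
    private int handledEvents = 0;

    ProcessorListener(Session session) {
      this.session = session;
      this.createdInGeneration = session.currentGeneration();
    }

    void onProcessorChange() {
      if (createdInGeneration != session.currentGeneration()) {
        return; // stale listener: the event is silently skipped
      }
      handledEvents++;
    }

    int handledEvents() {
      return handledEvents;
    }
  }

  public static void main(String[] args) {
    Session session = new Session();
    ProcessorListener stale = new ProcessorListener(session);
    session.reconnect();
    stale.onProcessorChange(); // skipped, listener still belongs to generation 0
    ProcessorListener fresh = new ProcessorListener(session); // re-created after reconnect
    fresh.onProcessorChange(); // handled
    System.out.println(stale.handledEvents() + " " + fresh.handledEvents());
  }
}
```

Re-creating the listener after each reconnect, as the commit message describes, keeps its generation in step with the session's.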




> Standalone stability fixes.
> ---
>
> Key: SAMZA-1692
> URL: https://issues.apache.org/jira/browse/SAMZA-1692
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Assignee: Shanthoosh Venkataraman
>Priority: Major
>
> * Currently, on session expiration, a processorListener with an incorrect 
> generationId is registered with ZooKeeper (the ZkUtils generationId is 
> incremented on reconnect, but the generationId in processorListener is always 
> zero). When this happens to the immediate successor of the leader, the leader 
> expiration event will be skipped by that processor. This prevents leader 
> re-election when the current leader dies and stalls the processor group. The 
> fix is to re-instantiate and then register processorChangeListener on session 
> expiration.
> * Add processorId to the debounce thread name (this can aid debugging when 
> multiple processors are running within a JVM).
> * After the ScheduleAfterDebounceTime queue is shut down, don't accept new 
> schedule requests. The current ZkJobCoordinator shutdown sequence consists of the 
[jira] [Resolved] (SAMZA-1645) A few issues found by BEAM stress test

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu resolved SAMZA-1645.
--
Resolution: Fixed

> A few issues found by BEAM stress test
> --
>
> Key: SAMZA-1645
> URL: https://issues.apache.org/jira/browse/SAMZA-1645
> Project: Samza
>  Issue Type: Bug
>Reporter: Xinyu Liu
>Assignee: Xinyu Liu
>Priority: Major
> Fix For: 0.14.1
>
>
> # Setting high priority on intermediate streams caused starvation on certain 
> tasks. Because intermediate messages keep arriving, the input streams are not 
> consumed and watermark messages are not generated, which causes large state 
> to accumulate downstream and slows processing, making the starvation worse.
> # Watermark broadcast needs to check whether the watermark has advanced.
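
The second item above amounts to a monotonicity check before sending. A minimal sketch, assuming watermarks are plain `long` timestamps. `WatermarkBroadcaster` and its counter are hypothetical and stand in for whatever actually emits watermark messages.

```java
// Hypothetical broadcaster; the counter stands in for sending to downstream partitions.
final class WatermarkBroadcaster {
  private long lastBroadcast = Long.MIN_VALUE;
  private int broadcastCount = 0;

  // Broadcasts only when the watermark moved forward; returns whether it did.
  boolean maybeBroadcast(long watermark) {
    if (watermark <= lastBroadcast) {
      return false; // unchanged or regressed watermark: nothing to send
    }
    lastBroadcast = watermark;
    broadcastCount++;
    return true;
  }

  int broadcastCount() {
    return broadcastCount;
  }

  public static void main(String[] args) {
    WatermarkBroadcaster broadcaster = new WatermarkBroadcaster();
    System.out.println(broadcaster.maybeBroadcast(100L));
    System.out.println(broadcaster.maybeBroadcast(100L));
    System.out.println(broadcaster.maybeBroadcast(150L));
  }
}
```

Skipping repeated values keeps redundant watermark messages off the intermediate streams.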





[jira] [Updated] (SAMZA-1645) A few issues found by BEAM stress test

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu updated SAMZA-1645:
-
Fix Version/s: 0.14.1

> A few issues found by BEAM stress test
> --
>
> Key: SAMZA-1645
> URL: https://issues.apache.org/jira/browse/SAMZA-1645
> Project: Samza
>  Issue Type: Bug
>Reporter: Xinyu Liu
>Assignee: Xinyu Liu
>Priority: Major
> Fix For: 0.14.1
>
>
> # Setting high priority on intermediate streams caused starvation on certain 
> tasks. Because intermediate messages keep arriving, the input streams are not 
> consumed and watermark messages are not generated, which causes large state 
> to accumulate downstream and slows processing, making the starvation worse.
> # Watermark broadcast needs to check whether the watermark has advanced.





[jira] [Resolved] (SAMZA-1647) Fix NPE in JobModelExpired event handler.

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu resolved SAMZA-1647.
--
Resolution: Fixed

> Fix NPE in JobModelExpired event handler.
> -
>
> Key: SAMZA-1647
> URL: https://issues.apache.org/jira/browse/SAMZA-1647
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Assignee: Shanthoosh Venkataraman
>Priority: Major
> Fix For: 0.14.1
>
>
> *Problem:*
> There's a minor race condition when container shutdown is triggered from the 
> onJobModelExpired event handler. This race condition leads to an NPE and 
> eventually kills the StreamProcessor.
> *Reason:*
> The JobModelExpired handler logic, executed from the DebounceThread, is as 
> follows:
>  1. Invoke container shutdown. 
>  2. Wait for the configured task.shutdown.ms through a Latch(L).
>  3. Log the containerId after the configured wait time.
> The onContainerFailed handler logic, executed from the ContainerThread, is as 
> follows:
>  1. Count down the Latch(L).
>  2. Set the container field to null.
> If step 2 of the onContainerFailed handler is executed before step 3 of the 
> JobModelExpired handler, the container field will be set to null before we 
> log it in the JobModelExpired handler. This results in an NPE.
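
The interleaving described above can be avoided by reading the shared field once, before waiting on the latch. The sketch below shows that general pattern. It is not the fix that was committed for this issue, and `ShutdownRaceSketch`, `Container`, and both method bodies are hypothetical stand-ins for the real handlers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class ShutdownRaceSketch {
  // Hypothetical container that carries only an id.
  static final class Container {
    final String id;

    Container(String id) {
      this.id = id;
    }
  }

  private volatile Container container = new Container("c-0");
  private final CountDownLatch shutdownLatch = new CountDownLatch(1);

  // Stand-in for the onContainerFailed path: release the latch, then clear the field.
  void onContainerFailed() {
    shutdownLatch.countDown();
    container = null;
  }

  // Stand-in for the JobModelExpired path: snapshot the field before waiting.
  String onJobModelExpired(long waitMs) {
    Container local = container; // single read; later nulling cannot affect 'local'
    if (local == null) {
      return "none";
    }
    try {
      shutdownLatch.await(waitMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return local.id; // safe even if 'container' is null by now
  }

  public static void main(String[] args) {
    ShutdownRaceSketch sketch = new ShutdownRaceSketch();
    System.out.println(sketch.onJobModelExpired(10));
  }
}
```

Dereferencing the field again after the wait, instead of the local copy, would reintroduce the window in which the other thread has already set it to null.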
> *Relevant stacktrace:* 
> {code:java}
> [p-00-container-thread-0] ERROR 
> org.apache.samza.processor.StreamProcessor - Container failed. Stopping the 
> processor.
> org.apache.samza.system.SystemProducerException: Flush failed. One or more 
> batches of messages were not sent!
> at 
> org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:159)
> at 
> org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:130)
> at 
> org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:130)
> at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> at 
> org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
> at 
> org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:130)
> at 
> org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
> at 
> org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> at org.apache.samza.system.SystemProducers.flush(SystemProducers.scala:64)
> at 
> org.apache.samza.task.TaskInstanceCollector.flush(TaskInstanceCollector.scala:69)
> at org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:210)
> at 
> org.apache.samza.container.SamzaContainer$$anonfun$shutdownTask$9.apply(SamzaContainer.scala:1024)
> at 
> org.apache.samza.container.SamzaContainer$$anonfun$shutdownTask$9.apply(SamzaContainer.scala:1024)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> at 
> org.apache.samza.container.SamzaContainer.shutdownTask(SamzaContainer.scala:1024)
> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:731)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.samza.system.SystemProducerException: Failed to send 
> message for Source: TaskName-SystemStreamPartition [TestSystemName, 
> test-input-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895, 0] on 
> System:TestSystemName 
> Topic:test-output-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895
> at 
> org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:109)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:600)
> at 
> org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:272)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:223)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
> ... 1 more
> Caused by: 

[jira] [Assigned] (SAMZA-1649) Improve host-aware allocation to account for strict locality

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu reassigned SAMZA-1649:


Assignee: Jagadish

> Improve host-aware allocation to account for strict locality
> 
>
> Key: SAMZA-1649
> URL: https://issues.apache.org/jira/browse/SAMZA-1649
> Project: Samza
>  Issue Type: Bug
>Reporter: Jagadish
>Assignee: Jagadish
>Priority: Major
> Fix For: 0.14.1
>
>






[jira] [Updated] (SAMZA-1647) Fix NPE in JobModelExpired event handler.

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu updated SAMZA-1647:
-
Fix Version/s: 0.14.1

> Fix NPE in JobModelExpired event handler.
> -
>
> Key: SAMZA-1647
> URL: https://issues.apache.org/jira/browse/SAMZA-1647
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Assignee: Shanthoosh Venkataraman
>Priority: Major
> Fix For: 0.14.1
>
>
> *Problem:*
> There's a minor race condition when container shutdown is triggered from the 
> onJobModelExpired event handler. This race condition leads to an NPE and 
> eventually kills the StreamProcessor.
> *Reason:*
> The JobModelExpired handler logic, executed from the DebounceThread, is as 
> follows:
>  1. Invoke container shutdown. 
>  2. Wait for the configured task.shutdown.ms through a Latch(L).
>  3. Log the containerId after the configured wait time.
> The onContainerFailed handler logic, executed from the ContainerThread, is as 
> follows:
>  1. Count down the Latch(L).
>  2. Set the container field to null.
> If step 2 of the onContainerFailed handler is executed before step 3 of the 
> JobModelExpired handler, the container field will be set to null before we 
> log it in the JobModelExpired handler. This results in an NPE.
> *Relevant stacktrace:* 
> {code:java}
> [p-00-container-thread-0] ERROR 
> org.apache.samza.processor.StreamProcessor - Container failed. Stopping the 
> processor.
> org.apache.samza.system.SystemProducerException: Flush failed. One or more 
> batches of messages were not sent!
> at 
> org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:159)
> at 
> org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:130)
> at 
> org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:130)
> at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> at 
> org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
> at 
> org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:130)
> at 
> org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
> at 
> org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> at org.apache.samza.system.SystemProducers.flush(SystemProducers.scala:64)
> at 
> org.apache.samza.task.TaskInstanceCollector.flush(TaskInstanceCollector.scala:69)
> at org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:210)
> at 
> org.apache.samza.container.SamzaContainer$$anonfun$shutdownTask$9.apply(SamzaContainer.scala:1024)
> at 
> org.apache.samza.container.SamzaContainer$$anonfun$shutdownTask$9.apply(SamzaContainer.scala:1024)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> at 
> org.apache.samza.container.SamzaContainer.shutdownTask(SamzaContainer.scala:1024)
> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:731)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.samza.system.SystemProducerException: Failed to send 
> message for Source: TaskName-SystemStreamPartition [TestSystemName, 
> test-input-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895, 0] on 
> System:TestSystemName 
> Topic:test-output-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895
> at 
> org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:109)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:600)
> at 
> org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:272)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:223)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
> ... 1 more
> Caused by: 

[jira] [Resolved] (SAMZA-1649) Improve host-aware allocation to account for strict locality

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu resolved SAMZA-1649.
--
Resolution: Fixed

> Improve host-aware allocation to account for strict locality
> 
>
> Key: SAMZA-1649
> URL: https://issues.apache.org/jira/browse/SAMZA-1649
> Project: Samza
>  Issue Type: Bug
>Reporter: Jagadish
>Assignee: Jagadish
>Priority: Major
> Fix For: 0.14.1
>
>






[jira] [Updated] (SAMZA-1649) Improve host-aware allocation to account for strict locality

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu updated SAMZA-1649:
-
Fix Version/s: 0.14.1

> Improve host-aware allocation to account for strict locality
> 
>
> Key: SAMZA-1649
> URL: https://issues.apache.org/jira/browse/SAMZA-1649
> Project: Samza
>  Issue Type: Bug
>Reporter: Jagadish
>Assignee: Jagadish
>Priority: Major
> Fix For: 0.14.1
>
>






[jira] [Assigned] (SAMZA-1650) Window high level API: Trigger is not fired at the end of trigger interval for tumbling window

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu reassigned SAMZA-1650:


Assignee: Aditya

> Window high level API: Trigger is not fired at the end of trigger interval 
> for tumbling window
> --
>
> Key: SAMZA-1650
> URL: https://issues.apache.org/jira/browse/SAMZA-1650
> Project: Samza
>  Issue Type: Bug
>Reporter: Aditya
>Assignee: Aditya
>Priority: Major
> Fix For: 0.14.1
>
>
> [~jagadish1...@gmail.com] root-caused this to computeTriggerInterval() in 
> JobNode where we are not computing the timeInterval if joinTtlInterval is not 
> set, hence discarding windowTimerIntervals even if they are set.
>  
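The root cause described above can be illustrated with a minimal sketch. It assumes the trigger interval is derived as the greatest common divisor of all configured timer intervals; the class and method signatures here are hypothetical stand-ins, not the actual JobNode code. The point is that window timer intervals are considered even when no join TTL interval is set.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the logic described above; not Samza's actual JobNode code.
final class TriggerIntervalSketch {

  // Greatest common divisor of two positive interval lengths (milliseconds).
  static long gcd(long a, long b) {
    while (b != 0) {
      long t = a % b;
      a = b;
      b = t;
    }
    return a;
  }

  // Returns the timer interval in ms, or -1 when no interval is configured at all.
  // Window intervals are always included, whether or not join TTLs are present.
  static long computeTriggerInterval(List<Long> windowTimerIntervals, List<Long> joinTtlIntervals) {
    List<Long> candidates = new ArrayList<>(windowTimerIntervals);
    candidates.addAll(joinTtlIntervals);
    if (candidates.isEmpty()) {
      return -1;
    }
    long result = candidates.get(0);
    for (long candidate : candidates) {
      result = gcd(result, candidate);
    }
    return result;
  }
}
```

With window intervals of 1000 ms and 1500 ms and no join TTL, this sketch yields 500 ms rather than discarding the window intervals.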





[jira] [Resolved] (SAMZA-1650) Window high level API: Trigger is not fired at the end of trigger interval for tumbling window

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu resolved SAMZA-1650.
--
Resolution: Fixed

> Window high level API: Trigger is not fired at the end of trigger interval 
> for tumbling window
> --
>
> Key: SAMZA-1650
> URL: https://issues.apache.org/jira/browse/SAMZA-1650
> Project: Samza
>  Issue Type: Bug
>Reporter: Aditya
>Assignee: Aditya
>Priority: Major
> Fix For: 0.14.1
>
>
> [~jagadish1...@gmail.com] root-caused this to computeTriggerInterval() in 
> JobNode where we are not computing the timeInterval if joinTtlInterval is not 
> set, hence discarding windowTimerIntervals even if they are set.
>  





[jira] [Updated] (SAMZA-1650) Window high level API: Trigger is not fired at the end of trigger interval for tumbling window

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu updated SAMZA-1650:
-
Fix Version/s: 0.14.1

> Window high level API: Trigger is not fired at the end of trigger interval 
> for tumbling window
> --
>
> Key: SAMZA-1650
> URL: https://issues.apache.org/jira/browse/SAMZA-1650
> Project: Samza
>  Issue Type: Bug
>Reporter: Aditya
>Assignee: Aditya
>Priority: Major
> Fix For: 0.14.1
>
>
> [~jagadish1...@gmail.com] root-caused this to computeTriggerInterval() in 
> JobNode where we are not computing the timeInterval if joinTtlInterval is not 
> set, hence discarding windowTimerIntervals even if they are set.
>  





[jira] [Updated] (SAMZA-1651) Samza-sql: Implement GROUP BY SQL operator

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu updated SAMZA-1651:
-
Fix Version/s: 0.14.1

> Samza-sql: Implement GROUP BY SQL operator
> --
>
> Key: SAMZA-1651
> URL: https://issues.apache.org/jira/browse/SAMZA-1651
> Project: Samza
>  Issue Type: Bug
>Reporter: Aditya
>Assignee: Aditya
>Priority: Major
> Fix For: 0.14.1
>
>






[jira] [Updated] (SAMZA-1691) Support get iterable from KeyValueStore

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu updated SAMZA-1691:
-
Fix Version/s: 0.14.1

> Support get iterable from KeyValueStore
> ---
>
> Key: SAMZA-1691
> URL: https://issues.apache.org/jira/browse/SAMZA-1691
> Project: Samza
>  Issue Type: Bug
>Reporter: Xinyu Liu
>Assignee: Xinyu Liu
>Priority: Major
> Fix For: 0.14.1
>
>
> Right now for KeyValueStore we have a range query to return an iterator. For 
> usage in BEAM, we need an iterable which can create an iterator when needed. 
> Add the iterate() function in KeyValueStore to support it. It's implemented 
> as follows:
> 1) for rocksDb, it will create the iterator when it's called, which will have 
> a snapshot of the elements. Then every time the iterator is needed, we 
> will seek the iterator from the beginning;
> 2) for inMemoryDb, it will create the snapshot submap when iterate() is 
> called. The submap is an iterable and it can return a new iterator when 
> needed.
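The in-memory variant described in point 2 can be sketched as follows. This is an illustration only: the class, the String key/value types, and the iterate(from, to) signature are assumptions, not the actual KeyValueStore interface. The range is copied once, and every call to iterator() starts a fresh pass over that copy.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory store; not the actual Samza KeyValueStore implementation.
final class SnapshotIterableSketch {
  private final TreeMap<String, String> store = new TreeMap<>();

  void put(String key, String value) {
    store.put(key, value);
  }

  // Copies the range [from, to) once. Each iterator() call on the returned
  // Iterable walks the copy from the beginning, unaffected by later writes.
  Iterable<Map.Entry<String, String>> iterate(String from, String to) {
    final TreeMap<String, String> copy = new TreeMap<>(store.subMap(from, to));
    return () -> copy.entrySet().iterator();
  }
}
```

Because the copy is taken eagerly, the cost is paid when iterate() is called rather than on each traversal, which matches the "snapshot submap" behaviour the description mentions.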





[jira] [Updated] (SAMZA-1653) Support waitForFinish() in remote and add duration to both

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu updated SAMZA-1653:
-
Fix Version/s: 0.14.1

> Support waitForFinish() in remote and add duration to both
> --
>
> Key: SAMZA-1653
> URL: https://issues.apache.org/jira/browse/SAMZA-1653
> Project: Samza
>  Issue Type: Bug
>Reporter: Xinyu Liu
>Assignee: Bharath Kumarasubramanian
>Priority: Major
> Fix For: 0.14.1
>
>
> We need a consistent API for waitForFinish() in the remote runner. This is for 
> two reasons:
>  # feature parity with the local runner
>  # for a future use case, if we support submitting a batch job to a remote cluster 
> and allow the user program to wait until it completes to check the output.
> We need another variant of waitForFinish(duration). This is to be compatible 
> with BEAM API. 
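A minimal sketch of the two variants, assuming completion is signalled through a latch. The class and the markFinished() method are hypothetical and only illustrate the shape of the API; the real runners would learn about completion from the job status.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical runner fragment; not the actual Samza runner implementation.
final class WaitForFinishSketch {
  private final CountDownLatch finished = new CountDownLatch(1);

  // Called by whatever component observes job completion (assumed here).
  void markFinished() {
    finished.countDown();
  }

  // Blocks until the job has finished.
  void waitForFinish() {
    try {
      finished.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Blocks for at most the given duration; returns true only if the job
  // finished within that time.
  boolean waitForFinish(Duration timeout) {
    try {
      return finished.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Returning a boolean from the timed variant lets the caller distinguish a finished job from an expired wait.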





[jira] [Updated] (SAMZA-1656) EventHubSystemAdmin does not fetch metadata for valid streams.

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu updated SAMZA-1656:
-
Fix Version/s: (was: 0.15.0)
   0.14.1

> EventHubSystemAdmin does not fetch metadata for valid streams.
> --
>
> Key: SAMZA-1656
> URL: https://issues.apache.org/jira/browse/SAMZA-1656
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Assignee: Shanthoosh Venkataraman
>Priority: Major
> Fix For: 0.14.1
>
>
> Currently, successive invocations of 
> EventHubSystemAdmin.getSystemStreamMetadata with the same stream collection 
> return empty results.
> This is an implementation bug, where the streams requested as part of prior 
> invocations of EventHubSystemAdmin.getSystemStreamMetadata are ignored (due to 
> stale caching). 
> This bug causes the JobModel generation phase to fail and kills the 
> StreamProcessor. 
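The intended behaviour can be sketched as a cache that still returns entries for streams it has seen before. Everything here is hypothetical (the class name, the String metadata type, the fetcher function); it is not the actual EventHubSystemAdmin fix, only an illustration of returning a complete result on repeated calls.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical metadata cache; not the actual EventHubSystemAdmin code.
final class MetadataCacheSketch {
  private final Map<String, String> cache = new HashMap<>();
  private final Function<String, String> fetcher;
  int fetchCount = 0;

  MetadataCacheSketch(Function<String, String> fetcher) {
    this.fetcher = fetcher;
  }

  // Returns metadata for every requested stream. Cached streams are served
  // from the cache instead of being skipped; only missing ones are fetched.
  Map<String, String> getMetadata(Set<String> streams) {
    Map<String, String> result = new HashMap<>();
    for (String stream : streams) {
      String metadata = cache.get(stream);
      if (metadata == null) {
        metadata = fetcher.apply(stream);
        fetchCount++;
        cache.put(stream, metadata);
      }
      result.put(stream, metadata);
    }
    return result;
  }
}
```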





[jira] [Updated] (SAMZA-1664) ZkJobCoordinator stability fixes.

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu updated SAMZA-1664:
-
Fix Version/s: 0.14.1

> ZkJobCoordinator stability fixes.
> -
>
> Key: SAMZA-1664
> URL: https://issues.apache.org/jira/browse/SAMZA-1664
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Assignee: Shanthoosh Venkataraman
>Priority: Major
> Fix For: 0.14.1
>
>
> Issues fixed:
> * Handle job coordinator shutdown gracefully in case of unclean container 
> shutdowns.
> * Fix the zookeeper session handling logic.
> * Fix the forever retry timeout in ZkClient re-connect.





[jira] [Resolved] (SAMZA-1664) ZkJobCoordinator stability fixes.

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu resolved SAMZA-1664.
--
Resolution: Fixed

> ZkJobCoordinator stability fixes.
> -
>
> Key: SAMZA-1664
> URL: https://issues.apache.org/jira/browse/SAMZA-1664
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Assignee: Shanthoosh Venkataraman
>Priority: Major
> Fix For: 0.14.1
>
>
> Issues fixed:
> * Handle job coordinator shutdown gracefully in case of unclean container 
> shutdowns.
> * Fix the zookeeper session handling logic.
> * Fix the forever retry timeout in ZkClient re-connect.





[jira] [Resolved] (SAMZA-1671) SamzaSQL: add insert into table support

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu resolved SAMZA-1671.
--
Resolution: Fixed

> SamzaSQL: add insert into table support
> ---
>
> Key: SAMZA-1671
> URL: https://issues.apache.org/jira/browse/SAMZA-1671
> Project: Samza
>  Issue Type: Improvement
>Reporter: Peng Du
>Assignee: Peng Du
>Priority: Major
> Fix For: 0.14.1
>
>
> SamzaSQL has recently added support for table joins. It still lacks 
> support for inserting into a table, as currently only a stream is supported as 
> the sink. 
> We need to add the capability into SamzaSQL to allow inserting into a table, 
> i.e. local and remote. This is especially useful when used with a remote table, 
> where we can use SQL to perform writes to remote services, e.g. key-value 
> stores and REST services.  





[jira] [Updated] (SAMZA-1671) SamzaSQL: add insert into table support

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu updated SAMZA-1671:
-
Fix Version/s: 0.14.1

> SamzaSQL: add insert into table support
> ---
>
> Key: SAMZA-1671
> URL: https://issues.apache.org/jira/browse/SAMZA-1671
> Project: Samza
>  Issue Type: Improvement
>Reporter: Peng Du
>Assignee: Peng Du
>Priority: Major
> Fix For: 0.14.1
>
>
> SamzaSQL has recently added support for table joins. It still lacks 
> support for inserting into a table, as currently only a stream is supported as 
> the sink. 
> We need to add the capability into SamzaSQL to allow inserting into a table, 
> i.e. local and remote. This is especially useful when used with a remote table, 
> where we can use SQL to perform writes to remote services, e.g. key-value 
> stores and REST services.  





[jira] [Updated] (SAMZA-1681) Samza-sql: Add support for handling older record schema versions in AvroRelConverter

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu updated SAMZA-1681:
-
Fix Version/s: 0.14.1

> Samza-sql: Add support for handling older record schema versions in 
> AvroRelConverter
> 
>
> Key: SAMZA-1681
> URL: https://issues.apache.org/jira/browse/SAMZA-1681
> Project: Samza
>  Issue Type: Bug
>Reporter: Aditya
>Assignee: Aditya
>Priority: Major
> Fix For: 0.14.1
>
>
> Handling newer version record schema will be addressed as part of SAMZA-1679





[jira] [Updated] (SAMZA-1685) Need to resolve redirects for virtualenv download location for running integration tests

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu updated SAMZA-1685:
-
Fix Version/s: 0.14.1

> Need to resolve redirects for virtualenv download location for running 
> integration tests
> 
>
> Key: SAMZA-1685
> URL: https://issues.apache.org/jira/browse/SAMZA-1685
> Project: Samza
>  Issue Type: Bug
>  Components: test
>Reporter: Cameron Lee
>Priority: Major
> Fix For: 0.14.1
>
>
> The current download location for virtualenv in the bin/integration-tests.sh 
> script returns 301 Moved Permanently instead of the actual package, so the 
> integration test startup fails.





[jira] [Assigned] (SAMZA-1685) Need to resolve redirects for virtualenv download location for running integration tests

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu reassigned SAMZA-1685:


Assignee: Cameron Lee

> Need to resolve redirects for virtualenv download location for running 
> integration tests
> 
>
> Key: SAMZA-1685
> URL: https://issues.apache.org/jira/browse/SAMZA-1685
> Project: Samza
>  Issue Type: Bug
>  Components: test
>Reporter: Cameron Lee
>Assignee: Cameron Lee
>Priority: Major
> Fix For: 0.14.1
>
>
> The current download location for virtualenv in the bin/integration-tests.sh 
> script returns 301 Moved Permanently instead of the actual package, so the 
> integration test startup fails.





[jira] [Updated] (SAMZA-1686) Set finite operation timeout when creating zkClient.

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu updated SAMZA-1686:
-
Fix Version/s: (was: 0.15.0)
   0.14.1

> Set finite operation timeout when creating zkClient.
> 
>
> Key: SAMZA-1686
> URL: https://issues.apache.org/jira/browse/SAMZA-1686
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Assignee: Shanthoosh Venkataraman
>Priority: Major
> Fix For: 0.14.1
>
>
> Currently zkClient is created with an operationRetryTimeOut of -1. This causes 
> zkClient to retry indefinitely in case of irrecoverable exceptions thereby 
> delaying the StreamProcessor shutdown. 
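The difference between an unbounded and a finite retry timeout can be sketched with a generic retry helper. This is not ZkClient code; the helper name, parameters, and backoff are assumptions chosen only to show how a finite deadline lets the caller fail and proceed with shutdown.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical retry helper; it only illustrates a bounded retry deadline.
final class BoundedRetrySketch {

  // Retries op until it succeeds or timeoutMs has elapsed, sleeping backoffMs
  // between attempts. Throws once the deadline passes instead of looping forever.
  static <T> T retryUntil(Supplier<T> op, long timeoutMs, long backoffMs) {
    long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    RuntimeException last = null;
    do {
      try {
        return op.get();
      } catch (RuntimeException e) {
        last = e; // remember the most recent failure for the final error
      }
      try {
        Thread.sleep(backoffMs);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        break;
      }
    } while (System.nanoTime() - deadlineNanos < 0);
    throw new IllegalStateException("operation did not succeed within " + timeoutMs + " ms", last);
  }
}
```

With a finite timeout, an unrecoverable failure surfaces as an exception after the deadline, so the surrounding component can shut down instead of waiting indefinitely.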





[jira] [Updated] (SAMZA-1624) EventHub system should prefix the configs with sensitive for SasKey and SasToken

2018-04-30 Thread Hai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai updated SAMZA-1624:
---
Fix Version/s: 0.14.1

> EventHub system should prefix the configs with sensitive for SasKey and 
> SasToken
> ---
>
> Key: SAMZA-1624
> URL: https://issues.apache.org/jira/browse/SAMZA-1624
> Project: Samza
>  Issue Type: Bug
>Reporter: Srinivasulu Punuru
>Priority: Major
>  Labels: EventHub
> Fix For: 0.14.1
>
>






[jira] [Updated] (SAMZA-1673) EventHub: readLatency metric returns negative value

2018-04-30 Thread Hai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai updated SAMZA-1673:
---
Fix Version/s: 0.14.1

> EventHub: readLatency metric returns negative value
> ---
>
> Key: SAMZA-1673
> URL: https://issues.apache.org/jira/browse/SAMZA-1673
> Project: Samza
>  Issue Type: Sub-task
>Reporter: Srinivasulu Punuru
>Assignee: Hai
>Priority: Major
>  Labels: EventHub
> Fix For: 0.14.1
>
>






[jira] [Updated] (SAMZA-1675) EventHub: Log the metadata that we are fetching from the event hubs.

2018-04-30 Thread Hai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai updated SAMZA-1675:
---
Fix Version/s: 0.14.1

> EventHub: Log the metadata that we are fetching from the event hubs.
> 
>
> Key: SAMZA-1675
> URL: https://issues.apache.org/jira/browse/SAMZA-1675
> Project: Samza
>  Issue Type: Sub-task
>Reporter: Srinivasulu Punuru
>Assignee: Hai
>Priority: Major
>  Labels: EventHub
> Fix For: 0.14.1
>
>
> Log the partitionRuntimeinformation that we are fetching and also any other 
> missing logging in the event hub system.





[jira] [Updated] (SAMZA-1674) EventHub: Rename readLatency to consumptionLagMs

2018-04-30 Thread Hai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai updated SAMZA-1674:
---
Fix Version/s: 0.14.1

> EventHub: Rename readLatency to consumptionLagMs
> 
>
> Key: SAMZA-1674
> URL: https://issues.apache.org/jira/browse/SAMZA-1674
> Project: Samza
>  Issue Type: Sub-task
>Reporter: Srinivasulu Punuru
>Assignee: Hai
>Priority: Major
>  Labels: EventHub
> Fix For: 0.14.1
>
>
> Rename the metric readLatency to consumptionLagMs





[jira] [Updated] (SAMZA-1625) EventHub systemAdmin is swallowing exceptions

2018-04-30 Thread Hai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai updated SAMZA-1625:
---
Fix Version/s: 0.14.1

> EventHub systemAdmin is swallowing exceptions
> -
>
> Key: SAMZA-1625
> URL: https://issues.apache.org/jira/browse/SAMZA-1625
> Project: Samza
>  Issue Type: Sub-task
>Reporter: Srinivasulu Punuru
>Assignee: Hai
>Priority: Major
>  Labels: EventHub
> Fix For: 0.14.1
>
>
> Right now the EventHub system admin swallows the original exception and 
> creates a new exception rather than wrapping it.
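Wrapping, as opposed to swallowing, means passing the original exception as the cause of the new one so its stack trace and message survive. A minimal sketch follows; the exception type and the methods are hypothetical, not the EventHub system admin code.

```java
// Hypothetical example of exception wrapping; not the actual EventHub admin code.
final class ExceptionWrappingSketch {

  static class MetadataFetchException extends RuntimeException {
    MetadataFetchException(String message, Throwable cause) {
      super(message, cause); // keep the original exception as the cause
    }
  }

  // Stand-in for a low-level call that fails.
  static String lowLevelFetch(String stream) {
    throw new IllegalStateException("connection refused");
  }

  static String fetch(String stream) {
    try {
      return lowLevelFetch(stream);
    } catch (RuntimeException e) {
      // Wrap instead of replacing: the caller can still inspect getCause().
      throw new MetadataFetchException("Failed to fetch metadata for stream " + stream, e);
    }
  }
}
```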





[jira] [Updated] (SAMZA-1676) miscellaneous fix and improvement for eventhubs system

2018-04-30 Thread Hai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai updated SAMZA-1676:
---
Fix Version/s: 0.14.1

> miscellaneous fix and improvement for eventhubs system
> --
>
> Key: SAMZA-1676
> URL: https://issues.apache.org/jira/browse/SAMZA-1676
> Project: Samza
>  Issue Type: Improvement
>Reporter: Hai
>Assignee: Hai
>Priority: Major
>  Labels: EventHub
> Fix For: 0.14.1
>
>






[jira] [Updated] (SAMZA-1688) use per partition eventhubs client

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu updated SAMZA-1688:
-
Fix Version/s: 0.14.1

> use per partition eventhubs client
> --
>
> Key: SAMZA-1688
> URL: https://issues.apache.org/jira/browse/SAMZA-1688
> Project: Samza
>  Issue Type: Improvement
>Reporter: Hai
>Assignee: Hai
>Priority: Major
>  Labels: EventHub
> Fix For: 0.14.1
>
>






[jira] [Resolved] (SAMZA-1689) Add validations before state transitions in ZkBarrierForVersionUpgrade.

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu resolved SAMZA-1689.
--
Resolution: Fixed

> Add validations before state transitions in ZkBarrierForVersionUpgrade. 
> 
>
> Key: SAMZA-1689
> URL: https://issues.apache.org/jira/browse/SAMZA-1689
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Assignee: Shanthoosh Venkataraman
>Priority: Major
> Fix For: 0.14.1
>
>
> *Problem:*
> A barrier in zookeeper can be in either DONE or TimedOut state. Existing 
> ZkBarrier implementation doesn't have any state validations before acting 
> upon the barrier events received from zookeeper. This can corrupt barrier 
> states when any of the following occur:
>  - When a stream processor joins a barrier marked as timed out by the leader 
> of the group.
>  - When the leader of a group expires a barrier marked as Done. This can 
> happen when a leader plays out stale events from its in-memory buffer.
> This will cause the processor group to go through an unnecessary rebalancing 
> phase (or worse, corrupt the barrier state).
> *Changes:*
>  - Introduce a barrier state: NEW, which will be the beginning state of a 
> barrier (in the existing implementation, a barrier is created with an empty/null 
> value). A barrier will be marked as Done or TimedOut only if the current 
> barrier state is NEW. 
>  - Fix the disabled barrier test cases. Add new test cases to validate the 
> changed functionality.
>  - Improve logging to aid debugging.
>  
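The validation rule in the changes above (a barrier may move to Done or TimedOut only from NEW) can be sketched as a small state machine. This in-memory version is an assumption for illustration: the real barrier state lives in zookeeper, where the equivalent check would have to be made against the stored value.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory barrier state; not the ZkBarrierForVersionUpgrade code.
final class BarrierStateSketch {
  enum State { NEW, DONE, TIMED_OUT }

  private final AtomicReference<State> state = new AtomicReference<>(State.NEW);

  // Succeeds only if the barrier is still NEW.
  boolean markDone() {
    return state.compareAndSet(State.NEW, State.DONE);
  }

  // Succeeds only if the barrier is still NEW, so a stale timeout event
  // cannot expire a barrier that has already completed.
  boolean markTimedOut() {
    return state.compareAndSet(State.NEW, State.TIMED_OUT);
  }

  State current() {
    return state.get();
  }
}
```

A stale timeout event arriving after markDone() returns false and leaves the state as DONE, which is the corruption case the description aims to prevent.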





[jira] [Updated] (SAMZA-1689) Add validations before state transitions in ZkBarrierForVersionUpgrade.

2018-04-30 Thread Xinyu Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Liu updated SAMZA-1689:
-
Fix Version/s: 0.14.1

> Add validations before state transitions in ZkBarrierForVersionUpgrade. 
> 
>
> Key: SAMZA-1689
> URL: https://issues.apache.org/jira/browse/SAMZA-1689
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Assignee: Shanthoosh Venkataraman
>Priority: Major
> Fix For: 0.14.1
>
>
> *Problem:*
> A barrier in zookeeper can be in either DONE or TimedOut state. Existing 
> ZkBarrier implementation doesn't have any state validations before acting 
> upon the barrier events received from zookeeper. This can corrupt barrier 
> states when any of the following occur:
>  - When a stream processor joins a barrier marked as timed out by the leader 
> of the group.
>  - When the leader of a group expires a barrier marked as Done. This can 
> happen when a leader plays out stale events from its in-memory buffer.
> This will cause the processor group to go through an unnecessary rebalancing 
> phase (or worse, corrupt the barrier state).
> *Changes:*
>  - Introduce a barrier state: NEW, which will be the beginning state of a 
> barrier (in the existing implementation, a barrier is created with an empty/null 
> value). A barrier will be marked as Done or TimedOut only if the current 
> barrier state is NEW. 
>  - Fix the disabled barrier test cases. Add new test cases to validate the 
> changed functionality.
>  - Improve logging to aid debugging.
>  





[jira] [Resolved] (SAMZA-1681) Samza-sql: Add support for handling older record schema versions in AvroRelConverter

2018-04-30 Thread Aditya (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya resolved SAMZA-1681.
---
Resolution: Fixed

> Samza-sql: Add support for handling older record schema versions in 
> AvroRelConverter
> 
>
> Key: SAMZA-1681
> URL: https://issues.apache.org/jira/browse/SAMZA-1681
> Project: Samza
>  Issue Type: Bug
>Reporter: Aditya
>Assignee: Aditya
>Priority: Major
>
> Handling newer version record schema will be addressed as part of SAMZA-1679





[jira] [Commented] (SAMZA-1693) Samza-sql: Adding Serde for rel record

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/SAMZA-1693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458727#comment-16458727
 ] 

ASF GitHub Bot commented on SAMZA-1693:
---

GitHub user atoomula opened a pull request:

https://github.com/apache/samza/pull/495

SAMZA-1693: Samza-sql - Adding Serde for rel record and few other minor 
fixes for Avro and Rel conversion.

Adding Serde for rel record, as calcite expects the keys to be in string 
format. Rel converters are always expected to provide keys as strings. If key 
is an avro record, it is expected that the rel converter changes the avro 
record to rel record and serializes it and deserializes it when converting rel 
message to samza message.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/atoomula/samza rel1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/samza/pull/495.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #495


commit deb5cad77e6a4e7d8634825e56f69d3b9a4bb5a4
Author: Aditya Toomula 
Date:   2018-04-30T16:25:48Z

Adding Serde for rel record as well, since keys could be avro records but 
calcite expects the keys to be in string format. Rel converters are always 
expected to provide keys as strings. If key is an avro record, it is expected 
that the rel converter changes the avro record to rel record and serializes it 
and deserializes it when converting rel message to samza message.




> Samza-sql: Adding Serde for rel record
> --
>
> Key: SAMZA-1693
> URL: https://issues.apache.org/jira/browse/SAMZA-1693
> Project: Samza
>  Issue Type: Bug
>Reporter: Aditya
>Assignee: Aditya
>Priority: Major
>
> Adding Serde for rel record, as calcite expects the keys to be in string 
> format. Rel converters are always expected to provide keys as strings. If key 
> is an avro record, it is expected that the rel converter changes the avro 
> record to rel record and serializes it and deserializes it when converting rel 
> message to samza message.





[jira] [Created] (SAMZA-1693) Samza-sql: Adding Serde for rel record

2018-04-30 Thread Aditya (JIRA)
Aditya created SAMZA-1693:
-

 Summary: Samza-sql: Adding Serde for rel record
 Key: SAMZA-1693
 URL: https://issues.apache.org/jira/browse/SAMZA-1693
 Project: Samza
  Issue Type: Bug
Reporter: Aditya
Assignee: Aditya


Adding Serde for rel record, as calcite expects the keys to be in string 
format. Rel converters are always expected to provide keys as strings. If key 
is an avro record, it is expected that the rel converter changes the avro 
record to rel record and serializes it and deserializes it when converting rel 
message to samza message.


