[GitHub] [samza] cameronlee314 commented on a change in pull request #947: SAMZA-2120: Enable custom handling of ConsumerRecords consumed by Kafka

2019-03-11 Thread GitBox
cameronlee314 commented on a change in pull request #947: SAMZA-2120: Enable 
custom handling of ConsumerRecords consumed by Kafka
URL: https://github.com/apache/samza/pull/947#discussion_r264368204
 
 

 ##
 File path: 
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
 ##
 @@ -51,7 +53,7 @@
  * This class is not thread safe. There will be only one instance of this 
class per KafkaSystemConsumer object.
  * We still need some synchronization around kafkaConsumer. See pollConsumer() 
method for details.
  */
-class KafkaConsumerProxy {
+public class KafkaConsumerProxy {
 
 Review comment:
   The type parameters do get used for the Kafka `Consumer` which is wired in.
Ultimately, they don't get used since `IncomingMessageEnvelope` doesn't have type
params, but it looks like they help to ensure the wiring matches up for the proxy
and the Samza/Kafka consumer objects.
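
   For context, a minimal sketch of how matching type parameters tie a proxy to the
wired-in Kafka consumer (hypothetical class and field names, not the actual
`KafkaConsumerProxy` code):

```java
import org.apache.kafka.clients.consumer.Consumer;

// Hypothetical, simplified illustration: the proxy is generic in <K, V> only so that
// the Kafka Consumer<K, V> handed to it must carry matching key/value types.
public class ConsumerProxySketch<K, V> {
  private final Consumer<K, V> kafkaConsumer;

  public ConsumerProxySketch(Consumer<K, V> kafkaConsumer) {
    this.kafkaConsumer = kafkaConsumer;
  }
}
```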


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [samza] shanthoosh opened a new pull request #949: Remove scala 2.10 version from documentation.

2019-03-11 Thread GitBox
shanthoosh opened a new pull request #949: Remove scala 2.10 version from 
documentation.
URL: https://github.com/apache/samza/pull/949
 
 
   Post 1.0, Samza does not support Scala 2.10. This patch removes references to
Scala 2.10 from the documentation.




[GitHub] [samza] rmatharu commented on a change in pull request #941: SAMZA-2120: Enable custom handling of ConsumerRecords consumed by Kafka

2019-03-07 Thread GitBox
rmatharu commented on a change in pull request #941: SAMZA-2120: Enable custom 
handling of ConsumerRecords consumed by Kafka 
URL: https://github.com/apache/samza/pull/941#discussion_r263553107
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
 ##
 @@ -74,7 +74,7 @@
   private final SystemAdmins systemAdmins;
   private final TaskName taskName;
   private final TaskMode taskMode;
-  private final Map lastProcessedOffsets = new 
ConcurrentHashMap<>();
 
 Review comment:
   Perhaps undo this change, since this map actually stores the "last processed
offset for the given side input" (which is used to emit a metric).
   Side inputs are only checkpointed locally (see writeOffsetFiles).




[GitHub] [samza] rmatharu commented on a change in pull request #941: SAMZA-2120: Enable custom handling of ConsumerRecords consumed by Kafka

2019-03-07 Thread GitBox
rmatharu commented on a change in pull request #941: SAMZA-2120: Enable custom 
handling of ConsumerRecords consumed by Kafka 
URL: https://github.com/apache/samza/pull/941#discussion_r263554894
 
 

 ##
 File path: samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
 ##
 @@ -611,10 +611,12 @@ public void run() {
 List callbacksToUpdate = 
callbackManager.updateCallback(callbackImpl);
 for (TaskCallbackImpl callbackToUpdate : callbacksToUpdate) {
   IncomingMessageEnvelope envelope = callbackToUpdate.envelope;
-  log.trace("Update offset for ssp {}, offset {}", 
envelope.getSystemStreamPartition(), envelope.getOffset());
+  log.trace("Update offset for ssp {}, checkpoint offset {}", 
envelope.getSystemStreamPartition(),
 
 Review comment:
   log.trace("Update offset for ssp {}, offset {}, checkpoint offset {}", 
envelope.getSystemStreamPartition(), envelope.getOffset(), 
envelope.getCheckpointOffset());




[GitHub] [samza] dxichen opened a new pull request #944: Release version updates

2019-03-07 Thread GitBox
dxichen opened a new pull request #944: Release version updates
URL: https://github.com/apache/samza/pull/944
 
 
   Updated the hard-coded version for the upcoming release.




[GitHub] [samza] xinyuiscool merged pull request #943: Update Samza version for 1.1.0 release branch

2019-03-07 Thread GitBox
xinyuiscool merged pull request #943: Update Samza version for 1.1.0 release 
branch
URL: https://github.com/apache/samza/pull/943
 
 
   




[GitHub] [samza] mynameborat commented on a change in pull request #942: Bugfix: Recent CSM refactor was causing some metrics to not be emitted. Fixed -restore-time metric.

2019-03-07 Thread GitBox
mynameborat commented on a change in pull request #942: Bugfix: Recent CSM 
refactor was causing some metrics to not be emitted. Fixed -restore-time 
metric.
URL: https://github.com/apache/samza/pull/942#discussion_r263637333
 
 

 ##
 File path: 
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 ##
 @@ -573,6 +576,25 @@ private StorageEngine createStore(String storeName, 
TaskName taskName, TaskModel
 return 
this.sideInputStorageManagers.values().stream().collect(Collectors.toSet());
   }
 
+  /**
+   * Registers any CSM created metrics such as side-inputs related metrics, 
and standby-task related metrics.
+   * @param metricsReporters metrics reporters to use
+   */
+  public void registerMetrics(Map metricsReporters) {
 
 Review comment:
   +1, we would need to change `SystemConsumerMetrics` to take a configurable
group name instead of using the default class name as the group id. I feel that
change is much simpler compared to copying over the boilerplate code for metrics
registration to CSM.
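
   As a rough illustration of that suggestion (hypothetical class and constructor,
the real `SystemConsumerMetrics` API may differ), the group used when registering
metrics could be injectable while defaulting to the class name so existing metric
names stay unchanged:

```java
// Hypothetical sketch: a metrics holder whose group name can be overridden,
// falling back to the class name to preserve existing metric names.
public class ConfigurableGroupMetricsSketch {
  private final String group;

  public ConfigurableGroupMetricsSketch() {
    this(ConfigurableGroupMetricsSketch.class.getName());
  }

  public ConfigurableGroupMetricsSketch(String group) {
    this.group = group;
  }

  public String metricName(String name) {
    // the group name forms the prefix of the generated metric name, e.g. "my-group.poll-count"
    return group + "." + name;
  }
}
```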




[GitHub] [samza] cameronlee314 commented on a change in pull request #940: SAMZA-2121: Add checkpoint offset field to IncomingMessageEnvelope

2019-03-07 Thread GitBox
cameronlee314 commented on a change in pull request #940: SAMZA-2121: Add 
checkpoint offset field to IncomingMessageEnvelope
URL: https://github.com/apache/samza/pull/940#discussion_r263639549
 
 

 ##
 File path: 
samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
 ##
 @@ -86,7 +101,29 @@ public IncomingMessageEnvelope(SystemStreamPartition 
systemStreamPartition, Stri
*/
   public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, 
String offset,
   Object key, Object message, int size, long eventTime, long arrivalTime) {
-this(systemStreamPartition, offset, key, message, size);
+this(systemStreamPartition, offset, offset, key, message, size, eventTime, 
arrivalTime);
+  }
+
+  /**
+   * Constructs a new IncomingMessageEnvelope from specified components
+   * @param systemStreamPartition The aggregate object representing the 
incoming stream name, the name of the cluster
+   * from which the stream came, and the partition of the stream from which 
the message was received.
+   * @param offset The offset in the partition that the message was received 
from.
+   * @param checkpointOffset offset that can be checkpointed when this {@link 
IncomingMessageEnvelope} is processed
+   * @param key A deserialized key received from the partition offset.
+   * @param message A deserialized message received from the partition offset.
+   * @param size size of the message and key in bytes.
+   * @param eventTime the timestamp (in epochMillis) of when this event 
happened
+   * @param arrivalTime the timestamp (in epochMillis) of when this event 
arrived to (i.e., was picked-up by) Samza
+   */
+  public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, 
String offset, String checkpointOffset,
+  Object key, Object message, int size, long eventTime, long arrivalTime) {
+this.systemStreamPartition = systemStreamPartition;
+this.offset = offset;
+this.checkpointOffset = checkpointOffset;
 
 Review comment:
   1. Some more details and an example are described at 
https://issues.apache.org/jira/browse/SAMZA-2120.
   2. We could make this part of the internal API by creating an 
`InternalIncomingMessageEnvelope` which is passed around internally and extends 
`IncomingMessageEnvelope`. Would that help avoid complicating the public API?
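
   A minimal sketch of option 2 (hypothetical class; assuming
`IncomingMessageEnvelope` exposes a suitable constructor), where the checkpoint
offset lives on an internal subclass rather than on the public envelope:

```java
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;

// Hypothetical sketch: keep the checkpoint offset on a framework-internal subclass
// so the public IncomingMessageEnvelope contract stays unchanged.
public class InternalIncomingMessageEnvelope extends IncomingMessageEnvelope {
  private final String checkpointOffset;

  public InternalIncomingMessageEnvelope(SystemStreamPartition ssp, String offset,
      String checkpointOffset, Object key, Object message) {
    super(ssp, offset, key, message);
    this.checkpointOffset = checkpointOffset;
  }

  public String getCheckpointOffset() {
    return checkpointOffset;
  }
}
```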




[GitHub] [samza] vjagadish1989 edited a comment on issue #905: SAMZA-2055: [WIP] Async high level api

2019-03-07 Thread GitBox
vjagadish1989 edited a comment on issue #905: SAMZA-2055: [WIP] Async high 
level api
URL: https://github.com/apache/samza/pull/905#issuecomment-470775503
 
 
   also adding @xinyuiscool in case he has additional feedback on the async-api




[GitHub] [samza] dxichen opened a new pull request #943: Update Samza version for 1.1.0 release branch

2019-03-07 Thread GitBox
dxichen opened a new pull request #943: Update Samza version for 1.1.0 release 
branch
URL: https://github.com/apache/samza/pull/943
 
 
   Change versions as per RELEASE.md




[GitHub] [samza] shanthoosh commented on a change in pull request #940: SAMZA-2121: Add checkpoint offset field to IncomingMessageEnvelope

2019-03-07 Thread GitBox
shanthoosh commented on a change in pull request #940: SAMZA-2121: Add 
checkpoint offset field to IncomingMessageEnvelope
URL: https://github.com/apache/samza/pull/940#discussion_r263628931
 
 

 ##
 File path: 
samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
 ##
 @@ -86,7 +101,29 @@ public IncomingMessageEnvelope(SystemStreamPartition 
systemStreamPartition, Stri
*/
   public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, 
String offset,
   Object key, Object message, int size, long eventTime, long arrivalTime) {
-this(systemStreamPartition, offset, key, message, size);
+this(systemStreamPartition, offset, offset, key, message, size, eventTime, 
arrivalTime);
+  }
+
+  /**
+   * Constructs a new IncomingMessageEnvelope from specified components
+   * @param systemStreamPartition The aggregate object representing the 
incoming stream name, the name of the cluster
+   * from which the stream came, and the partition of the stream from which 
the message was received.
+   * @param offset The offset in the partition that the message was received 
from.
+   * @param checkpointOffset offset that can be checkpointed when this {@link 
IncomingMessageEnvelope} is processed
+   * @param key A deserialized key received from the partition offset.
+   * @param message A deserialized message received from the partition offset.
+   * @param size size of the message and key in bytes.
+   * @param eventTime the timestamp (in epochMillis) of when this event 
happened
+   * @param arrivalTime the timestamp (in epochMillis) of when this event 
arrived to (i.e., was picked-up by) Samza
+   */
+  public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, 
String offset, String checkpointOffset,
+  Object key, Object message, int size, long eventTime, long arrivalTime) {
+this.systemStreamPartition = systemStreamPartition;
+this.offset = offset;
+this.checkpointOffset = checkpointOffset;
 
 Review comment:
   1. Can you please share the rationale behind adding the checkpoint offset to
the `IncomingMessageEnvelope` public API?
   2. Can you please throw some light on the other options considered here? Is
it not plausible to do this without making this change to the public API?
   
   IMHO, it's better not to proliferate the public API with these internal
details, which will be hard to remove later on. Anyone who reads this in the
future would wonder why there are two different offsets in the
`IncomingMessageEnvelope` contract. Is it possible to maintain the mapping (the
association we want for safe checkpointing) internally within the Samza
framework and not expose it to Samza users?
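
   One way to keep that association framework-internal, sketched here with
hypothetical names rather than an actual Samza class, is a map from
`SystemStreamPartition` to the checkpointable offset maintained alongside the
consumer:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.samza.system.SystemStreamPartition;

// Hypothetical sketch: track the "safe to checkpoint" offset per SSP inside the
// framework instead of exposing it on the public envelope.
public class CheckpointOffsetTrackerSketch {
  private final Map<SystemStreamPartition, String> checkpointOffsets = new ConcurrentHashMap<>();

  public void update(SystemStreamPartition ssp, String checkpointOffset) {
    checkpointOffsets.put(ssp, checkpointOffset);
  }

  public String getCheckpointOffset(SystemStreamPartition ssp) {
    return checkpointOffsets.get(ssp);
  }
}
```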




[GitHub] [samza] shanthoosh commented on a change in pull request #942: Bugfix: Recent CSM refactor was causing some metrics to not be emitted. Fixed -restore-time metric.

2019-03-07 Thread GitBox
shanthoosh commented on a change in pull request #942: Bugfix: Recent CSM 
refactor was causing some metrics to not be emitted. Fixed -restore-time 
metric.
URL: https://github.com/apache/samza/pull/942#discussion_r263647669
 
 

 ##
 File path: 
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 ##
 @@ -573,6 +576,25 @@ private StorageEngine createStore(String storeName, 
TaskName taskName, TaskModel
 return 
this.sideInputStorageManagers.values().stream().collect(Collectors.toSet());
   }
 
+  /**
+   * Registers any CSM created metrics such as side-inputs related metrics, 
and standby-task related metrics.
+   * @param metricsReporters metrics reporters to use
+   */
+  public void registerMetrics(Map metricsReporters) {
 
 Review comment:
   > SystemConsumerMetrics to take a configurable group name
   
   Alerting in ingraphs and auto-scaling rules at LinkedIn are defined based on
the Samza-generated metric names. The group name is the prefix and initial part
of the generated metric name. It would be better to take these use cases into
account and maintain backwards compatibility when changing the metric names.




[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-07 Thread GitBox
lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: 
Support run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r263661119
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##
 @@ -280,4 +368,47 @@ private void setApplicationFinalStatus() {
   }
 }
   }
+
+
+  /**
+   * Defines a specific implementation of {@link CoordinationSessionListener} 
for local {@link CoordinationUtils}
+   */
+  private final class LocalCoordinationSessionListener implements 
CoordinationSessionListener {
+
+/**
+ * If the coordination utils session has reconnected, check if global 
runid differs from local runid
+ * if it differs then shut down processor and throw exception
+ * else recreate ephemeral node corresponding to this processor inside the 
read write lock for runid
+ */
+@Override
+public void handleReconnect() {
+  LOG.info("Reconnected to coordination utils");
+  if(coordinationUtils == null) {
+return;
+  }
+  DistributedDataAccess runIdAccess = coordinationUtils.getDataAccess();
+  String globalRunId = (String) runIdAccess.readData(RUNID_PATH);
+  if( runId != globalRunId){
+processors.forEach(StreamProcessor::stop);
+cleanup();
+appStatus = ApplicationStatus.UnsuccessfulFinish;
+String msg = String.format("run.id %s on processor %s differs from the 
global run.id %s", runId, uid, globalRunId);
+throw new SamzaException(msg);
+  } else if(runIdLock != null) {
+String msg = String.format("Processor {} failed to get the lock for 
run.id", uid);
+try {
+  // acquire lock to recreate active processor ephemeral node
+  DistributedReadWriteLock.AccessType lockAccess = 
runIdLock.lock(LOCK_TIMEOUT, LOCK_TIMEOUT_UNIT);
+  if(lockAccess == DistributedReadWriteLock.AccessType.WRITE || 
lockAccess == DistributedReadWriteLock.AccessType.READ) {
+LOG.info("Processor {} creates its active processor ephemeral node 
in read write lock again" + uid);
+runIdLock.unlock();
+  } else {
+throw new SamzaException(msg);
+  }
+} catch (TimeoutException e) {
+  throw new SamzaException(msg, e);
 
 Review comment:
   Done.




[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-07 Thread GitBox
lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: 
Support run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r263661134
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
 ##
 @@ -39,6 +42,12 @@
 
   DistributedLockWithState getLockWithState(String lockId);
 
+  DistributedReadWriteLock getReadWriteLock();
+
+  DistributedDataAccess getDataAccess();
+
+  void setCoordinationSessionListener(CoordinationSessionListener 
sessionListener);
 
 Review comment:
   done.




[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-07 Thread GitBox
lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: 
Support run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r26366
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##
 @@ -280,4 +368,47 @@ private void setApplicationFinalStatus() {
   }
 }
   }
+
+
+  /**
+   * Defines a specific implementation of {@link CoordinationSessionListener} 
for local {@link CoordinationUtils}
+   */
+  private final class LocalCoordinationSessionListener implements 
CoordinationSessionListener {
+
+/**
+ * If the coordination utils session has reconnected, check if global 
runid differs from local runid
+ * if it differs then shut down processor and throw exception
+ * else recreate ephemeral node corresponding to this processor inside the 
read write lock for runid
+ */
+@Override
+public void handleReconnect() {
+  LOG.info("Reconnected to coordination utils");
+  if(coordinationUtils == null) {
+return;
+  }
+  DistributedDataAccess runIdAccess = coordinationUtils.getDataAccess();
+  String globalRunId = (String) runIdAccess.readData(RUNID_PATH);
+  if( runId != globalRunId){
+processors.forEach(StreamProcessor::stop);
+cleanup();
+appStatus = ApplicationStatus.UnsuccessfulFinish;
+String msg = String.format("run.id %s on processor %s differs from the 
global run.id %s", runId, uid, globalRunId);
+throw new SamzaException(msg);
 
 Review comment:
   Done. Also doing shutdownLatch.countDown().




[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-07 Thread GitBox
lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: 
Support run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r263661126
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##
 @@ -280,4 +368,47 @@ private void setApplicationFinalStatus() {
   }
 }
   }
+
+
+  /**
+   * Defines a specific implementation of {@link CoordinationSessionListener} 
for local {@link CoordinationUtils}
+   */
+  private final class LocalCoordinationSessionListener implements 
CoordinationSessionListener {
+
+/**
+ * If the coordination utils session has reconnected, check if global 
runid differs from local runid
+ * if it differs then shut down processor and throw exception
+ * else recreate ephemeral node corresponding to this processor inside the 
read write lock for runid
+ */
+@Override
+public void handleReconnect() {
+  LOG.info("Reconnected to coordination utils");
+  if(coordinationUtils == null) {
+return;
+  }
+  DistributedDataAccess runIdAccess = coordinationUtils.getDataAccess();
+  String globalRunId = (String) runIdAccess.readData(RUNID_PATH);
+  if( runId != globalRunId){
+processors.forEach(StreamProcessor::stop);
+cleanup();
+appStatus = ApplicationStatus.UnsuccessfulFinish;
+String msg = String.format("run.id %s on processor %s differs from the 
global run.id %s", runId, uid, globalRunId);
+throw new SamzaException(msg);
+  } else if(runIdLock != null) {
+String msg = String.format("Processor {} failed to get the lock for 
run.id", uid);
+try {
+  // acquire lock to recreate active processor ephemeral node
+  DistributedReadWriteLock.AccessType lockAccess = 
runIdLock.lock(LOCK_TIMEOUT, LOCK_TIMEOUT_UNIT);
+  if(lockAccess == DistributedReadWriteLock.AccessType.WRITE || 
lockAccess == DistributedReadWriteLock.AccessType.READ) {
 
 Review comment:
   Session expiration does mean all the ephemeral nodes are cleaned up. But
according to the ZkClient code, after an expiration two things can happen: a
successful reconnect, in which case listener.handleNewSession is invoked, or a
failure to reconnect, in which case listener.handleSessionEstablishmentError is
invoked. Both of these are now handled in LocalApplicationRunner.
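
   For reference, a simplified sketch of a listener covering both outcomes (based
on the I0Itec ZkClient interface already imported in this PR; the actual handling
in LocalApplicationRunner will differ):

```java
import org.I0Itec.zkclient.IZkStateListener;
import org.apache.zookeeper.Watcher;

// Simplified sketch: the two post-expiration outcomes land in separate callbacks.
public class SessionOutcomeListenerSketch implements IZkStateListener {
  @Override
  public void handleStateChanged(Watcher.Event.KeeperState state) {
    // connection state transitions (SyncConnected, Disconnected, Expired, ...)
  }

  @Override
  public void handleNewSession() {
    // reconnected with a new session: ephemeral nodes are gone, recreate them here
  }

  @Override
  public void handleSessionEstablishmentError(Throwable error) {
    // could not re-establish a session: stop processing and shut down
  }
}
```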




[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-07 Thread GitBox
lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: 
Support run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r263661144
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
 ##
 @@ -39,6 +42,12 @@
 
   DistributedLockWithState getLockWithState(String lockId);
 
+  DistributedReadWriteLock getReadWriteLock();
 
 Review comment:
   Added lock id.




[GitHub] [samza] rmatharu commented on a change in pull request #942: Bugfix: Recent CSM refactor was causing some metrics to not be emitted. Fixed -restore-time metric.

2019-03-07 Thread GitBox
rmatharu commented on a change in pull request #942: Bugfix: Recent CSM 
refactor was causing some metrics to not be emitted. Fixed -restore-time 
metric.
URL: https://github.com/apache/samza/pull/942#discussion_r263657205
 
 

 ##
 File path: 
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 ##
 @@ -573,6 +576,25 @@ private StorageEngine createStore(String storeName, 
TaskName taskName, TaskModel
 return 
this.sideInputStorageManagers.values().stream().collect(Collectors.toSet());
   }
 
+  /**
+   * Registers any CSM created metrics such as side-inputs related metrics, 
and standby-task related metrics.
+   * @param metricsReporters metrics reporters to use
+   */
+  public void registerMetrics(Map metricsReporters) {
 
 Review comment:
   Done. 
   It did not require a group-name change, so all existing metrics remain as is.




[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-11 Thread GitBox
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id 
in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r264387433
 
 

 ##
 File path: 
samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
 ##
 @@ -52,6 +52,16 @@ public DistributedLockWithState getLockWithState(String 
lockId) {
 return new AzureLock(blob);
   }
 
+  @Override
+  public DistributedReadWriteLock getReadWriteLock(String lockId) throws 
UnsupportedOperationException {
 
 Review comment:
   You need to throw the exception here (instead of returning null), and you can
remove 'throws UOE' from the method signature since UnsupportedOperationException
is unchecked.
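
   A minimal sketch of the suggested change (method body only; signatures taken
from the diff above):

```java
@Override
public DistributedReadWriteLock getReadWriteLock(String lockId) {
  // Azure coordination does not implement a read-write lock here; fail loudly
  // instead of returning null. UnsupportedOperationException is unchecked, so
  // no throws clause is needed.
  throw new UnsupportedOperationException("getReadWriteLock is not supported by AzureCoordinationUtils");
}
```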




[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-11 Thread GitBox
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id 
in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r264475902
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/zk/ZkDistributedReadWriteLock.java
 ##
 @@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.DistributedReadWriteLock;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Distributed lock primitive for Zookeeper.
+ */
+public class ZkDistributedReadWriteLock implements DistributedReadWriteLock {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(ZkDistributedReadWriteLock.class);
+  private static final String PARTICIPANTS_PATH = "participants";
+  private static final String PROCESSORS_PATH = "processors";
+  private final ZkUtils zkUtils;
+  private final String lockPath;
+  private final String particpantsPath;
+  private final String processorsPath;
+  private final String participantId;
+  private final ZkKeyBuilder keyBuilder;
+  private final Random random = new Random();
+  private String activeParticipantPath = null;
+  private String activeProcessorPath = null;
+  private Object mutex;
+  private Boolean isInCriticalSection = false;
+  private Boolean isStateLost = false;
+
+  public ZkDistributedReadWriteLock(String participantId, ZkUtils zkUtils, 
String lockId) {
+if (zkUtils == null) {
+  throw new RuntimeException("Cannot operate ZkDistributedReadWriteLock 
without ZkUtils.");
+}
+this.zkUtils = zkUtils;
+this.participantId = participantId;
+this.keyBuilder = zkUtils.getKeyBuilder();
+lockPath = String.format("%s/readWriteLock-%s", 
keyBuilder.getRootPath(),lockId);
+particpantsPath = String.format("%s/%s", lockPath, PARTICIPANTS_PATH);
+processorsPath = String.format("%s/%s", lockPath, PROCESSORS_PATH);
+zkUtils.validatePaths(new String[] {lockPath, particpantsPath, 
processorsPath});
+mutex = new Object();
+zkUtils.getZkClient().subscribeChildChanges(particpantsPath, new 
ParticipantChangeHandler(zkUtils));
+zkUtils.getZkClient().subscribeStateChanges(new 
ZkSessionStateChangedListener());
+  }
+
+  /**
+   * Tries to acquire a lock in order to generate run.id. On failure to 
acquire lock, it keeps trying until the lock times out.
+   * Creates a sequential ephemeral node under "participants" to acquire the 
lock.
+   * If the path of this node has the lowest sequence number,
+   * it creates a sequential ephemeral node under "processors" and checks the 
number of nodes under "processors"
+   * if there is only one node under "processors", a WRITE access lock is 
acquired
+   * else a READ access lock is acquired
+   * @param timeout Duration of lock acquiring timeout.
+   * @param unit Unit of the timeout defined above.
+   * @return AccessType.READ/WRITE if lock is acquired successfully, 
AccessType.NONE if it times out.
+   */
+  @Override
+  public AccessType lock(long timeout, TimeUnit unit)
+  throws TimeoutException {
+
+activeParticipantPath = 
zkUtils.getZkClient().createEphemeralSequential(particpantsPath + "/", 
participantId);
+
+//Start timer for timeout
+long startTime = System.currentTimeMillis();
+long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
+
+while((System.currentTimeMillis() - startTime) < lockTimeout) {
+  synchronized (mutex) {
+AccessType accessType = checkAndAcquireLock();
+if(accessType != AccessType.NONE) {
+  isInCriticalSection = true;
+  if(isStateLost) {
+throw new SamzaException("Lock's state lost due to connection 
expiry");
+  }
+  return accessType;
+} else {
 
 Review comment:
   When does this happen?


[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-11 Thread GitBox
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id 
in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r264387509
 
 

 ##
 File path: 
samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
 ##
 @@ -52,6 +52,16 @@ public DistributedLockWithState getLockWithState(String 
lockId) {
 return new AzureLock(blob);
   }
 
+  @Override
+  public DistributedReadWriteLock getReadWriteLock(String lockId) throws 
UnsupportedOperationException {
+return null;
+  }
+
+  @Override
+  public DistributedDataAccess getDataAccess(DistributedDataStateListener 
listener) throws UnsupportedOperationException {
 
 Review comment:
   see above.




[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-11 Thread GitBox
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id 
in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r264473744
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##
 @@ -280,4 +376,68 @@ private void setApplicationFinalStatus() {
   }
 }
   }
+
+
+  /**
+   * Defines a specific implementation of {@link DistributedDataStateListener} 
for local {@link DistributedDataAccess}
+   */
+  private final class LocalDistributedDataStateListener implements 
DistributedDataStateListener {
+
+/**
+ * upon reconnect check if global runid differs from local runid
+ */
+@Override
+public void handleReconnect() {
+  if(coordinationUtils == null || runIdLock == null || runIdAccess == null 
) {
+LOG.warn("Stopping processor {} and shutting down due to failure 
reading global runid after reconnect", uid);
 
 Review comment:
   where is it reading the global runid?




[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-11 Thread GitBox
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id 
in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r264468919
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##
 @@ -59,13 +67,23 @@
 public class LocalApplicationRunner implements ApplicationRunner {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(LocalApplicationRunner.class);
+  private static final String RUNID_PATH = "runId";
+  private static final String APPLICATION_RUNNER_PATH_SUFFIX = 
"/ApplicationRunnerData";
+  private static final String RUNID_LOCK_ID = "runId";
+  private static final int LOCK_TIMEOUT = 10;
+  private static final TimeUnit LOCK_TIMEOUT_UNIT = TimeUnit.MINUTES;
 
   private final ApplicationDescriptorImpl 
appDesc;
   private final LocalJobPlanner planner;
   private final Set processors = 
ConcurrentHashMap.newKeySet();
   private final CountDownLatch shutdownLatch = new CountDownLatch(1);
   private final AtomicInteger numProcessorsToStart = new AtomicInteger();
   private final AtomicReference failure = new AtomicReference<>();
+  private final String uid = UUID.randomUUID().toString();
+  private CoordinationUtils coordinationUtils = null;
 
 Review comment:
   make final.




[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-11 Thread GitBox
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id 
in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r264474066
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##
 @@ -280,4 +376,68 @@ private void setApplicationFinalStatus() {
   }
 }
   }
+
+
+  /**
+   * Defines a specific implementation of {@link DistributedDataStateListener} 
for local {@link DistributedDataAccess}
+   */
+  private final class LocalDistributedDataStateListener implements 
DistributedDataStateListener {
+
+/**
+ * upon reconnect check if global runid differs from local runid
+ */
+@Override
+public void handleReconnect() {
+  if(coordinationUtils == null || runIdLock == null || runIdAccess == null 
) {
+LOG.warn("Stopping processor {} and shutting down due to failure 
reading global runid after reconnect", uid);
+stopProcessingAndShutDown();
+return;
+  }
+  try {
+// acquire lock to write or read run.id
+DistributedReadWriteLock.AccessType lockAccess = 
runIdLock.lock(LOCK_TIMEOUT, LOCK_TIMEOUT_UNIT);
+if(lockAccess != DistributedReadWriteLock.AccessType.NONE) {
+  String globalRunId = (String) runIdAccess.readData(RUNID_PATH, new 
LocalDistributedDataWatcher());
+  runIdLock.unlock();
+  if(runId != globalRunId) {
+LOG.warn("Stopping processor {} and shutting down as local runid 
{} differs from global runid {} after session reconnect.", uid, runId,
+globalRunId);
+stopProcessingAndShutDown();
+  }
+} else {
+  LOG.warn("Stopping processor {} and shutting down due to failure 
reading global runid after reconnect", uid);
+  stopProcessingAndShutDown();
+}
+  } catch (TimeoutException e) {
+LOG.warn("Stopping processor {} and shutting down due to failure 
reading global runid after reconnect", uid);
+stopProcessingAndShutDown();
+  }
+}
+
+@Override
+public void handleReconnectFailedError() {
+  LOG.warn("Stopping processor {} and shutting down due to failure to 
reconnect", uid);
+  stopProcessingAndShutDown();
+}
+  }
+
+  /**
+   * Defines a specific implementation of {@link DistributedDataWatcher} for 
local {@link DistributedDataAccess}
+   */
+  private final class LocalDistributedDataWatcher implements 
DistributedDataWatcher {
+@Override
+public void handleDataChange(Object newData) {
+  if(runId != (String) newData) {
 
 Review comment:
   You should use `equals()`; `!=` compares object references, not String values.
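
   A tiny illustration of the difference (hypothetical run-id values):

```java
public class StringEqualitySketch {
  public static void main(String[] args) {
    // Two equal run ids held in distinct String objects.
    String runId = new String("run-1");
    String newData = new String("run-1");

    System.out.println(runId != newData);        // true: different object references
    System.out.println(!runId.equals(newData));  // false: the character values match
  }
}
```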




[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-11 Thread GitBox
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id 
in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r264469347
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##
 @@ -77,23 +95,80 @@
*/
   public LocalApplicationRunner(SamzaApplication app, Config config) {
 this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
-this.planner = new LocalJobPlanner(appDesc);
+this.coordinationUtils = getCoordinationUtils(config);
+getRunId();
+this.planner = new LocalJobPlanner(appDesc, coordinationUtils, uid, runId);
   }
 
   /**
* Constructor only used in unit test to allow injection of {@link 
LocalJobPlanner}
*/
   @VisibleForTesting
-  LocalApplicationRunner(ApplicationDescriptorImpl appDesc, LocalJobPlanner planner) {
+  LocalApplicationRunner(ApplicationDescriptorImpl appDesc, LocalJobPlanner planner, CoordinationUtils 
coordinationUtils) {
 this.appDesc = appDesc;
 this.planner = planner;
+this.coordinationUtils = coordinationUtils;
+  }
+
+  private CoordinationUtils getCoordinationUtils(Config config) {
+JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
+String coordinationId = new ApplicationConfig(config).getGlobalAppId() + 
APPLICATION_RUNNER_PATH_SUFFIX;
+return 
jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(coordinationId, 
uid, config);
+  }
+
+  private void getRunId(){
+Boolean isAppModeBatch = new 
ApplicationConfig(appDesc.getConfig()).getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH;
+if(coordinationUtils == null ||  !isAppModeBatch) {
+  return;
+}
+
+runIdLock = coordinationUtils.getReadWriteLock(RUNID_LOCK_ID);
+if(runIdLock == null) {
+  LOG.warn("Processor {} failed to create the lock for run.id generation", 
uid);
 
 Review comment:
   Better to include more specific info than the UID; the uid alone is not very
helpful.




[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-11 Thread GitBox
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id 
in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r264474872
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/zk/ZkDistributedDataAccess.java
 ##
 @@ -0,0 +1,61 @@
+package org.apache.samza.zk;
+
+import org.apache.samza.coordinator.DistributedDataAccess;
+import org.apache.samza.coordinator.DistributedDataWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ZkDistributedDataAccess implements DistributedDataAccess {
+  public static final Logger LOG = 
LoggerFactory.getLogger(ZkDistributedDataAccess.class);
+  private final ZkUtils zkUtils;
+  private final ZkKeyBuilder keyBuilder;
+  private DistributedDataWatcher watcher;
+
+  public ZkDistributedDataAccess(ZkUtils zkUtils) {
+if (zkUtils == null) {
+  throw new RuntimeException("Cannot operate ZkDistributedDataAccess 
without ZkUtils.");
+}
+this.zkUtils = zkUtils;
+this.keyBuilder = zkUtils.getKeyBuilder();
+  }
+
+  public Object readData(String key, DistributedDataWatcher watcher) {
+this.watcher = watcher;
+String absolutePath = keyBuilder.getRootPath() + "/" + key;
+zkUtils.getZkClient().subscribeDataChanges(absolutePath, new 
ZkDistributedDataChangeHandler(zkUtils));
+if(!zkUtils.getZkClient().exists(absolutePath)) {
+  return null;
+}
+return zkUtils.getZkClient().readData(absolutePath);
+  }
+
+  public void writeData(String key, Object data, DistributedDataWatcher 
watcher) {
+this.watcher = watcher;
+String absolutePath = keyBuilder.getRootPath() + "/" + key;
+zkUtils.getZkClient().subscribeDataChanges(absolutePath, new 
ZkDistributedDataChangeHandler(zkUtils));
+zkUtils.validatePaths(new String[]{absolutePath});
+zkUtils.writeData(absolutePath, data);
+return;
+  }
+
+  class ZkDistributedDataChangeHandler extends 
ZkUtils.GenerationAwareZkDataListener {
+
+public ZkDistributedDataChangeHandler(ZkUtils zkUtils) {
+  super(zkUtils, "ZkDistributedDataChangeHandler");
+}
+
+/**
+ * Invoked when there is a change to the  data z-node.
 
 Review comment:
   nit. extra space.




[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-11 Thread GitBox
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id 
in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r264474987
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/zk/ZkDistributedReadWriteLock.java
 ##
 @@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.DistributedReadWriteLock;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Distributed lock primitive for Zookeeper.
+ */
+public class ZkDistributedReadWriteLock implements DistributedReadWriteLock {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(ZkDistributedReadWriteLock.class);
+  private static final String PARTICIPANTS_PATH = "participants";
+  private static final String PROCESSORS_PATH = "processors";
+  private final ZkUtils zkUtils;
+  private final String lockPath;
+  private final String particpantsPath;
+  private final String processorsPath;
+  private final String participantId;
+  private final ZkKeyBuilder keyBuilder;
+  private final Random random = new Random();
+  private String activeParticipantPath = null;
+  private String activeProcessorPath = null;
+  private Object mutex;
 
 Review comment:
   final




[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-11 Thread GitBox
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id 
in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r264414547
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##
 @@ -59,13 +67,23 @@
 public class LocalApplicationRunner implements ApplicationRunner {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(LocalApplicationRunner.class);
+  private static final String RUNID_PATH = "runId";
+  private static final String APPLICATION_RUNNER_PATH_SUFFIX = 
"/ApplicationRunnerData";
+  private static final String RUNID_LOCK_ID = "runId";
+  private static final int LOCK_TIMEOUT = 10;
 
 Review comment:
   I prefer LOCK_TIMEOUT_MS here and use TimeUnit.MINUTES directly (no need for 
constant there)




[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-11 Thread GitBox
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id 
in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r264474790
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/zk/ZkDistributedDataAccess.java
 ##
 @@ -0,0 +1,61 @@
+package org.apache.samza.zk;
+
+import org.apache.samza.coordinator.DistributedDataAccess;
+import org.apache.samza.coordinator.DistributedDataWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ZkDistributedDataAccess implements DistributedDataAccess {
+  public static final Logger LOG = 
LoggerFactory.getLogger(ZkDistributedDataAccess.class);
+  private final ZkUtils zkUtils;
+  private final ZkKeyBuilder keyBuilder;
+  private DistributedDataWatcher watcher;
+
+  public ZkDistributedDataAccess(ZkUtils zkUtils) {
+if (zkUtils == null) {
+  throw new RuntimeException("Cannot operate ZkDistributedDataAccess 
without ZkUtils.");
+}
+this.zkUtils = zkUtils;
+this.keyBuilder = zkUtils.getKeyBuilder();
+  }
+
+  public Object readData(String key, DistributedDataWatcher watcher) {
 
 Review comment:
   Won't this watcher override the one set in writeData? If they are the same
(or only one should exist), then make it final and pass it in via the
constructor.
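
   A rough sketch of that suggestion (hypothetical class shape, simplified from
the ZkDistributedDataAccess in this diff), holding one watcher fixed at
construction time:

```java
import org.apache.samza.coordinator.DistributedDataWatcher;

// Hypothetical sketch: a single final watcher supplied via the constructor,
// instead of letting each readData/writeData call replace it.
public class ZkDistributedDataAccessSketch {
  private final DistributedDataWatcher watcher;

  public ZkDistributedDataAccessSketch(DistributedDataWatcher watcher) {
    this.watcher = watcher;
  }

  public Object readData(String key) {
    // subscribe 'watcher' for data changes on 'key', then read and return the value
    return null; // placeholder: ZooKeeper access elided
  }
}
```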




[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-11 Thread GitBox
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id 
in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r264472956
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##
 @@ -200,6 +279,22 @@ static String createProcessorId(ApplicationConfig 
appConfig) {
 }
   }
 
+  private void cleanup() {
+if(runIdLock != null) {
+  runIdLock.cleanState();
+}
+if(coordinationUtils != null) {
+  coordinationUtils.close();
+}
+  }
+
+  private void stopProcessingAndShutDown() {
+processors.forEach(StreamProcessor::stop);
+cleanup();
+appStatus = ApplicationStatus.UnsuccessfulFinish;
 
 Review comment:
   qq. why is it Unsuccessful?




[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-11 Thread GitBox
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id 
in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r264469001
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##
 @@ -59,13 +67,23 @@
 public class LocalApplicationRunner implements ApplicationRunner {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(LocalApplicationRunner.class);
+  private static final String RUNID_PATH = "runId";
+  private static final String APPLICATION_RUNNER_PATH_SUFFIX = 
"/ApplicationRunnerData";
+  private static final String RUNID_LOCK_ID = "runId";
+  private static final int LOCK_TIMEOUT = 10;
+  private static final TimeUnit LOCK_TIMEOUT_UNIT = TimeUnit.MINUTES;
 
   private final ApplicationDescriptorImpl 
appDesc;
   private final LocalJobPlanner planner;
   private final Set processors = 
ConcurrentHashMap.newKeySet();
   private final CountDownLatch shutdownLatch = new CountDownLatch(1);
   private final AtomicInteger numProcessorsToStart = new AtomicInteger();
   private final AtomicReference failure = new AtomicReference<>();
+  private final String uid = UUID.randomUUID().toString();
+  private CoordinationUtils coordinationUtils = null;
+  private DistributedReadWriteLock runIdLock = null;
+  private DistributedDataAccess runIdAccess = null;
+  private String runId = null;
 
 Review comment:
   make final. Assign null if needed in the constructor.
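   A small, simplified illustration of that pattern (field names borrowed from the diff above; everything else is made up for the sketch):
   ```java
   import java.util.UUID;

   // Sketch only: final fields assigned exactly once in the constructor,
   // with null used explicitly when a mode does not need the value.
   class RunnerFieldsExample {
     private final String uid;
     private final String runId;

     RunnerFieldsExample(boolean isBatchMode) {
       this.uid = UUID.randomUUID().toString();
       this.runId = isBatchMode ? UUID.randomUUID().toString() : null;
     }
   }
   ```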




[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-11 Thread GitBox
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id 
in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r264388397
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/coordinator/DistributedDataStateListener.java
 ##
 @@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+/**
+ * Listen to changes in state of distributed data connection
+ */
+public interface DistributedDataStateListener {
+  /**
+   * Called when the connected has reconnected after a disconnect.
 
 Review comment:
   'connected'? please rephrase.




[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-11 Thread GitBox
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id 
in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r264470992
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/zk/ZkDistributedReadWriteLock.java
 ##
 @@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.DistributedReadWriteLock;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Distributed lock primitive for Zookeeper.
+ */
+public class ZkDistributedReadWriteLock implements DistributedReadWriteLock {
 
 Review comment:
   Is this a new code or moved from somewhere?




[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-03-11 Thread GitBox
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id 
in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r264467907
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
 ##
 @@ -119,7 +119,12 @@ private void createStreams(String planId, 
List intStreams, StreamMan
   return;
 }
 
-DistributedLockWithState lockWithState = 
coordinationUtils.getLockWithState(planId);
+Boolean isAppModeBatch = new ApplicationConfig(userConfig).getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH;
+String lockId = planId;
+if(isAppModeBatch && runId != null) {
 
 Review comment:
   add comment




[GitHub] [samza] dnishimura opened a new pull request #964: SAMZA-2138: Incorrect logging when task to container mapping is written.

2019-03-21 Thread GitBox
dnishimura opened a new pull request #964: SAMZA-2138: Incorrect logging when 
task to container mapping is written.
URL: https://github.com/apache/samza/pull/964
 
 
   Example before this fix:
   ```
   2019-03-21 08:36:34.810 [main] JobModelManager$ [INFO] Storing ssp: 
Partition 1 and task: 0 into metadata store
   ```




[GitHub] [samza] dnishimura commented on issue #964: SAMZA-2138: Incorrect logging when task to container mapping is written.

2019-03-21 Thread GitBox
dnishimura commented on issue #964: SAMZA-2138: Incorrect logging when task to 
container mapping is written.
URL: https://github.com/apache/samza/pull/964#issuecomment-475287206
 
 
   @shanthoosh please take a look.




[GitHub] [samza] mynameborat commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0

2019-03-21 Thread GitBox
mynameborat commented on a change in pull request #951: SAMZA-2127: Upgrade to 
Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r267873046
 
 

 ##
 File path: 
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
 ##
 @@ -301,12 +260,6 @@ class TestKafkaSystemAdmin {
 val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka")
 systemAdmin.createStream(spec)
 validateTopic(topic, 1)
-val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), 
"kafka", metadataStore.getTopicInfo)
-assertTrue(topicMetadataMap.contains(topic))
-val topicMetadata = topicMetadataMap(topic)
-val partitionMetadata = topicMetadata.partitionsMetadata.head
-assertEquals(0, partitionMetadata.partitionId)
-assertEquals(3, partitionMetadata.replicas.size)
 
 Review comment:
   Agreed. Let me do the Mockito way  




[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0

2019-03-21 Thread GitBox
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade 
to Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r267885428
 
 

 ##
 File path: 
samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
 ##
 @@ -133,43 +130,36 @@ object StreamTaskTestUtil {
 config.put(ProducerConfig.LINGER_MS_CONFIG, "0")
 val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
 
+adminClient = AdminClient.create(config.asInstanceOf[util.Map[String, 
Object]])
 producer = new KafkaProducer[Array[Byte], 
Array[Byte]](producerConfig.getProducerProperties)
 
 createTopics
 validateTopics
   }
 
   def createTopics {
-AdminUtils.createTopic(
-  zkUtils,
-  INPUT_TOPIC,
-  TOTAL_TASK_NAMES,
-  REPLICATION_FACTOR)
+adminClient.createTopics(Collections.singleton(new NewTopic(INPUT_TOPIC, 
TOTAL_TASK_NAMES, REPLICATION_FACTOR.shortValue(
   }
 
   def validateTopics {
 
 Review comment:
   Not sure if this is feasible/practical to combine, but it looks like some of 
this code is similar to code from `TestKafkaSystemAdmin`, such as 
`validateTopics`.




[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0

2019-03-21 Thread GitBox
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade 
to Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r267876940
 
 

 ##
 File path: 
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
 ##
 @@ -89,49 +91,49 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness 
{
 
   @AfterClass
   override def tearDown() {
+adminClient.close()
 systemAdmin.stop()
 producer.close()
 super.tearDown()
   }
 
   def createTopic(topicName: String, partitionCount: Int) {
-createTopic(topicName, partitionCount, REPLICATION_FACTOR)
+val topicCreationFutures = 
adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, 
partitionCount, REPLICATION_FACTOR.shortValue(
+topicCreationFutures.all().get() // wait on the future to make sure create 
topic call finishes
   }
 
   def validateTopic(topic: String, expectedPartitionCount: Int) {
 var done = false
 var retries = 0
 
-while (!done && retries < 100) {
+while (!done && retries < 10) {
   try {
-  if (!zkClient.topicExists(topic)) {
-System.err.println("Test topic %s not found. Waiting and 
retrying." format topic)
-retries += 1
-Thread.sleep(500)
-  }
-
-  val topicDescription =
-
adminClient.describeTopics(JavaConverters.asJavaCollectionConverter(Set(topic)).asJavaCollection)
-  .all().get().get(topic)
+val topicDescriptionFutures = adminClient.describeTopics(
+
JavaConverters.asJavaCollectionConverter(Set(topic)).asJavaCollection).all()
+val topicDescription = topicDescriptionFutures.get(500, 
TimeUnit.MILLISECONDS)
+.get(topic)
 
 done = expectedPartitionCount == topicDescription.partitions().size()
 
 Review comment:
   1. Can `topicDescription` be null?
   2. If done is true, then can we break out of this loop?
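   For what it's worth, a simplified sketch of the loop shape those two questions point at (plain Java, with a stand-in lookup instead of the real admin-client call; the real test is Scala):
   ```java
   import java.util.concurrent.TimeUnit;

   // Sketch only: null-check the description and stop retrying as soon as the
   // expected partition count has been observed.
   class ValidateTopicExample {
     interface TopicLookup {
       // Stand-in for describeTopics(...).all().get(...); may yield null if the
       // topic is not visible yet.
       Integer partitionCount(String topic) throws Exception;
     }

     static boolean validateTopic(TopicLookup lookup, String topic, int expectedPartitions) {
       for (int retries = 0; retries < 10; retries++) {
         try {
           Integer partitions = lookup.partitionCount(topic);
           if (partitions != null && partitions == expectedPartitions) {
             return true; // done, no further polling needed
           }
         } catch (Exception e) {
           // topic not created yet; fall through and retry
         }
         try {
           TimeUnit.MILLISECONDS.sleep(500);
         } catch (InterruptedException ie) {
           Thread.currentThread().interrupt();
           return false;
         }
       }
       return false;
     }
   }
   ```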




[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0

2019-03-21 Thread GitBox
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade 
to Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r267882631
 
 

 ##
 File path: 
samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
 ##
 @@ -159,42 +152,25 @@ public void setUp() {
 zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, 
ZK_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
 zkUtils.connect();
 
-for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, 
outputKafkaTopic)) {
-  LOGGER.info("Creating kafka topic: {}.", kafkaTopic);
-  TestUtils.createTopic(kafkaZkClient(), kafkaTopic, 5, 1, servers(), new 
Properties());
-  if (AdminUtils.topicExists(zkUtils(), kafkaTopic)) {
-LOGGER.info("Topic: {} was created", kafkaTopic);
-  } else {
-Assert.fail(String.format("Unable to create kafka topic: %s.", 
kafkaTopic));
-  }
-}
-for (String kafkaTopic : ImmutableList.of(inputSinglePartitionKafkaTopic, 
outputSinglePartitionKafkaTopic)) {
-  LOGGER.info("Creating kafka topic: {}.", kafkaTopic);
-  TestUtils.createTopic(kafkaZkClient(), kafkaTopic, 1, 1, servers(), new 
Properties());
-  if (AdminUtils.topicExists(zkUtils(), kafkaTopic)) {
-LOGGER.info("Topic: {} was created", kafkaTopic);
-  } else {
-Assert.fail(String.format("Unable to create kafka topic: %s.", 
kafkaTopic));
-  }
-}
+List newTopics =
+ImmutableList.of(inputKafkaTopic, outputKafkaTopic, 
inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic)
+.stream()
+.map(topic -> new NewTopic(topic, 5, (short) 1))
+.collect(Collectors.toList());
+
+createTopics(newTopics);
 
 Review comment:
   Should this fail if `createTopics` returns false?




[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0

2019-03-21 Thread GitBox
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade 
to Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r267881671
 
 

 ##
 File path: 
samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
 ##
 @@ -295,4 +227,20 @@ public Config getConfig() {
   return this.config;
 }
   }
+
+  @Override
+  protected KafkaProducer createProducer() {
 
 Review comment:
   It seems a bit odd that this test harness needs slightly different 
serialization/deserialization for createProducer/createConsumer compared to 
`IntegrationTestHarness`. If it is difficult to make them consistent, could you 
please put a comment about why you have these overrides?
   You could also consider having helpers in `IntegrationTestHarness` to build 
the configs, just in case someone needs to add something in the future.




[GitHub] [samza] dnishimura commented on issue #964: SAMZA-2138: Incorrect logging when task to container mapping is written.

2019-03-21 Thread GitBox
dnishimura commented on issue #964: SAMZA-2138: Incorrect logging when task to 
container mapping is written.
URL: https://github.com/apache/samza/pull/964#issuecomment-475401665
 
 
   It happened to me before. Not sure why. Thanks for merging it.




[GitHub] [samza] shanthoosh merged pull request #964: SAMZA-2138: Incorrect logging when task to container mapping is written.

2019-03-21 Thread GitBox
shanthoosh merged pull request #964: SAMZA-2138: Incorrect logging when task to 
container mapping is written.
URL: https://github.com/apache/samza/pull/964
 
 
   




[GitHub] [samza] shanthoosh edited a comment on issue #964: SAMZA-2138: Incorrect logging when task to container mapping is written.

2019-03-21 Thread GitBox
shanthoosh edited a comment on issue #964: SAMZA-2138: Incorrect logging when 
task to container mapping is written.
URL: https://github.com/apache/samza/pull/964#issuecomment-475401240
 
 
   @dnishimura 
   This build is green in travis. Here's the successful travis build associated 
with this PR: 
https://travis-ci.org/apache/samza/builds/509502538?utm_source=github_status_medium=notification
 . However, it is not updated here in Github. 
   
   I'm merging this PR to trunk.




[GitHub] [samza] rmatharu commented on issue #952: Improved standby-aware container allocation for active-containers on job redeploys

2019-03-22 Thread GitBox
rmatharu commented on issue #952: Improved standby-aware container allocation 
for active-containers on job redeploys
URL: https://github.com/apache/samza/pull/952#issuecomment-475692992
 
 
   Addressed all comments. 
   thumbsup => comment addressed.




[GitHub] [samza] vjagadish1989 opened a new pull request #965: Fix a typo in LinkedIn case-study

2019-03-22 Thread GitBox
vjagadish1989 opened a new pull request #965: Fix a typo in LinkedIn case-study
URL: https://github.com/apache/samza/pull/965
 
 
   




[GitHub] [samza] asfgit closed pull request #965: Minor: Fix a typo in LinkedIn case-study

2019-03-22 Thread GitBox
asfgit closed pull request #965: Minor: Fix a typo in LinkedIn case-study
URL: https://github.com/apache/samza/pull/965
 
 
   




[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0

2019-03-21 Thread GitBox
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade 
to Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r267871144
 
 

 ##
 File path: 
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
 ##
 @@ -301,12 +260,6 @@ class TestKafkaSystemAdmin {
 val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka")
 systemAdmin.createStream(spec)
 validateTopic(topic, 1)
-val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), 
"kafka", metadataStore.getTopicInfo)
-assertTrue(topicMetadataMap.contains(topic))
-val topicMetadata = topicMetadataMap(topic)
-val partitionMetadata = topicMetadata.partitionsMetadata.head
-assertEquals(0, partitionMetadata.partitionId)
-assertEquals(3, partitionMetadata.replicas.size)
 
 Review comment:
   The end-to-end check seems unnecessary anyways for a unit test.
   However, since you are removing that check, it might be good to have a test 
which uses Mockito to verify that the correct configurations are passed to the 
admin client when creating topics. If I recall correctly, in the past, we may 
have had some issues which involved replication factors (not sure if they were 
this exact flow though).
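   As a rough sketch of that suggestion (the `TopicCreator` class below is a hypothetical stand-in for whichever Samza component wraps the admin client; only the Kafka `AdminClient`/`NewTopic` types and the Mockito calls are meant to be real):
   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.verify;

   import java.util.Collection;
   import java.util.Collections;
   import org.apache.kafka.clients.admin.AdminClient;
   import org.apache.kafka.clients.admin.NewTopic;
   import org.junit.Assert;
   import org.junit.Test;
   import org.mockito.ArgumentCaptor;

   public class TestTopicCreationConfig {
     // Stand-in for the code under test: wraps the admin client and creates a topic.
     static class TopicCreator {
       private final AdminClient admin;
       TopicCreator(AdminClient admin) { this.admin = admin; }
       void createTopic(String name, int partitions, short replicationFactor) {
         admin.createTopics(Collections.singletonList(new NewTopic(name, partitions, replicationFactor)));
       }
     }

     @Test
     public void shouldPassReplicationFactorToAdminClient() {
       AdminClient admin = mock(AdminClient.class);
       new TopicCreator(admin).createTopic("checkpoint-topic", 1, (short) 3);

       // Capture the NewTopic collection handed to the admin client and check
       // that the intended replication factor survived.
       ArgumentCaptor<Collection> captor = ArgumentCaptor.forClass(Collection.class);
       verify(admin).createTopics(captor.capture());
       NewTopic topic = (NewTopic) captor.getValue().iterator().next();
       Assert.assertEquals(3, topic.replicationFactor());
     }
   }
   ```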




[GitHub] [samza] dnishimura commented on issue #964: SAMZA-2138: Incorrect logging when task to container mapping is written.

2019-03-21 Thread GitBox
dnishimura commented on issue #964: SAMZA-2138: Incorrect logging when task to 
container mapping is written.
URL: https://github.com/apache/samza/pull/964#issuecomment-475350150
 
 
   Thanks @shanthoosh. Please merge when you have some time.




[GitHub] [samza] rmatharu commented on a change in pull request #952: Improved standby-aware container allocation for active-containers on job redeploys

2019-03-21 Thread GitBox
rmatharu commented on a change in pull request #952: Improved standby-aware 
container allocation for active-containers on job redeploys
URL: https://github.com/apache/samza/pull/952#discussion_r268030497
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
 ##
 @@ -226,15 +244,35 @@ private void initiateActiveContainerFailover(String 
containerID, String resource
 // use this standby if there was no previous failover for which this 
standbyResource was used
 if (!(failoverMetadata.isPresent() && 
failoverMetadata.get().isStandbyResourceUsed(standbyContainerResource.getResourceID(
 {
 
-  log.info("Returning standby container {} in running state for active 
container {}", standbyContainerID,
-  activeContainerID);
-  return Optional.of(new Entry<>(standbyContainerID, 
standbyContainerResource));
+  log.info("Returning standby container {} in running state on host {} 
for active container {}", standbyContainerID, 
standbyContainerResource.getHost(), activeContainerID);
+  return standbyContainerResource.getHost();
 }
   }
 }
-
 log.info("Did not find any running standby container for active container 
{}", activeContainerID);
-return Optional.empty();
+
+// Now, we iterate over the list of last-known standbyHosts to check if 
anyone of them has not already been tried
+Map containerToHostMapping = 
this.samzaApplicationState.jobModelManager.jobModel().getAllContainerLocality();
+
+for (String standbyContainerID : 
this.standbyContainerConstraints.get(activeContainerID)) {
+  String standbyHost = containerToHostMapping.get(standbyContainerID);
+
+  // if there is no last-known host for the standby, continue
+  if (standbyHost == null) {
+continue;
 
 Review comment:
   I agree, simplified it a bit. 
   But we can only return any-host once we're done iterating over all 
standbyContainerIDs.
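   In other words, the shape is roughly this (simplified sketch; names follow the quoted diff, and the failover bookkeeping about already-used hosts is omitted):
   ```java
   import java.util.List;
   import java.util.Map;

   // Sketch only: try each standby's last-known host first, and fall back to
   // ANY_HOST only after every standby container has been considered.
   class StandbyHostSelectionExample {
     static final String ANY_HOST = "ANY_HOST"; // stand-in for ResourceRequestState.ANY_HOST

     static String selectStandbyHost(List<String> standbyContainerIds,
         Map<String, String> containerToHostMapping) {
       for (String standbyContainerId : standbyContainerIds) {
         String standbyHost = containerToHostMapping.get(standbyContainerId);
         if (standbyHost != null) {
           return standbyHost; // usable last-known standby host
         }
         // no last-known host for this standby; keep looking
       }
       // only after the full iteration do we give up and request any host
       return ANY_HOST;
     }
   }
   ```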




[GitHub] [samza] rmatharu commented on a change in pull request #952: Improved standby-aware container allocation for active-containers on job redeploys

2019-03-21 Thread GitBox
rmatharu commented on a change in pull request #952: Improved standby-aware 
container allocation for active-containers on job redeploys
URL: https://github.com/apache/samza/pull/952#discussion_r268030578
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
 ##
 @@ -161,54 +161,72 @@ private void handleStandbyContainerStop(String 
standbyContainerID, String resour
 }
   }
 
-  /** Method to handle failover for an active container.
-   *  We try to find a standby for the active container, and issue a stop on 
it.
-   *  If we do not find a standby container, we simply issue an anyhost 
request to place it.
+  /** Method to handle standby-aware allocation for an active container.
+   *  We try to find a standby host for the active container, and issue a stop 
on any standby-containers running on it,
+   *  request resource to place the active on the standby's host, and one to 
place the standby elsewhere.
*
-   * @param containerID the samzaContainerID of the active-container
+   * @param activeContainerID the samzaContainerID of the active-container
* @param resourceID  the samza-resource-ID of the container when it failed 
(used to index failover-state)
*/
-  private void initiateActiveContainerFailover(String containerID, String 
resourceID,
+  private void initiateStandbyAwareAllocation(String activeContainerID, String 
resourceID,
   AbstractContainerAllocator containerAllocator) {
 
-Optional> standbyContainer = 
this.selectStandby(containerID, resourceID);
+String standbyHost = this.selectStandbyHost(activeContainerID, resourceID);
 
-// If we find a standbyContainer, we initiate a failover
-if (standbyContainer.isPresent()) {
+// Check if there is a running standby-container on that host that needs 
to be stopped
+List standbyContainers = 
this.standbyContainerConstraints.get(activeContainerID);
+Map runningStandbyContainersOnHost = 
this.samzaApplicationState.runningContainers.entrySet().stream().filter(x -> 
standbyContainers.contains(x.getKey()))
+.filter(x -> 
x.getValue().getHost().equals(standbyHost)).collect(Collectors.toMap(Map.Entry::getKey,
 Map.Entry::getValue));
 
-  String standbyContainerId = standbyContainer.get().getKey();
-  SamzaResource standbyResource = standbyContainer.get().getValue();
-  String standbyResourceID = standbyResource.getResourceID();
-  String standbyHost = standbyResource.getHost();
+// if the standbyHost returned is anyhost, we proceed with the request 
directly
+if (standbyHost.equals(ResourceRequestState.ANY_HOST)) {
+  log.info("No standby container found for active container {}, making a 
resource-request for placing {} on {}", activeContainerID, activeContainerID, 
ResourceRequestState.ANY_HOST);
+  samzaApplicationState.failoversToAnyHost.incrementAndGet();
+  containerAllocator.requestResource(activeContainerID, 
ResourceRequestState.ANY_HOST);
+
+} else if (runningStandbyContainersOnHost.isEmpty()) {
+  // if there are no running standby-containers on the standbyHost,  we 
proceed to directly to make a resource request
 
-  // update the state
-  FailoverMetadata failoverMetadata = 
this.registerActiveContainerFailure(containerID, resourceID);
-  failoverMetadata.updateStandbyContainer(standbyResourceID, standbyHost);
+  log.info("No running standby container to stop on host {}, making a 
resource-request for placing {} on {}", standbyHost, activeContainerID, 
standbyHost);
+  FailoverMetadata failoverMetadata = 
this.registerActiveContainerFailure(activeContainerID, resourceID);
 
-  log.info("Initiating failover and stopping standby container, found 
standbyContainer {} = resource {}, "
-  + "for active container {}", standbyContainerId, standbyResourceID, 
containerID);
+  // record the resource request, before issuing it to avoid race with 
allocation-thread
+  SamzaResourceRequest resourceRequestForActive = 
containerAllocator.getResourceRequest(activeContainerID, standbyHost);
+  failoverMetadata.recordResourceRequest(resourceRequestForActive);
+  containerAllocator.issueResourceRequest(resourceRequestForActive);
 
 Review comment:
   I need to record resource requests in order to handle expired requests, so I create 
one and then use issueResourceRequest to issue it.




[GitHub] [samza] mynameborat commented on a change in pull request #955: SAMZA-2131: [Scala cleanup] Convert FileSystemCheckpointManagerConfig.scala and SystemConfig.scala to Java

2019-03-21 Thread GitBox
mynameborat commented on a change in pull request #955: SAMZA-2131: [Scala 
cleanup] Convert FileSystemCheckpointManagerConfig.scala and SystemConfig.scala 
to Java
URL: https://github.com/apache/samza/pull/955#discussion_r267968434
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/config/FileSystemCheckpointManagerConfig.java
 ##
 @@ -16,16 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.samza.config;
 
-package org.apache.samza.config
+import java.util.Optional;
 
-object FileSystemCheckpointManagerConfig {
-  // file system checkpoint manager config constants
-  val CHECKPOINT_MANAGER_ROOT = "task.checkpoint.path" // system name to use 
when sending offset checkpoints
 
-  implicit def Config2FSCP(config: Config) = new 
FileSystemCheckpointManagerConfig(config)
-}
+public class FileSystemCheckpointManagerConfig extends MapConfig {
+  /**
+   * Path on local file system where checkpoints should be stored.
+   */
+  private static final String CHECKPOINT_MANAGER_ROOT = "task.checkpoint.path";
 
 Review comment:
   can we make this package private and use it in test below?




[GitHub] [samza] cameronlee314 commented on a change in pull request #955: SAMZA-2131: [Scala cleanup] Convert FileSystemCheckpointManagerConfig.scala and SystemConfig.scala to Java

2019-03-21 Thread GitBox
cameronlee314 commented on a change in pull request #955: SAMZA-2131: [Scala 
cleanup] Convert FileSystemCheckpointManagerConfig.scala and SystemConfig.scala 
to Java
URL: https://github.com/apache/samza/pull/955#discussion_r268000138
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/config/FileSystemCheckpointManagerConfig.java
 ##
 @@ -16,16 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.samza.config;
 
-package org.apache.samza.config
+import java.util.Optional;
 
-object FileSystemCheckpointManagerConfig {
-  // file system checkpoint manager config constants
-  val CHECKPOINT_MANAGER_ROOT = "task.checkpoint.path" // system name to use 
when sending offset checkpoints
 
-  implicit def Config2FSCP(config: Config) = new 
FileSystemCheckpointManagerConfig(config)
-}
+public class FileSystemCheckpointManagerConfig extends MapConfig {
+  /**
+   * Path on local file system where checkpoints should be stored.
+   */
+  private static final String CHECKPOINT_MANAGER_ROOT = "task.checkpoint.path";
 
 Review comment:
   I asked myself this question too. I did it this way since the test can then 
help double-check a change to a config key (which is kind of a public API) and 
make sure the change is intentional.
   Do you think it is still better to just use the variable in the test?
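   For illustration, the trade-off looks roughly like this (the config class and getter below are hypothetical stand-ins, not the actual Samza classes):
   ```java
   import static org.junit.Assert.assertEquals;

   import java.util.Collections;
   import java.util.Map;
   import org.junit.Test;

   public class TestCheckpointPathConfig {
     // Hypothetical stand-in for a MapConfig-style wrapper.
     static class CheckpointConfig {
       private final Map<String, String> map;
       CheckpointConfig(Map<String, String> map) { this.map = map; }
       String getCheckpointPath() { return map.get("task.checkpoint.path"); }
     }

     @Test
     public void literalKeyActsAsAPublicApiCheck() {
       // The literal "task.checkpoint.path" is used here instead of the constant,
       // so an accidental rename of the config key fails this test.
       CheckpointConfig config =
           new CheckpointConfig(Collections.singletonMap("task.checkpoint.path", "/tmp/checkpoints"));
       assertEquals("/tmp/checkpoints", config.getCheckpointPath());
     }
   }
   ```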




[GitHub] [samza] shanthoosh merged pull request #899: SAMZA-2091: Update ZkClient in samza-standalone to use string serializer

2019-03-20 Thread GitBox
shanthoosh merged pull request #899:  SAMZA-2091: Update ZkClient in 
samza-standalone to use string serializer
URL: https://github.com/apache/samza/pull/899
 
 
   




[GitHub] [samza] shanthoosh commented on issue #899: SAMZA-2091: Update ZkClient in samza-standalone to use string serializer

2019-03-20 Thread GitBox
shanthoosh commented on issue #899:  SAMZA-2091: Update ZkClient in 
samza-standalone to use string serializer
URL: https://github.com/apache/samza/pull/899#issuecomment-475051108
 
 
   @prateekm Apologies for the delay in merging this in. I was side-tracked 
with save-points and standalone prod issues while on-boarding a new user.




[GitHub] [samza] dxichen opened a new pull request #963: SAMZA-2137: Update blog pages, remove unreleased version

2019-03-20 Thread GitBox
dxichen opened a new pull request #963: SAMZA-2137: Update blog pages, remove 
unreleased version
URL: https://github.com/apache/samza/pull/963
 
 
   @prateekm @jagadish-v0 please take a look.
   Would be nice to deploy this before the meetup talking about Samza 1.0  




[GitHub] [samza] asfgit closed pull request #963: SAMZA-2137: Update blog pages, remove unreleased version

2019-03-20 Thread GitBox
asfgit closed pull request #963: SAMZA-2137: Update blog pages, remove 
unreleased version
URL: https://github.com/apache/samza/pull/963
 
 
   




[GitHub] [samza] asfgit closed pull request #921: SAMZA-2105: [elasticsearch, hdfs, kafka] code cleanup and refactoring

2019-03-20 Thread GitBox
asfgit closed pull request #921: SAMZA-2105: [elasticsearch, hdfs, kafka] code 
cleanup and refactoring
URL: https://github.com/apache/samza/pull/921
 
 
   




[GitHub] srinipunuru merged pull request #909: Support for empty records

2019-02-07 Thread GitBox
srinipunuru merged pull request #909: Support for empty records
URL: https://github.com/apache/samza/pull/909
 
 
   




[GitHub] srinipunuru opened a new pull request #911: Support for types in Samza SQL UDF

2019-02-07 Thread GitBox
srinipunuru opened a new pull request #911: Support for types in Samza SQL UDF
URL: https://github.com/apache/samza/pull/911
 
 
   This PR adds the following capabilities to UDFs:
   
   1. Typed UDFs: previously, Samza SQL UDFs were un-typed and all the params 
were Objects. Although this allowed the flexibility of polymorphism without 
having to implement multiple methods, it was fragile and we couldn't do 
validations during the initialization phase. This change adds types to the UDFs.
   2. Decouples the method name from the UDF: previously, all UDF methods had to 
be named "execute". Now UDF methods can be named whatever the developer 
prefers; the UDF methods are identified using the annotation. 
   3. Adds polymorphism to UDFs: you can now have multiple methods for the same 
UDF name, i.e. multiple methods within the class can be annotated as 
SamzaSQLUDFMethod (a rough sketch follows below).
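   To make the shape concrete, a hedged sketch (the annotation is declared locally below as a stand-in using the name from the description; the actual Samza SQL API names and registration details may differ):
   ```java
   import java.lang.annotation.ElementType;
   import java.lang.annotation.Retention;
   import java.lang.annotation.RetentionPolicy;
   import java.lang.annotation.Target;

   // Sketch only: a typed UDF with two overloads discovered via an annotation.
   public class RegexMatchUdf {
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.METHOD)
     @interface SamzaSQLUDFMethod { }

     // Typed parameters, so argument validation can happen during initialization
     // instead of failing on an Object cast at runtime.
     @SamzaSQLUDFMethod
     public boolean matches(String pattern, String value) {
       return value != null && value.matches(pattern);
     }

     // "Polymorphism": a second method for the same UDF name with a different signature.
     @SamzaSQLUDFMethod
     public boolean matches(String pattern, Integer value) {
       return value != null && String.valueOf(value).matches(pattern);
     }
   }
   ```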
   




[GitHub] shenodaguirguis commented on a change in pull request #911: Support for types in Samza SQL UDF

2019-02-07 Thread GitBox
shenodaguirguis commented on a change in pull request #911: Support for types 
in Samza SQL UDF
URL: https://github.com/apache/samza/pull/911#discussion_r254915133
 
 

 ##
 File path: 
samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
 ##
 @@ -52,16 +54,19 @@ private SamzaSqlExecutionContext(SamzaSqlExecutionContext 
other) {
 
   public SamzaSqlExecutionContext(SamzaSqlApplicationConfig config) {
 this.sqlConfig = config;
-udfMetadata =
-
this.sqlConfig.getUdfMetadata().stream().collect(Collectors.toMap(UdfMetadata::getName,
 Function.identity()));
+udfMetadata = new HashMap<>();
+for(UdfMetadata udf : this.sqlConfig.getUdfMetadata()) {
+  udfMetadata.putIfAbsent(udf.getName(), new ArrayList<>());
+  udfMetadata.get(udf.getName()).add(udf);
+}
   }
 
   public ScalarUdf getOrCreateUdf(String clazz, String udfName) {
 return udfInstances.computeIfAbsent(udfName, s -> createInstance(clazz, 
udfName));
   }
 
   public ScalarUdf createInstance(String clazz, String udfName) {
-Config udfConfig = udfMetadata.get(udfName).getUdfConfig();
+Config udfConfig = udfMetadata.get(udfName).get(0).getUdfConfig();
 
 Review comment:
   I see. Thanks for the clarification & the comment. Don't we still need to 
guard the code against an erroneously empty list?
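   For example, the guard could look roughly like this (a fragment meant to be read in the context of `createInstance` above; the exception choice is illustrative):
   ```java
   // Fail fast with a clear message instead of an NPE / IndexOutOfBoundsException
   // when no metadata was registered for the UDF name.
   List<UdfMetadata> udfs = udfMetadata.get(udfName);
   if (udfs == null || udfs.isEmpty()) {
     throw new SamzaException(String.format("No UDF metadata found for udf name: %s", udfName));
   }
   Config udfConfig = udfs.get(0).getUdfConfig();
   ```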




[GitHub] srinipunuru commented on a change in pull request #911: Support for types in Samza SQL UDF

2019-02-07 Thread GitBox
srinipunuru commented on a change in pull request #911: Support for types in 
Samza SQL UDF
URL: https://github.com/apache/samza/pull/911#discussion_r254886952
 
 

 ##
 File path: 
samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
 ##
 @@ -38,7 +40,7 @@
* The variables that are shared among all cloned instance of {@link 
SamzaSqlExecutionContext}
*/
   private final SamzaSqlApplicationConfig sqlConfig;
-  private final Map udfMetadata;
+  private final Map> udfMetadata;
 
 Review comment:
   Yes that is correct. I will add comments to clarify that




[GitHub] srinipunuru commented on a change in pull request #911: Support for types in Samza SQL UDF

2019-02-07 Thread GitBox
srinipunuru commented on a change in pull request #911: Support for types in 
Samza SQL UDF
URL: https://github.com/apache/samza/pull/911#discussion_r254888242
 
 

 ##
 File path: 
samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
 ##
 @@ -52,16 +54,19 @@ private SamzaSqlExecutionContext(SamzaSqlExecutionContext 
other) {
 
   public SamzaSqlExecutionContext(SamzaSqlApplicationConfig config) {
 this.sqlConfig = config;
-udfMetadata =
-
this.sqlConfig.getUdfMetadata().stream().collect(Collectors.toMap(UdfMetadata::getName,
 Function.identity()));
+udfMetadata = new HashMap<>();
+for(UdfMetadata udf : this.sqlConfig.getUdfMetadata()) {
+  udfMetadata.putIfAbsent(udf.getName(), new ArrayList<>());
+  udfMetadata.get(udf.getName()).add(udf);
+}
   }
 
   public ScalarUdf getOrCreateUdf(String clazz, String udfName) {
 return udfInstances.computeIfAbsent(udfName, s -> createInstance(clazz, 
udfName));
   }
 
   public ScalarUdf createInstance(String clazz, String udfName) {
-Config udfConfig = udfMetadata.get(udfName).getUdfConfig();
+Config udfConfig = udfMetadata.get(udfName).get(0).getUdfConfig();
 
 Review comment:
   Please see @weiqingy's comment and the corresponding answer. There can be 
several UDF methods within a UDF class, so the config should be associated at the 
class level. 




[GitHub] rmatharu opened a new pull request #912: Test sideinputrefactor

2019-02-07 Thread GitBox
rmatharu opened a new pull request #912: Test sideinputrefactor
URL: https://github.com/apache/samza/pull/912
 
 
   DO NOT REVIEW




[GitHub] strkkk opened a new pull request #913: SAMZA-2102: [samza-azure] code cleanup and refactoring

2019-02-08 Thread GitBox
strkkk opened a new pull request #913: SAMZA-2102: [samza-azure] code cleanup 
and refactoring
URL: https://github.com/apache/samza/pull/913
 
 
   1. Removed deprecated junit.framework.Assert
   2. Add final modifiers
   3. Remove redundant things (L literal, modifiers, local variables)
   4. Optional.of -> Optional.ofNullable (see the sketch after this list)
   5. Few methods are simplified
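   On item 4, the difference in a nutshell (illustrative):
   ```java
   import java.util.Optional;

   public class OptionalNullExample {
     public static void main(String[] args) {
       String maybeNull = System.getenv("SOME_UNSET_VARIABLE"); // likely null
       // Optional.of(maybeNull) would throw a NullPointerException here;
       // Optional.ofNullable simply yields an empty Optional.
       Optional<String> safe = Optional.ofNullable(maybeNull);
       System.out.println(safe.orElse("<absent>"));
     }
   }
   ```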




[GitHub] strkkk opened a new pull request #914: SAMZA-2103: [samza-aws] code cleanup and refactoring

2019-02-08 Thread GitBox
strkkk opened a new pull request #914: SAMZA-2103: [samza-aws] code cleanup and 
refactoring
URL: https://github.com/apache/samza/pull/914
 
 
   1. Redundant things removed
   2. Several methods simplified
   3. The condition ``if (numRecordsPerShard > 0)`` in 
``TestKinesisSystemConsumer.java`` was removed because it is always true 
(there is the same check at line 130 and a return in the else block in case it is false). 
   4. Deprecated ``onCheckpoint`` method was replaced with ``afterCheckpoint``




[GitHub] rmatharu commented on issue #876: SEP-19 : Basic Standby task/container implementation

2019-02-08 Thread GitBox
rmatharu commented on issue #876: SEP-19 : Basic Standby task/container 
implementation
URL: https://github.com/apache/samza/pull/876#issuecomment-461914315
 
 
   Closing this in lieu of https://github.com/apache/samza/pull/912 and a 
follow on to that.




[GitHub] rmatharu closed pull request #876: SEP-19 : Basic Standby task/container implementation

2019-02-08 Thread GitBox
rmatharu closed pull request #876: SEP-19 : Basic Standby task/container 
implementation
URL: https://github.com/apache/samza/pull/876
 
 
   




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-17 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257074119
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -285,6 +321,27 @@ public void register(SystemStreamPartition 
systemStreamPartition, String offset)
 metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
 
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint 
startpoint) {
+if (started.get()) {
+  String exceptionMessage = String.format("KafkaSystemConsumer: %s had 
started. Registration of ssp: %s, startpoint: %s failed.", this, 
systemStreamPartition, startpoint);
+  throw new SamzaException(exceptionMessage);
+}
+
+if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) {
+  LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't 
match.", this, systemStreamPartition);
+  return;
+}
+
+LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", 
systemStreamPartition, startpoint);
+
+super.register(systemStreamPartition, startpoint);
+
+TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+topicPartitionToStartpointMap.put(topicPartition, startpoint);
 
 Review comment:
   I discussed this with you offline before about having a startpoint 
comparator to choose the lowest startpoint when there are multiple startpoints 
defined for an SSP. 
   
   Example: broadcast streams. Task-1 defines a lower startpoint and Task-2 
defines a higher startpoint for the SSP.
   
   It's already tracked here: SAMZA-2088.
   




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-17 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257522508
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -330,6 +387,73 @@ public String getSystemName() {
 return systemName;
   }
 
+  @VisibleForTesting
+  static class KafkaStartpointRegistrationHandler implements StartpointVisitor 
{
+
+private final Consumer kafkaConsumer;
+
+KafkaStartpointRegistrationHandler(Consumer kafkaConsumer) {
+  this.kafkaConsumer = kafkaConsumer;
+}
+
+@Override
+public void visit(SystemStreamPartition systemStreamPartition, 
StartpointSpecific startpointSpecific) {
+  TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+  long offsetInStartpoint = 
Long.parseLong(startpointSpecific.getSpecificOffset());
+  LOG.info("Updating the consumer fetch offsets of topic partition: {} to 
{}.", topicPartition, offsetInStartpoint);
+
+  // KafkaConsumer is not thread-safe.
+  synchronized (kafkaConsumer) {
+kafkaConsumer.seek(topicPartition, offsetInStartpoint);
 
 Review comment:
   1. Upper layers which call this API already log any exception that is thrown 
back.
   2. Wrapping the exception in a SamzaException and throwing it back, IMHO, 
doesn't add much value.
   
   What do you think?




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-17 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257523191
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -285,6 +321,27 @@ public void register(SystemStreamPartition 
systemStreamPartition, String offset)
 metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
 
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint 
startpoint) {
 
 Review comment:
   Yes. The newly introduced API, `register(ssp, startpoint)`, has guarantees that are 
a superset of the existing `register(ssp, offset)` API in SystemConsumer.
   After 1.0.1, we would remove the existing register(ssp, offset) method. 




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-17 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257523873
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -60,16 +74,22 @@
   private final Config config;
   private final boolean fetchThresholdBytesEnabled;
   private final KafkaSystemConsumerMetrics metrics;
+  private final KafkaStartpointRegistrationHandler 
kafkaStartpointRegistrationHandler;
 
   // This sink is used to transfer the messages from the proxy/consumer to the 
BlockingEnvelopeMap.
   final KafkaConsumerMessageSink messageSink;
 
   // This proxy contains a separate thread, which reads kafka messages (with 
consumer.poll()) and populates
   // BlockingEnvelopMap's buffers.
-  final private KafkaConsumerProxy proxy;
+  @VisibleForTesting
+  KafkaConsumerProxy proxy;
 
   // keep registration data until the start - mapping between registered SSPs 
and topicPartitions, and their offsets
   final Map topicPartitionsToOffset = new HashMap<>();
 
 Review comment:
   I'm aware that we'd have to do this change. My initial plan was to do it after 
migrating all the system consumers to implement at least the startpoint-specific 
visitor. 
   
   Discussed offline and we agreed to do this change in this patch itself. 
   1. Removed `topicPartitionsToOffset` and delegated `register(ssp, offset)` 
API calls to the start-point registration API.
   2. Moved the offset comparator logic to the `SystemConsumers` layer. 
  A. Only a few implementations of the SystemConsumer.register API 
currently use systemAdmin.offsetComparator to compare offsets. However, the 
comparator logic is common to all implementations of the SystemConsumer.register 
API. Moving it to the upper layer (`SystemConsumers`) ensures functional 
correctness.
  B. All other upper layers except `SystemConsumers` that invoke the 
register(ssp, offset) API already pass the lowest offset for an SSP. We don't 
have a startpoint comparator abstraction yet, so to delegate calls from 
register(ssp, offset) to the register-startpoint API we had to get the lowest 
offset for an SSP from the SystemConsumers layer.
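   As a rough illustration of point 1, the delegation can be as thin as this (simplified; the real method also keeps its metrics and validation, and the `StartpointSpecific` constructor shown is assumed):
   ```java
   // Sketch only: the offset-based registration funnels into the
   // startpoint-based registration path.
   @Override
   public void register(SystemStreamPartition systemStreamPartition, String offset) {
     // A specific offset is just the narrowest kind of startpoint.
     register(systemStreamPartition, new StartpointSpecific(offset));
   }
   ```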




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-17 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257524023
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -141,11 +166,14 @@ public void start() {
 
   private void startSubscription() {
 //subscribe to all the registered TopicPartitions
-LOG.info("{}: Consumer subscribes to {}", this, 
topicPartitionsToSSP.keySet());
+Set registeredTopicPartitions = new HashSet<>();
+registeredTopicPartitions.addAll(topicPartitionsToSSP.keySet());
+registeredTopicPartitions.addAll(topicPartitionToStartpointMap.keySet());
 
 Review comment:
   This is no longer necessary, after we agreed offline to make the
register(ssp, offset) API delegate to the startpoint-register API.




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-17 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257072998
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -285,6 +321,27 @@ public void register(SystemStreamPartition 
systemStreamPartition, String offset)
 metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
 
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint 
startpoint) {
+if (started.get()) {
+  String exceptionMessage = String.format("KafkaSystemConsumer: %s had 
started. Registration of ssp: %s, startpoint: %s failed.", this, 
systemStreamPartition, startpoint);
+  throw new SamzaException(exceptionMessage);
+}
+
+if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) {
+  LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't 
match.", this, systemStreamPartition);
+  return;
+}
+
+LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", 
systemStreamPartition, startpoint);
+
+super.register(systemStreamPartition, startpoint);
 
 Review comment:
   It already does, and it is already added as part of this PR.
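   For context on what that registration ultimately drives, here is an
illustrative sketch (not the patch's actual handler) of how a startpoint visitor
can translate each startpoint type into a seek on the wrapped Kafka consumer. The
specific-offset, timestamp, and oldest cases mirror what the tests later in this
thread verify; the upcoming case is an assumed counterpart that uses seekToEnd.

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    // Illustration only: one method per startpoint type.
    class StartpointSeekSketch {
      private final Consumer<?, ?> consumer;

      StartpointSeekSketch(Consumer<?, ?> consumer) {
        this.consumer = consumer;
      }

      // StartpointSpecific: seek directly to the requested offset.
      void visitSpecific(TopicPartition tp, String offset) {
        consumer.seek(tp, Long.parseLong(offset));
      }

      // StartpointTimestamp: resolve the timestamp to an offset, then seek.
      void visitTimestamp(TopicPartition tp, long timestampMs) {
        Map<TopicPartition, OffsetAndTimestamp> resolved =
            consumer.offsetsForTimes(Collections.singletonMap(tp, timestampMs));
        OffsetAndTimestamp match = resolved.get(tp);
        if (match != null) {
          consumer.seek(tp, match.offset());
        }
      }

      // StartpointOldest: rewind to the beginning of the partition.
      void visitOldest(TopicPartition tp) {
        consumer.seekToBeginning(Collections.singletonList(tp));
      }

      // StartpointUpcoming: assumed to fast-forward to the end of the partition.
      void visitUpcoming(TopicPartition tp) {
        consumer.seekToEnd(Collections.singletonList(tp));
      }
    }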




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-17 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257523115
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -205,6 +215,156 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void 
testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Integer testPartitionId = 0;
+final String offset = "0";
+final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, 
testPartitionId);
+final Partition testPartition = new Partition(testPartitionId);
+final SystemStreamPartition testSystemStreamPartition = new 
SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+KafkaStartpointRegistrationHandler
+kafkaStartpointRegistrationHandler = new 
KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+final StartpointSpecific testStartpointSpecific = new 
StartpointSpecific(offset);
+
+// Mock the consumer interactions.
+Mockito.doNothing().when(consumer).seek(testTopicPartition, 
Long.valueOf(offset));
+
+// Invoke the consumer with startpoint.
+kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, 
testStartpointSpecific);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(testTopicPartition, Long.valueOf(offset));
+  }
+
+  @Test
+  public void 
testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Integer testPartitionId = 0;
+final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, 
testPartitionId);
+final Partition testPartition = new Partition(testPartitionId);
+final SystemStreamPartition testSystemStreamPartition = new 
SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+final Long testTimeStamp = 10L;
+final String testOffset = "10";
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+KafkaStartpointRegistrationHandler
+kafkaStartpointRegistrationHandler = new 
KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+final StartpointTimestamp startpointTimestamp = new 
StartpointTimestamp(testTimeStamp);
+final Map offsetForTimesResult = 
ImmutableMap.of(testTopicPartition, new 
OffsetAndTimestamp(Long.valueOf(testOffset), testTimeStamp));
+
+// Mock the consumer interactions.
+
Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+Mockito.doNothing().when(consumer).seek(testTopicPartition, 
Long.valueOf(testOffset));
+
+kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, 
startpointTimestamp);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(testTopicPartition, 
Long.valueOf(testOffset));
+Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+  }
+
+  @Test
+  public void 
testStartpointOldestVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Integer testPartitionId = 0;
+final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, 
testPartitionId);
+final Partition testPartition = new Partition(testPartitionId);
+final SystemStreamPartition testSystemStreamPartition = new 
SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaStartpointRegistrationHandler
+kafkaStartpointRegistrationHandler = new 
KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+final StartpointOldest testStartpointSpecific = new StartpointOldest();
+
+// Mock the consumer interactions.
+
Mockito.doNothing().when(consumer).seekToBeginning(ImmutableList.of(testTopicPartition));
+
+// Invoke the consumer with startpoint.
+kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, 
testStartpointSpecific);
+
+// Mock verifications.
+
Mockito.verify(consumer).seekToBeginning(ImmutableList.of(testTopicPartition));
+  }
+
+  @Test
+  public void 
testStartpointUpcomingVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Integer testPartitionId = 0;
+final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, 
testPartitionId);
+final Partition testPartition = new Partition(testPartitionId);
+final SystemStreamPartition testSystemStreamPartition = new 
SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaSystemConsumer.KafkaStartpointRegistrationHandler
+

[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-17 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257523065
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -205,6 +215,156 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void 
testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Integer testPartitionId = 0;
 
 Review comment:
   Done. 




[GitHub] atoomula commented on a change in pull request #911: Support for types in Samza SQL UDF

2019-02-09 Thread GitBox
atoomula commented on a change in pull request #911: Support for types in Samza 
SQL UDF
URL: https://github.com/apache/samza/pull/911#discussion_r255300421
 
 

 ##
 File path: 
samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java
 ##
 @@ -61,17 +63,23 @@ public String getUdfName() {
 return udfName;
   }
 
+  public int numberArguments() {
 
 Review comment:
   typo: numberOfArguments




[GitHub] atoomula commented on a change in pull request #911: Support for types in Samza SQL UDF

2019-02-09 Thread GitBox
atoomula commented on a change in pull request #911: Support for types in Samza 
SQL UDF
URL: https://github.com/apache/samza/pull/911#discussion_r255300436
 
 

 ##
 File path: 
samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
 ##
 @@ -38,7 +40,10 @@
* The variables that are shared among all cloned instance of {@link 
SamzaSqlExecutionContext}
*/
   private final SamzaSqlApplicationConfig sqlConfig;
-  private final Map udfMetadata;
+
+  // Maps the UDF name to list of all UDF methods associated with the name.
+  // Since we support polymorphism there can multiple udfMetadata associated 
with the single name.
 
 Review comment:
   How can these different methods be invoked via a UDF in a SQL statement? Is
there an example or a test? I see one with an overload, but can we have different
method names and make it work?
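   To make the polymorphism support concrete, here is a loose sketch of a UDF
with two methods registered under one name; the planner can then pick the method
whose parameter types match the call site. The UDF name, the method bodies, and
the bare @SamzaSqlUdfMethod usage below are illustrative assumptions rather than
code from this PR, and the sketch assumes ScalarUdf no longer declares an
abstract execute, per the review note later in this thread.

    import org.apache.samza.config.Config;
    import org.apache.samza.sql.udfs.SamzaSqlUdf;
    import org.apache.samza.sql.udfs.SamzaSqlUdfMethod;
    import org.apache.samza.sql.udfs.ScalarUdf;

    // Hypothetical UDF with two overloads registered under the same name.
    @SamzaSqlUdf(name = "ScaleValue", description = "Doubles a numeric value.")
    public class ScaleValueUdf implements ScalarUdf {
      @Override
      public void init(Config udfConfig) {
      }

      @SamzaSqlUdfMethod
      public Integer execute(Integer value) {
        return value == null ? null : value * 2;
      }

      @SamzaSqlUdfMethod
      public Double execute(Double value) {
        return value == null ? null : value * 2.0;
      }
    }

   In a query, "SELECT ScaleValue(intField) ..." and "SELECT ScaleValue(doubleField) ..."
would then resolve to the same UDF name but to different methods, based on the
argument types at validation time.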




[GitHub] atoomula commented on a change in pull request #911: Support for types in Samza SQL UDF

2019-02-09 Thread GitBox
atoomula commented on a change in pull request #911: Support for types in Samza 
SQL UDF
URL: https://github.com/apache/samza/pull/911#discussion_r255300768
 
 

 ##
 File path: samza-sql/src/main/java/org/apache/samza/sql/fn/GetSqlFieldUdf.java
 ##
 @@ -51,22 +52,21 @@
  *   - sessionKey (Scalar)
  *
  */
-@SamzaSqlUdf(name = "GetSqlField")
+@SamzaSqlUdf(name = "GetSqlField", description = "Get an element from complex 
Sql field.")
 
 Review comment:
   GetSqlField is kind of limiting as it returns only a String. For instance,
we cannot get a sub-record, an array, or any other type. Just wondering if
other cases work if we return an Object from execute? Or, since we now have the
ability to add more methods to a UDF (although I'm still not clear on how
these different methods can be invoked in a UDF unless they are function
overloads), is there any other way we could make it return different types?
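   On the return-type question, one possible shape (purely hypothetical, and
assuming SamzaSqlRelRecord#getField returns an Optional) would be an execute
method that returns Object, so nested records, arrays, and scalars pass through
with their original types instead of being flattened to a String:

    import org.apache.samza.sql.SamzaSqlRelRecord;

    public class GetFieldAsObjectSketch {
      // Hypothetical: return the field value as-is rather than as a String.
      public Object execute(Object value, String fieldName) {
        if (value instanceof SamzaSqlRelRecord) {
          return ((SamzaSqlRelRecord) value).getField(fieldName).orElse(null);
        }
        return value;
      }
    }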




[GitHub] atoomula commented on a change in pull request #911: Support for types in Samza SQL UDF

2019-02-09 Thread GitBox
atoomula commented on a change in pull request #911: Support for types in Samza 
SQL UDF
URL: https://github.com/apache/samza/pull/911#discussion_r255299964
 
 

 ##
 File path: 
samza-sql/src/main/java/org/apache/samza/sql/fn/BuildOutputRecordUdf.java
 ##
 @@ -61,13 +61,13 @@
  * If no args is provided, it returns an empty SamzaSqlRelRecord (with empty 
field names and values list).
  */
 
-@SamzaSqlUdf(name="BuildOutputRecord")
+@SamzaSqlUdf(name="BuildOutputRecord" , description = "Creates an Output 
record.")
 
 Review comment:
   typo: space after name. name = "BuildOutputRecord"




[GitHub] atoomula commented on a change in pull request #911: Support for types in Samza SQL UDF

2019-02-09 Thread GitBox
atoomula commented on a change in pull request #911: Support for types in Samza 
SQL UDF
URL: https://github.com/apache/samza/pull/911#discussion_r255277549
 
 

 ##
 File path: samza-api/src/main/java/org/apache/samza/sql/udfs/ScalarUdf.java
 ##
 @@ -36,5 +36,4 @@
* @param udfConfig Config specific to the udf.
 
 Review comment:
   ScalarUdf class comments need to be updated to exclude any comments about 
"execute".




[GitHub] rmatharu commented on issue #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatbility

2019-02-12 Thread GitBox
rmatharu commented on issue #915: Consolidating offset read and write for 
store-offsets and side-inputs, maintaining backward compatbility
URL: https://github.com/apache/samza/pull/915#issuecomment-462886896
 
 
   Addressed all comments.




[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover

2019-02-12 Thread GitBox
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator 
changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r256082808
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
 ##
 @@ -119,4 +153,219 @@ private void updateExpiryMetrics(SamzaResourceRequest 
request) {
   state.expiredPreferredHostRequests.incrementAndGet();
 }
   }
-}
+
+  // Method to run a container on the given resource if it meets all standby 
constraints. If not, we re-request resource
+  // for the container (similar to the case when we re-request for a 
launch-fail or request expiry).
+  private boolean 
checkStandbyTaskConstraintsAndRunStreamProcessor(SamzaResourceRequest request, 
String preferredHost,
+  SamzaResource samzaResource, SamzaApplicationState state) {
+
+// If standby tasks are not enabled run streamprocessor and return true
+if (!new JobConfig(config).getStandbyTasksEnabled()) {
+  runStreamProcessor(request, preferredHost);
+  return true;
+}
+
+String containerID = request.getContainerID();
+
+if (checkStandbyConstraints(request, samzaResource, state)) {
+  // This resource can be used to launch this container
+  log.info("Running container {} on preferred host {} meets standby 
constraints, launching on {}", containerID,
+  preferredHost, samzaResource.getHost());
+  runStreamProcessor(request, preferredHost);
+  state.successfulStandbyAllocations.incrementAndGet();
+  return true;
+} else {
+  // This resource cannot be used to launch this container, so we treat it 
like a launch fail, and issue an ANY_HOST request
+  log.info("Running container {} on host {} does not meet standby 
constraints, cancelling resource request, releasing resource, and making a new 
ANY_HOST request",
+  containerID, samzaResource.getHost());
+  resourceRequestState.releaseUnstartableContainer(samzaResource, 
preferredHost);
+  resourceRequestState.cancelResourceRequest(request);
+  requestResourceDueToLaunchFailOrExpiredRequest(containerID);
+  state.failedStandbyAllocations.incrementAndGet();
+  return false;
+}
+  }
+
+  // Helper method to check if this SamzaResourceRequest for a container can 
be met on this resource, given standby
+  // container constraints, and the current set of pending and running 
containers
+  private boolean checkStandbyConstraints(SamzaResourceRequest request, 
SamzaResource samzaResource,
+  SamzaApplicationState samzaApplicationState) {
+String containerIDToStart = request.getContainerID();
+String host = samzaResource.getHost();
+List containerIDsForStandbyConstraints = 
this.standbyContainerConstraints.get(containerIDToStart);
+
+// Check if any of these conflicting containers are running/launching on 
host
+for (String containerID : containerIDsForStandbyConstraints) {
+  SamzaResource resource = 
samzaApplicationState.pendingContainers.get(containerID);
+
+  // return false if a conflicting container is pending for launch on the 
host
+  if (resource != null && resource.getHost().equals(host)) {
+log.info("Container {} cannot be started on host {} because container 
{} is already scheduled on this host",
+containerIDToStart, samzaResource.getHost(), containerID);
+return false;
+  }
+
+  // return false if a conflicting container is running on the host
+  resource = samzaApplicationState.runningContainers.get(containerID);
+  if (resource != null && resource.getHost().equals(host)) {
+log.info("Container {} cannot be started on host {} because container 
{} is already running on this host",
+containerIDToStart, samzaResource.getHost(), containerID);
+return false;
+  }
+}
+
+return true;
+  }
+
+  /**
+   * Intercept resource requests, which are due to either a launch-failure or 
resource-request expired or standby
+   * 1. a standby container, we proceed to make a anyhost request
+   * 2. an activeContainer, we try to fail-it-over to a standby
+   * @param containerID Identifier of the container that will be run when a 
resource is allocated
+   */
+  @Override
+  public void requestResourceDueToLaunchFailOrExpiredRequest(String 
containerID) {
+if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+  log.info("Handling rerequesting for container {} using an any host 
request");
+  super.requestResource(containerID, ResourceRequestState.ANY_HOST); // 
proceed with a the anyhost request
+} else {
+  requestResource(containerID, ResourceRequestState.ANY_HOST); // invoke 
local method & select a new standby if possible
+}
+  }
+
+  // Intercept issuing of resource requests from the CPM
+  // 1. for ActiveContainers and instead choose a StandbyContainer to stop
+  // 2. for a 

[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover

2019-02-12 Thread GitBox
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator 
changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r255777131
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/ContainerFailoverState.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Encapsulates metadata concerning the failover of an active container.
+ */
+public class ContainerFailoverState {
+  public final String activeContainerID;
+  public final String activeContainerResourceID;
+
+  // Map of samza-container-resource ID to host, for each standby container 
selected for failover
+  public final Map selectedStandbyContainers;
 
 Review comment:
   Make this class store and manage the mapping from:
   activeResource -> List[standbyResource]
   
   Any reassignments/changes to this mapping should update the state in this class.
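   A rough sketch of that suggestion, with hypothetical names: one class owns the
active-resource-to-standby-resources mapping, and every selection or reassignment
goes through it so the bookkeeping cannot drift.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical failover-state holder keyed by the active container's resource ID.
    public class FailoverStateSketch {
      private final Map<String, List<String>> standbysByActiveResource = new ConcurrentHashMap<>();

      // Record that a standby resource was selected for the failing active resource.
      public synchronized void recordStandbySelected(String activeResourceId, String standbyResourceId) {
        standbysByActiveResource
            .computeIfAbsent(activeResourceId, key -> new ArrayList<>())
            .add(standbyResourceId);
      }

      // Read-only view used when deciding where the active container can be restarted.
      public synchronized List<String> getStandbyResources(String activeResourceId) {
        return Collections.unmodifiableList(
            standbysByActiveResource.getOrDefault(activeResourceId, Collections.emptyList()));
      }
    }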
   
   




[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover

2019-02-12 Thread GitBox
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator 
changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r255771228
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
 ##
 @@ -141,6 +143,36 @@
*/
   public final AtomicInteger redundantNotifications = new AtomicInteger(0);
 
+  /**
+   * Number of container allocations that proceeded because they met standby 
container constraints.
 
 Review comment:
   I would pick the 2 metrics that we care about the most. Often, more metrics add
too much noise. For example, I'm not sure the metric for standByStopsComplete is
necessary.




[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover

2019-02-12 Thread GitBox
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator 
changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r256070415
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
 ##
 @@ -45,10 +53,22 @@
*/
   private final int requestTimeout;
 
+  // Map of activeContainerIDs to its corresponding standby containers, and 
standbyContainerIDs to its corresponding
+  // active container and other corresponding standbyContainers
+  private final Map> standbyContainerConstraints = new 
HashMap<>();
 
 Review comment:
   This map is unnecessary because it appears to be conflating too many roles.
   
   You can simplify / consolidate state in a single class with the following
APIs:
   - List getStandbyContainers(activeResourceId)
   




[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover

2019-02-12 Thread GitBox
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator 
changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r256081802
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
 ##
 @@ -119,4 +153,219 @@ private void updateExpiryMetrics(SamzaResourceRequest 
request) {
   state.expiredPreferredHostRequests.incrementAndGet();
 }
   }
-}
+
+  // Method to run a container on the given resource if it meets all standby 
constraints. If not, we re-request resource
+  // for the container (similar to the case when we re-request for a 
launch-fail or request expiry).
+  private boolean 
checkStandbyTaskConstraintsAndRunStreamProcessor(SamzaResourceRequest request, 
String preferredHost,
+  SamzaResource samzaResource, SamzaApplicationState state) {
+
+// If standby tasks are not enabled run streamprocessor and return true
+if (!new JobConfig(config).getStandbyTasksEnabled()) {
+  runStreamProcessor(request, preferredHost);
+  return true;
+}
+
+String containerID = request.getContainerID();
+
+if (checkStandbyConstraints(request, samzaResource, state)) {
+  // This resource can be used to launch this container
+  log.info("Running container {} on preferred host {} meets standby 
constraints, launching on {}", containerID,
+  preferredHost, samzaResource.getHost());
+  runStreamProcessor(request, preferredHost);
+  state.successfulStandbyAllocations.incrementAndGet();
+  return true;
+} else {
+  // This resource cannot be used to launch this container, so we treat it 
like a launch fail, and issue an ANY_HOST request
+  log.info("Running container {} on host {} does not meet standby 
constraints, cancelling resource request, releasing resource, and making a new 
ANY_HOST request",
+  containerID, samzaResource.getHost());
+  resourceRequestState.releaseUnstartableContainer(samzaResource, 
preferredHost);
+  resourceRequestState.cancelResourceRequest(request);
+  requestResourceDueToLaunchFailOrExpiredRequest(containerID);
+  state.failedStandbyAllocations.incrementAndGet();
+  return false;
+}
+  }
+
+  // Helper method to check if this SamzaResourceRequest for a container can 
be met on this resource, given standby
+  // container constraints, and the current set of pending and running 
containers
+  private boolean checkStandbyConstraints(SamzaResourceRequest request, 
SamzaResource samzaResource,
+  SamzaApplicationState samzaApplicationState) {
+String containerIDToStart = request.getContainerID();
+String host = samzaResource.getHost();
+List containerIDsForStandbyConstraints = 
this.standbyContainerConstraints.get(containerIDToStart);
+
+// Check if any of these conflicting containers are running/launching on 
host
+for (String containerID : containerIDsForStandbyConstraints) {
+  SamzaResource resource = 
samzaApplicationState.pendingContainers.get(containerID);
+
+  // return false if a conflicting container is pending for launch on the 
host
+  if (resource != null && resource.getHost().equals(host)) {
+log.info("Container {} cannot be started on host {} because container 
{} is already scheduled on this host",
+containerIDToStart, samzaResource.getHost(), containerID);
+return false;
+  }
+
+  // return false if a conflicting container is running on the host
+  resource = samzaApplicationState.runningContainers.get(containerID);
+  if (resource != null && resource.getHost().equals(host)) {
+log.info("Container {} cannot be started on host {} because container 
{} is already running on this host",
+containerIDToStart, samzaResource.getHost(), containerID);
+return false;
+  }
+}
+
+return true;
+  }
+
+  /**
+   * Intercept resource requests, which are due to either a launch-failure or 
resource-request expired or standby
+   * 1. a standby container, we proceed to make a anyhost request
+   * 2. an activeContainer, we try to fail-it-over to a standby
+   * @param containerID Identifier of the container that will be run when a 
resource is allocated
+   */
+  @Override
+  public void requestResourceDueToLaunchFailOrExpiredRequest(String 
containerID) {
 
 Review comment:
   shouldn't have to override this method  (or expose this method in the parent 
class). Instead it's cleaner to expose the most general API and special-case 
them when they're used




[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover

2019-02-12 Thread GitBox
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator 
changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r256084515
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
 ##
 @@ -141,6 +143,36 @@
*/
   public final AtomicInteger redundantNotifications = new AtomicInteger(0);
 
+  /**
+   * Number of container allocations that proceeded because they met standby 
container constraints.
+   */
+  public final AtomicInteger successfulStandbyAllocations = new 
AtomicInteger(0);
+
+  /**
+   * Number of container allocations that were dis-allowed because they
+   * did not meet standby container constraints.
+   */
+  public final AtomicInteger failedStandbyAllocations = new AtomicInteger(0);
+
+  /**
+   * Number of active container failovers initiated in which a standby 
container was found.
+   * If two standby containers had to selected for one failing active, it 
counts as two.
+   */
+  public final AtomicInteger failoversToStandby = new AtomicInteger(0);
+
+  /**
+   * Number of active container failovers initiated in which a standby 
container was NOT found.
+  */
+  public final AtomicInteger failoversToAnyHost = new AtomicInteger(0);
+
+  /**
+   * Number of stops of standby containers that completed.
+   */
+  public final AtomicInteger standbyStopsComplete = new AtomicInteger(0);
+
+  // Map of active containers that are in failover, indexed by the active 
container's resourceID (at the time of failure)
+  public final ConcurrentMap failovers = new 
ConcurrentHashMap(0);
 
 Review comment:
   move state that pertains to failover to a new class.




[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover

2019-02-12 Thread GitBox
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator 
changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r255766345
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
 ##
 @@ -195,14 +195,25 @@ protected final SamzaResourceRequest 
peekPendingRequest() {
 return resourceRequestState.peekPendingRequest();
   }
 
+  /**
+   *  Method to handle re-requesting of resource for a given container, in 
cases where
+   *  a prior resource request for the container, expired or the launch failed 
(due to a startup-fail, or
+   *  standby constraints not being met).
+   * @param containerID Identifier of the container that will be run when a 
resource is allocated
+   *
+   */
+  public void requestResourceDueToLaunchFailOrExpiredRequest(String 
containerID) {
 
 Review comment:
   Remove this method since it simply delegates to requestResource(containerID,
ANY_HOST). Instead, invoke requestResource(containerID, ANY_HOST) at the
call site directly.




[GitHub] mynameborat commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatbility

2019-02-12 Thread GitBox
mynameborat commented on a change in pull request #915: Consolidating offset 
read and write for store-offsets and side-inputs, maintaining backward 
compatbility
URL: https://github.com/apache/samza/pull/915#discussion_r256068006
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
 ##
 @@ -129,26 +173,35 @@ public static boolean storeExists(File storeDir) {
* Read and return the contents of the offset file.
*
* @param storagePartitionDir the base directory of the store
-   * @param offsetFileName name of the offset file
* @return the content of the offset file if it exists for the store, null 
otherwise.
*/
-  public static String readOffsetFile(File storagePartitionDir, String 
offsetFileName) {
-String offset = null;
-File offsetFileRef = new File(storagePartitionDir, offsetFileName);
+  public static Map readOffsetFile(File 
storagePartitionDir, Set storeSSPs) {
+Map offsets = null;
+String fileContents;
+File offsetFileRef = new File(storagePartitionDir, OFFSET_FILE_NAME);
 String storePath = storagePartitionDir.getPath();
 
 if (offsetFileRef.exists()) {
   LOG.info("Found offset file in storage partition directory: {}", 
storePath);
   try {
-offset = FileUtil.readWithChecksum(offsetFileRef);
+fileContents = FileUtil.readWithChecksum(offsetFileRef);
+try {
+  offsets = OBJECT_MAPPER.readValue(fileContents, 
OFFSETS_TYPE_REFERENCE);
+} catch (JsonParseException | JsonMappingException e) {
+  LOG.info("Exception in json-parsing offset file {} {}, reading as 
string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+  offsets = (storeSSPs.size() == 1) ? 
storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;
+} catch (IOException e) {
+  LOG.info("Exception in json-parsing offset file {} {}, reading as 
string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+  offsets = (storeSSPs.size() == 1) ? 
storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;
 
 Review comment:
   If the contents are not proper JSON, wouldn't we get a JsonParseException? I
was referring to the IOException block, since that would mean either we couldn't
read the file successfully or the file was not found. FileNotFoundException
extends IOException.
   
   It is sufficient to have only two catch blocks: one for parse exceptions and
another to catch all other failure scenarios.
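   For illustration, the two-catch-block shape being suggested could look roughly
like the following (shown with Jackson 2 and a String-keyed map purely for
brevity; the real method keys offsets by SystemStreamPartition and falls back to
the legacy single-offset format):

    import java.io.IOException;
    import java.util.Map;
    import com.fasterxml.jackson.core.JsonParseException;
    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.JsonMappingException;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // Illustration only: one catch for malformed JSON, one for every other failure.
    class OffsetFileParseSketch {
      private static final ObjectMapper MAPPER = new ObjectMapper();

      static Map<String, String> parseOffsets(String fileContents) {
        try {
          return MAPPER.readValue(fileContents, new TypeReference<Map<String, String>>() { });
        } catch (JsonParseException | JsonMappingException e) {
          // Not JSON: likely an old-format file containing just the offset string.
          return null;
        } catch (IOException e) {
          // Any other failure while deserializing the contents.
          return null;
        }
      }
    }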




[GitHub] mynameborat commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatbility

2019-02-12 Thread GitBox
mynameborat commented on a change in pull request #915: Consolidating offset 
read and write for store-offsets and side-inputs, maintaining backward 
compatbility
URL: https://github.com/apache/samza/pull/915#discussion_r256068006
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
 ##
 @@ -129,26 +173,35 @@ public static boolean storeExists(File storeDir) {
* Read and return the contents of the offset file.
*
* @param storagePartitionDir the base directory of the store
-   * @param offsetFileName name of the offset file
* @return the content of the offset file if it exists for the store, null 
otherwise.
*/
-  public static String readOffsetFile(File storagePartitionDir, String 
offsetFileName) {
-String offset = null;
-File offsetFileRef = new File(storagePartitionDir, offsetFileName);
+  public static Map readOffsetFile(File 
storagePartitionDir, Set storeSSPs) {
+Map offsets = null;
+String fileContents;
+File offsetFileRef = new File(storagePartitionDir, OFFSET_FILE_NAME);
 String storePath = storagePartitionDir.getPath();
 
 if (offsetFileRef.exists()) {
   LOG.info("Found offset file in storage partition directory: {}", 
storePath);
   try {
-offset = FileUtil.readWithChecksum(offsetFileRef);
+fileContents = FileUtil.readWithChecksum(offsetFileRef);
+try {
+  offsets = OBJECT_MAPPER.readValue(fileContents, 
OFFSETS_TYPE_REFERENCE);
+} catch (JsonParseException | JsonMappingException e) {
+  LOG.info("Exception in json-parsing offset file {} {}, reading as 
string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+  offsets = (storeSSPs.size() == 1) ? 
storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;
+} catch (IOException e) {
+  LOG.info("Exception in json-parsing offset file {} {}, reading as 
string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+  offsets = (storeSSPs.size() == 1) ? 
storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;
 
 Review comment:
   If the contents are not proper JSON, wouldn't we get a JsonParseException? I
was referring to the IOException block, since that would mean either we couldn't
read the file successfully or the file was not found. FileNotFoundException
extends IOException.
   
   It is sufficient to have only two catch blocks: one for JSON parse failures
and another to catch all other failure scenarios.




[GitHub] rmatharu commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatbility

2019-02-12 Thread GitBox
rmatharu commented on a change in pull request #915: Consolidating offset read 
and write for store-offsets and side-inputs, maintaining backward compatbility
URL: https://github.com/apache/samza/pull/915#discussion_r256096861
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
 ##
 @@ -129,26 +173,35 @@ public static boolean storeExists(File storeDir) {
* Read and return the contents of the offset file.
*
* @param storagePartitionDir the base directory of the store
-   * @param offsetFileName name of the offset file
* @return the content of the offset file if it exists for the store, null 
otherwise.
*/
-  public static String readOffsetFile(File storagePartitionDir, String 
offsetFileName) {
-String offset = null;
-File offsetFileRef = new File(storagePartitionDir, offsetFileName);
+  public static Map readOffsetFile(File 
storagePartitionDir, Set storeSSPs) {
+Map offsets = null;
+String fileContents;
+File offsetFileRef = new File(storagePartitionDir, OFFSET_FILE_NAME);
 String storePath = storagePartitionDir.getPath();
 
 if (offsetFileRef.exists()) {
   LOG.info("Found offset file in storage partition directory: {}", 
storePath);
   try {
-offset = FileUtil.readWithChecksum(offsetFileRef);
+fileContents = FileUtil.readWithChecksum(offsetFileRef);
+try {
+  offsets = OBJECT_MAPPER.readValue(fileContents, 
OFFSETS_TYPE_REFERENCE);
+} catch (JsonParseException | JsonMappingException e) {
+  LOG.info("Exception in json-parsing offset file {} {}, reading as 
string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+  offsets = (storeSSPs.size() == 1) ? 
storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;
+} catch (IOException e) {
+  LOG.info("Exception in json-parsing offset file {} {}, reading as 
string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+  offsets = (storeSSPs.size() == 1) ? 
storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;
 
 Review comment:
   done




[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover

2019-02-12 Thread GitBox
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator 
changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r256072645
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
 ##
 @@ -81,12 +102,24 @@ public void assignResourceRequests()  {
 if (expired) {
   updateExpiryMetrics(request);
   if (resourceAvailableOnAnyHost) {
-log.info("Request for container: {} on {} has expired. Running on 
ANY_HOST", request.getContainerID(), request.getPreferredHost());
-runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+// if standby is not enabled, request a anyhost request
+if (!new JobConfig(config).getStandbyTasksEnabled()) {
+  log.info("Request for container: {} on {} has expired. Running 
on ANY_HOST", request.getContainerID(),
+  request.getPreferredHost());
+  runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+} else if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+  // only standby resources can be on anyhost rightaway
+  checkStandbyTaskConstraintsAndRunStreamProcessor(request, 
ResourceRequestState.ANY_HOST,
+  peekAllocatedResource(ResourceRequestState.ANY_HOST), state);
+} else {
+  // re-requesting resource for active container
 
 Review comment:
   This looks a little bit complex. Is this else block needed, especially when we
are doing a similar action in L:120-122?




[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover

2019-02-12 Thread GitBox
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator 
changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r255769800
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
 ##
 @@ -78,9 +79,10 @@
   public final AtomicInteger releasedContainers = new AtomicInteger(0);
 
   /**
-   * ContainerStatus of failed containers.
+   * ContainerStatuses of failed containers indexed by samzaContainerId.
+   * Written by the AMRMCallbackThread, read by the ContainerAllocator thread 
when performing active-standby container failover.
*/
-  public final ConcurrentMap 
failedContainersStatus = new ConcurrentHashMap();
+  public final ConcurrentMap> 
failedContainersStatus = new ConcurrentHashMap>();
 
 Review comment:
   Why do you need this - ConcurrentMap>? It
seems like the usage is to determine the physical id corresponding to a given
logical id. If so, the existing data structures should be sufficient?




[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover

2019-02-12 Thread GitBox
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator 
changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r256083123
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
 ##
 @@ -119,4 +153,219 @@ private void updateExpiryMetrics(SamzaResourceRequest 
request) {
   state.expiredPreferredHostRequests.incrementAndGet();
 }
   }
-}
+
+  // Method to run a container on the given resource if it meets all standby 
constraints. If not, we re-request resource
+  // for the container (similar to the case when we re-request for a 
launch-fail or request expiry).
+  private boolean 
checkStandbyTaskConstraintsAndRunStreamProcessor(SamzaResourceRequest request, 
String preferredHost,
+  SamzaResource samzaResource, SamzaApplicationState state) {
+
+// If standby tasks are not enabled run streamprocessor and return true
+if (!new JobConfig(config).getStandbyTasksEnabled()) {
+  runStreamProcessor(request, preferredHost);
+  return true;
+}
+
+String containerID = request.getContainerID();
+
+if (checkStandbyConstraints(request, samzaResource, state)) {
+  // This resource can be used to launch this container
+  log.info("Running container {} on preferred host {} meets standby 
constraints, launching on {}", containerID,
+  preferredHost, samzaResource.getHost());
+  runStreamProcessor(request, preferredHost);
+  state.successfulStandbyAllocations.incrementAndGet();
+  return true;
+} else {
+  // This resource cannot be used to launch this container, so we treat it 
like a launch fail, and issue an ANY_HOST request
+  log.info("Running container {} on host {} does not meet standby 
constraints, cancelling resource request, releasing resource, and making a new 
ANY_HOST request",
+  containerID, samzaResource.getHost());
+  resourceRequestState.releaseUnstartableContainer(samzaResource, 
preferredHost);
+  resourceRequestState.cancelResourceRequest(request);
+  requestResourceDueToLaunchFailOrExpiredRequest(containerID);
+  state.failedStandbyAllocations.incrementAndGet();
+  return false;
+}
+  }
+
+  // Helper method to check if this SamzaResourceRequest for a container can 
be met on this resource, given standby
+  // container constraints, and the current set of pending and running 
containers
+  private boolean checkStandbyConstraints(SamzaResourceRequest request, 
SamzaResource samzaResource,
+  SamzaApplicationState samzaApplicationState) {
+String containerIDToStart = request.getContainerID();
+String host = samzaResource.getHost();
+List containerIDsForStandbyConstraints = 
this.standbyContainerConstraints.get(containerIDToStart);
+
+// Check if any of these conflicting containers are running/launching on 
host
+for (String containerID : containerIDsForStandbyConstraints) {
+  SamzaResource resource = 
samzaApplicationState.pendingContainers.get(containerID);
+
+  // return false if a conflicting container is pending for launch on the 
host
+  if (resource != null && resource.getHost().equals(host)) {
+log.info("Container {} cannot be started on host {} because container 
{} is already scheduled on this host",
+containerIDToStart, samzaResource.getHost(), containerID);
+return false;
+  }
+
+  // return false if a conflicting container is running on the host
+  resource = samzaApplicationState.runningContainers.get(containerID);
+  if (resource != null && resource.getHost().equals(host)) {
+log.info("Container {} cannot be started on host {} because container 
{} is already running on this host",
+containerIDToStart, samzaResource.getHost(), containerID);
+return false;
+  }
+}
+
+return true;
+  }
+
+  /**
+   * Intercept resource requests, which are due to either a launch-failure or 
resource-request expired or standby
+   * 1. a standby container, we proceed to make a anyhost request
+   * 2. an activeContainer, we try to fail-it-over to a standby
+   * @param containerID Identifier of the container that will be run when a 
resource is allocated
+   */
+  @Override
+  public void requestResourceDueToLaunchFailOrExpiredRequest(String 
containerID) {
+if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+  log.info("Handling rerequesting for container {} using an any host 
request");
+  super.requestResource(containerID, ResourceRequestState.ANY_HOST); // 
proceed with a the anyhost request
+} else {
+  requestResource(containerID, ResourceRequestState.ANY_HOST); // invoke 
local method & select a new standby if possible
+}
+  }
+
+  // Intercept issuing of resource requests from the CPM
+  // 1. for ActiveContainers and instead choose a StandbyContainer to stop
+  // 2. for a 

[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover

2019-02-12 Thread GitBox
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator 
changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r254810012
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
 ##
 @@ -122,6 +122,10 @@ public ClusterResourceManager(Callback callback) {
   public abstract void launchStreamProcessor(SamzaResource resource, 
CommandBuilder builder);
 
 
+  // Requests the stopping of a StreamProcessor, identified by the given 
resource
+  public abstract void stopStreamProcessor(SamzaResource resource);
+
+
   public abstract void stop(SamzaApplicationState.SamzaAppStatus status);
 
 Review comment:
   tie this interface with the onProcessorComplete() callback in the docs




[GitHub] shanthoosh merged pull request #874: SAMZA-2058: Integrate the input stream expansion aware SystemStreamGrouper to JobModel generation flow.

2019-02-12 Thread GitBox
shanthoosh merged pull request #874: SAMZA-2058: Integrate the input stream 
expansion aware SystemStreamGrouper to JobModel generation flow.
URL: https://github.com/apache/samza/pull/874
 
 
   




[GitHub] rmatharu commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatbility

2019-02-12 Thread GitBox
rmatharu commented on a change in pull request #915: Consolidating offset read 
and write for store-offsets and side-inputs, maintaining backward compatbility
URL: https://github.com/apache/samza/pull/915#discussion_r256213346
 
 

 ##
 File path: 
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 ##
 @@ -562,9 +566,14 @@ private boolean isLoggedStoreValid(String storeName, File 
loggedStoreDir) {
 (long) new 
StorageConfig(config).getChangeLogDeleteRetentionsInMs().get(storeName).get();
   }
 
+  // if the store has no changelogSSP, simply use null
+  SystemStreamPartition changelogSSP = null;
 
 Review comment:
   done



