[GitHub] [samza] cameronlee314 commented on a change in pull request #947: SAMZA-2120: Enable custom handling of ConsumerRecords consumed by Kafka
cameronlee314 commented on a change in pull request #947: SAMZA-2120: Enable custom handling of ConsumerRecords consumed by Kafka URL: https://github.com/apache/samza/pull/947#discussion_r264368204 ## File path: samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java ## @@ -51,7 +53,7 @@ * This class is not thread safe. There will be only one instance of this class per KafkaSystemConsumer object. * We still need some synchronization around kafkaConsumer. See pollConsumer() method for details. */ -class KafkaConsumerProxy { +public class KafkaConsumerProxy { Review comment: The type parameter does get used for the Kafka `Consumer` that is wired in. Ultimately, it isn't used further downstream since `IncomingMessageEnvelope` doesn't have type params, but it looks like it helps to ensure the wiring matches up for the proxy and the Samza/Kafka consumer objects. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
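To illustrate why the type parameter is still useful, here is a minimal sketch in Java. All names (`TypedRecord`, `TypedSource`, `UntypedEnvelope`, `ConsumerProxySketch`) are hypothetical stand-ins, not Samza or Kafka APIs. The proxy's `<K, V>` must agree with the source it wraps, so mismatched wiring fails to compile, while the envelope it emits carries plain `Object` fields, much as `IncomingMessageEnvelope` does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical typed record, loosely modeled on a Kafka ConsumerRecord (not the Kafka class).
final class TypedRecord<K, V> {
  final K key;
  final V value;

  TypedRecord(K key, V value) {
    this.key = key;
    this.value = value;
  }
}

// Hypothetical typed source standing in for the wired-in Kafka Consumer (not the Kafka API).
interface TypedSource<K, V> {
  List<TypedRecord<K, V>> poll();
}

// Untyped envelope: like IncomingMessageEnvelope, it stores key and message as Object.
final class UntypedEnvelope {
  final Object key;
  final Object message;

  UntypedEnvelope(Object key, Object message) {
    this.key = key;
    this.message = message;
  }
}

// The proxy's <K, V> must match the source's <K, V>, so wiring a TypedSource<String, Integer>
// into a ConsumerProxySketch<String, String> is a compile-time error.
final class ConsumerProxySketch<K, V> {
  private final TypedSource<K, V> source;

  ConsumerProxySketch(TypedSource<K, V> source) {
    this.source = source;
  }

  List<UntypedEnvelope> pollEnvelopes() {
    List<UntypedEnvelope> envelopes = new ArrayList<>();
    for (TypedRecord<K, V> consumed : source.poll()) {
      // Static types are dropped here, which is why the parameter matters only for wiring.
      envelopes.add(new UntypedEnvelope(consumed.key, consumed.value));
    }
    return envelopes;
  }
}
```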
[GitHub] [samza] shanthoosh opened a new pull request #949: Remove scala 2.10 version from documentation.
shanthoosh opened a new pull request #949: Remove scala 2.10 version from documentation. URL: https://github.com/apache/samza/pull/949 Since 1.0, Samza no longer supports Scala 2.10. This patch removes references to Scala 2.10 from the documentation.
[GitHub] [samza] rmatharu commented on a change in pull request #941: SAMZA-2120: Enable custom handling of ConsumerRecords consumed by Kafka
rmatharu commented on a change in pull request #941: SAMZA-2120: Enable custom handling of ConsumerRecords consumed by Kafka URL: https://github.com/apache/samza/pull/941#discussion_r263553107 ## File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java ## @@ -74,7 +74,7 @@ private final SystemAdmins systemAdmins; private final TaskName taskName; private final TaskMode taskMode; - private final Map lastProcessedOffsets = new ConcurrentHashMap<>(); Review comment: Perhaps undo this change, since this map actually stores the "last processed offset for the given side input" (which is used to emit a metric). Side inputs are only checkpointed locally (see writeOffsetFiles).
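A minimal sketch of the distinction drawn above, assuming a plain `String` partition key in place of Samza's `SystemStreamPartition` (all class and method names are hypothetical): the map only remembers the last processed offset per side-input partition for metrics, and a snapshot is what a local offset-file writer would persist.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker; partitions are plain Strings instead of SystemStreamPartition.
final class SideInputOffsetTracker {
  // Last processed offset per side-input partition, updated from the processing path.
  private final Map<String, String> lastProcessedOffsets = new ConcurrentHashMap<>();

  // Record that the message at `offset` has been applied to the local store.
  void onProcessed(String partition, String offset) {
    lastProcessedOffsets.put(partition, offset);
  }

  // Value a metrics gauge could report; null if nothing has been processed yet.
  String lastProcessedOffset(String partition) {
    return lastProcessedOffsets.get(partition);
  }

  // Copy that a local offset-file writer could persist; this is not a remote checkpoint.
  Map<String, String> snapshot() {
    return new HashMap<>(lastProcessedOffsets);
  }
}
```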
[GitHub] [samza] rmatharu commented on a change in pull request #941: SAMZA-2120: Enable custom handling of ConsumerRecords consumed by Kafka
rmatharu commented on a change in pull request #941: SAMZA-2120: Enable custom handling of ConsumerRecords consumed by Kafka URL: https://github.com/apache/samza/pull/941#discussion_r263554894 ## File path: samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java ## @@ -611,10 +611,12 @@ public void run() { List callbacksToUpdate = callbackManager.updateCallback(callbackImpl); for (TaskCallbackImpl callbackToUpdate : callbacksToUpdate) { IncomingMessageEnvelope envelope = callbackToUpdate.envelope; - log.trace("Update offset for ssp {}, offset {}", envelope.getSystemStreamPartition(), envelope.getOffset()); + log.trace("Update offset for ssp {}, checkpoint offset {}", envelope.getSystemStreamPartition(), Review comment: log.trace("Update offset for ssp {}, offset {}, checkpoint offset {}", envelope.getSystemStreamPartition(), envelope.getOffset(), envelope.getCheckpointOffset());
[GitHub] [samza] dxichen opened a new pull request #944: Release version updates
dxichen opened a new pull request #944: Release version updates URL: https://github.com/apache/samza/pull/944 Updated the hard-coded version for the upcoming release.
[GitHub] [samza] xinyuiscool merged pull request #943: Update Samza version for 1.1.0 release branch
xinyuiscool merged pull request #943: Update Samza version for 1.1.0 release branch URL: https://github.com/apache/samza/pull/943
[GitHub] [samza] mynameborat commented on a change in pull request #942: Bugfix: Recent CSM refactor was causing some metrics to not be emitted. Fixed -restore-time metric.
mynameborat commented on a change in pull request #942: Bugfix: Recent CSM refactor was causing some metrics to not be emitted. Fixed -restore-time metric. URL: https://github.com/apache/samza/pull/942#discussion_r263637333 ## File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java ## @@ -573,6 +576,25 @@ private StorageEngine createStore(String storeName, TaskName taskName, TaskModel return this.sideInputStorageManagers.values().stream().collect(Collectors.toSet()); } + /** + * Registers any CSM created metrics such as side-inputs related metrics, and standby-task related metrics. + * @param metricsReporters metrics reporters to use + */ + public void registerMetrics(Map metricsReporters) { Review comment: +1, we would need to change `SystemConsumerMetrics` to take a configurable group name instead of using the default class name as the group id. I feel that change is much simpler than copying the boilerplate metrics-registration code over to CSM.
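A sketch of the suggested change, with a hypothetical `ConsumerMetricsSketch` standing in for `SystemConsumerMetrics` (its real constructor is not shown here): the group name becomes a constructor argument, and omitting it falls back to the class name, so callers that do not pass a group keep their current metric names. The `metricName` format is an assumption made only for illustration.

```java
// Hypothetical stand-in for a consumer-metrics class; not Samza's SystemConsumerMetrics.
final class ConsumerMetricsSketch {
  private final String group;

  // No group given: keep the existing behavior of using the class name as the group.
  ConsumerMetricsSketch() {
    this(null);
  }

  ConsumerMetricsSketch(String group) {
    this.group = (group == null || group.isEmpty()) ? ConsumerMetricsSketch.class.getName() : group;
  }

  String group() {
    return group;
  }

  // Illustrative naming only: the group is used as the prefix of every metric name.
  String metricName(String name) {
    return group + "." + name;
  }
}
```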
[GitHub] [samza] cameronlee314 commented on a change in pull request #940: SAMZA-2121: Add checkpoint offset field to IncomingMessageEnvelope
cameronlee314 commented on a change in pull request #940: SAMZA-2121: Add checkpoint offset field to IncomingMessageEnvelope URL: https://github.com/apache/samza/pull/940#discussion_r263639549 ## File path: samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java ## @@ -86,7 +101,29 @@ public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, Stri */ public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key, Object message, int size, long eventTime, long arrivalTime) { -this(systemStreamPartition, offset, key, message, size); +this(systemStreamPartition, offset, offset, key, message, size, eventTime, arrivalTime); + } + + /** + * Constructs a new IncomingMessageEnvelope from specified components + * @param systemStreamPartition The aggregate object representing the incoming stream name, the name of the cluster + * from which the stream came, and the partition of the stream from which the message was received. + * @param offset The offset in the partition that the message was received from. + * @param checkpointOffset offset that can be checkpointed when this {@link IncomingMessageEnvelope} is processed + * @param key A deserialized key received from the partition offset. + * @param message A deserialized message received from the partition offset. + * @param size size of the message and key in bytes. + * @param eventTime the timestamp (in epochMillis) of when this event happened + * @param arrivalTime the timestamp (in epochMillis) of when this event arrived to (i.e., was picked-up by) Samza + */ + public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, String checkpointOffset, + Object key, Object message, int size, long eventTime, long arrivalTime) { +this.systemStreamPartition = systemStreamPartition; +this.offset = offset; +this.checkpointOffset = checkpointOffset; Review comment: 1. 
Some more details and an example are described at https://issues.apache.org/jira/browse/SAMZA-2120.
2. We could make this part of the internal API by creating an `InternalIncomingMessageEnvelope`, which extends `IncomingMessageEnvelope` and is passed around internally. Would that help avoid complicating the public API?
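Option 2 can be sketched roughly as follows. The classes are hypothetical and heavily simplified (not Samza's `IncomingMessageEnvelope`): the public class keeps a single offset, an internal subclass carries the extra checkpoint offset, and a framework-side helper falls back to the message offset for plain envelopes, mirroring the delegating constructor in the diff that passes `offset` twice.

```java
// Hypothetical, heavily simplified public envelope: one offset only.
class EnvelopeSketch {
  final String offset;
  final Object message;

  EnvelopeSketch(String offset, Object message) {
    this.offset = offset;
    this.message = message;
  }
}

// Hypothetical framework-internal subclass that also carries a checkpoint offset.
final class InternalEnvelopeSketch extends EnvelopeSketch {
  private final String checkpointOffset;

  InternalEnvelopeSketch(String offset, String checkpointOffset, Object message) {
    super(offset, message);
    this.checkpointOffset = checkpointOffset;
  }

  String checkpointOffset() {
    return checkpointOffset;
  }
}

// Framework-side lookup: internal envelopes supply their own checkpoint offset,
// while plain envelopes fall back to the message offset.
final class CheckpointOffsets {
  private CheckpointOffsets() {}

  static String of(EnvelopeSketch envelope) {
    if (envelope instanceof InternalEnvelopeSketch) {
      return ((InternalEnvelopeSketch) envelope).checkpointOffset();
    }
    return envelope.offset;
  }
}
```

The public class stays unchanged for users; only framework code needs to know about the subclass.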
[GitHub] [samza] vjagadish1989 edited a comment on issue #905: SAMZA-2055: [WIP] Async high level api
vjagadish1989 edited a comment on issue #905: SAMZA-2055: [WIP] Async high level api URL: https://github.com/apache/samza/pull/905#issuecomment-470775503 Also adding @xinyuiscool in case he has additional feedback on the async API.
[GitHub] [samza] dxichen opened a new pull request #943: Update Samza version for 1.1.0 release branch
dxichen opened a new pull request #943: Update Samza version for 1.1.0 release branch URL: https://github.com/apache/samza/pull/943 Change versions as per RELEASE.md
[GitHub] [samza] shanthoosh commented on a change in pull request #940: SAMZA-2121: Add checkpoint offset field to IncomingMessageEnvelope
shanthoosh commented on a change in pull request #940: SAMZA-2121: Add checkpoint offset field to IncomingMessageEnvelope URL: https://github.com/apache/samza/pull/940#discussion_r263628931 ## File path: samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java ## @@ -86,7 +101,29 @@ public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, Stri */ public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key, Object message, int size, long eventTime, long arrivalTime) { -this(systemStreamPartition, offset, key, message, size); +this(systemStreamPartition, offset, offset, key, message, size, eventTime, arrivalTime); + } + + /** + * Constructs a new IncomingMessageEnvelope from specified components + * @param systemStreamPartition The aggregate object representing the incoming stream name, the name of the cluster + * from which the stream came, and the partition of the stream from which the message was received. + * @param offset The offset in the partition that the message was received from. + * @param checkpointOffset offset that can be checkpointed when this {@link IncomingMessageEnvelope} is processed + * @param key A deserialized key received from the partition offset. + * @param message A deserialized message received from the partition offset. + * @param size size of the message and key in bytes. + * @param eventTime the timestamp (in epochMillis) of when this event happened + * @param arrivalTime the timestamp (in epochMillis) of when this event arrived to (i.e., was picked-up by) Samza + */ + public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, String checkpointOffset, + Object key, Object message, int size, long eventTime, long arrivalTime) { +this.systemStreamPartition = systemStreamPartition; +this.offset = offset; +this.checkpointOffset = checkpointOffset; Review comment: 1. 
Can you please share the rationale behind adding the checkpoint offset to the `IncomingMessageEnvelope` public API?
2. Can you please shed some light on the other options considered here? Is it not possible to do this without changing the public API? IMHO, it would be better not to proliferate the public API with these internal details, which will be hard to remove later on. Anyone who reads this in the future would wonder why there are two different offsets in the `IncomingMessageEnvelope` contract. Is it possible to maintain the mapping (the association we want for safe checkpointing) internally within the Samza framework and not expose it to Samza users?
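One way to keep the association inside the framework, as asked above, is a registry that the consumer populates and the commit path consults. This is a hypothetical sketch (`CheckpointOffsetRegistry` and its methods are invented for illustration), keyed by partition and message offset as strings to stay dependency-free.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical framework-side registry: the consumer records which offset is safe to
// checkpoint for each delivered message; users only ever see the message offset.
final class CheckpointOffsetRegistry {
  private final Map<String, String> safeOffsets = new ConcurrentHashMap<>();

  private static String key(String partition, String offset) {
    return partition + "/" + offset;
  }

  // Called by the system consumer when it builds an envelope.
  void record(String partition, String offset, String checkpointOffset) {
    safeOffsets.put(key(partition, offset), checkpointOffset);
  }

  // Called on the commit path; falls back to the message offset if nothing was recorded.
  String checkpointOffsetFor(String partition, String offset) {
    return safeOffsets.getOrDefault(key(partition, offset), offset);
  }

  // Drop the entry after the offset is committed so the map does not grow without bound.
  void release(String partition, String offset) {
    safeOffsets.remove(key(partition, offset));
  }
}
```

Compared with a second field on the envelope, this keeps the public contract unchanged but adds mutable state that must be released after each commit.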
[GitHub] [samza] shanthoosh commented on a change in pull request #942: Bugfix: Recent CSM refactor was causing some metrics to not be emitted. Fixed -restore-time metric.
shanthoosh commented on a change in pull request #942: Bugfix: Recent CSM refactor was causing some metrics to not be emitted. Fixed -restore-time metric. URL: https://github.com/apache/samza/pull/942#discussion_r263647669 ## File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java ## @@ -573,6 +576,25 @@ private StorageEngine createStore(String storeName, TaskName taskName, TaskModel return this.sideInputStorageManagers.values().stream().collect(Collectors.toSet()); } + /** + * Registers any CSM created metrics such as side-inputs related metrics, and standby-task related metrics. + * @param metricsReporters metrics reporters to use + */ + public void registerMetrics(Map metricsReporters) { Review comment:
> SystemConsumerMetrics to take a configurable group name

Alerts in ingraphs and auto-scaling rules at LinkedIn are defined based on the Samza-generated metric names, and the group name is the prefix (the initial part) of each generated metric name. It would be better to take these use cases into account and maintain backwards compatibility when changing metric names.
[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r263661119 ## File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ## @@ -280,4 +368,47 @@ private void setApplicationFinalStatus() { } } } + + + /** + * Defines a specific implementation of {@link CoordinationSessionListener} for local {@link CoordinationUtils} + */ + private final class LocalCoordinationSessionListener implements CoordinationSessionListener { + +/** + * If the coordination utils session has reconnected, check if global runid differs from local runid + * if it differs then shut down processor and throw exception + * else recreate ephemeral node corresponding to this processor inside the read write lock for runid + */ +@Override +public void handleReconnect() { + LOG.info("Reconnected to coordination utils"); + if(coordinationUtils == null) { +return; + } + DistributedDataAccess runIdAccess = coordinationUtils.getDataAccess(); + String globalRunId = (String) runIdAccess.readData(RUNID_PATH); + if( runId != globalRunId){ +processors.forEach(StreamProcessor::stop); +cleanup(); +appStatus = ApplicationStatus.UnsuccessfulFinish; +String msg = String.format("run.id %s on processor %s differs from the global run.id %s", runId, uid, globalRunId); +throw new SamzaException(msg); + } else if(runIdLock != null) { +String msg = String.format("Processor {} failed to get the lock for run.id", uid); +try { + // acquire lock to recreate active processor ephemeral node + DistributedReadWriteLock.AccessType lockAccess = runIdLock.lock(LOCK_TIMEOUT, LOCK_TIMEOUT_UNIT); + if(lockAccess == DistributedReadWriteLock.AccessType.WRITE || lockAccess == DistributedReadWriteLock.AccessType.READ) { +LOG.info("Processor {} creates its active processor ephemeral node in read write lock again" + uid); +runIdLock.unlock(); + } else { +throw new SamzaException(msg); + 
} +} catch (TimeoutException e) { + throw new SamzaException(msg, e); Review comment: Done.
[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r263661134 ## File path: samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java ## @@ -39,6 +42,12 @@ DistributedLockWithState getLockWithState(String lockId); + DistributedReadWriteLock getReadWriteLock(); + + DistributedDataAccess getDataAccess(); + + void setCoordinationSessionListener(CoordinationSessionListener sessionListener); Review comment: done.
[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r26366 ## File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ## @@ -280,4 +368,47 @@ private void setApplicationFinalStatus() { } } } + + + /** + * Defines a specific implementation of {@link CoordinationSessionListener} for local {@link CoordinationUtils} + */ + private final class LocalCoordinationSessionListener implements CoordinationSessionListener { + +/** + * If the coordination utils session has reconnected, check if global runid differs from local runid + * if it differs then shut down processor and throw exception + * else recreate ephemeral node corresponding to this processor inside the read write lock for runid + */ +@Override +public void handleReconnect() { + LOG.info("Reconnected to coordination utils"); + if(coordinationUtils == null) { +return; + } + DistributedDataAccess runIdAccess = coordinationUtils.getDataAccess(); + String globalRunId = (String) runIdAccess.readData(RUNID_PATH); + if( runId != globalRunId){ +processors.forEach(StreamProcessor::stop); +cleanup(); +appStatus = ApplicationStatus.UnsuccessfulFinish; +String msg = String.format("run.id %s on processor %s differs from the global run.id %s", runId, uid, globalRunId); +throw new SamzaException(msg); Review comment: Done. Also calling `shutdownLatch.countDown()` now.
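The check under review can be sketched as follows. Note that the diff compares `runId != globalRunId`, which in Java compares `String` references rather than contents; the sketch uses `Objects.equals` instead. `RunIdReconnectCheck` is hypothetical, returns `false` where the real code throws `SamzaException`, and models `shutdownLatch.countDown()` so that anything waiting on the latch is released on a mismatch.

```java
import java.util.Objects;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of the reconnect check; not LocalApplicationRunner itself.
final class RunIdReconnectCheck {
  private final String localRunId;
  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
  private volatile String status = "Running";

  RunIdReconnectCheck(String localRunId) {
    this.localRunId = localRunId;
  }

  /** Returns true if processing may continue after a reconnect. */
  boolean onReconnect(String globalRunId) {
    // Compare string contents; `!=` would compare object references.
    if (!Objects.equals(localRunId, globalRunId)) {
      status = "UnsuccessfulFinish";
      // Release any thread blocked waiting for the application to finish.
      shutdownLatch.countDown();
      return false;
    }
    return true;
  }

  String status() {
    return status;
  }

  boolean isShutDown() {
    return shutdownLatch.getCount() == 0;
  }
}
```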
[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r263661126 ## File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ## @@ -280,4 +368,47 @@ private void setApplicationFinalStatus() { } } } + + + /** + * Defines a specific implementation of {@link CoordinationSessionListener} for local {@link CoordinationUtils} + */ + private final class LocalCoordinationSessionListener implements CoordinationSessionListener { + +/** + * If the coordination utils session has reconnected, check if global runid differs from local runid + * if it differs then shut down processor and throw exception + * else recreate ephemeral node corresponding to this processor inside the read write lock for runid + */ +@Override +public void handleReconnect() { + LOG.info("Reconnected to coordination utils"); + if(coordinationUtils == null) { +return; + } + DistributedDataAccess runIdAccess = coordinationUtils.getDataAccess(); + String globalRunId = (String) runIdAccess.readData(RUNID_PATH); + if( runId != globalRunId){ +processors.forEach(StreamProcessor::stop); +cleanup(); +appStatus = ApplicationStatus.UnsuccessfulFinish; +String msg = String.format("run.id %s on processor %s differs from the global run.id %s", runId, uid, globalRunId); +throw new SamzaException(msg); + } else if(runIdLock != null) { +String msg = String.format("Processor {} failed to get the lock for run.id", uid); +try { + // acquire lock to recreate active processor ephemeral node + DistributedReadWriteLock.AccessType lockAccess = runIdLock.lock(LOCK_TIMEOUT, LOCK_TIMEOUT_UNIT); + if(lockAccess == DistributedReadWriteLock.AccessType.WRITE || lockAccess == DistributedReadWriteLock.AccessType.READ) { Review comment: Session expiration does clean up all the ephemeral nodes.
However, according to the ZkClient code, one of two things can happen after an expiration: a successful reconnect, in which case `listener.handleNewSession` is invoked, or a failure to reconnect, in which case `listener.handleSessionEstablishmentError` is invoked. Both cases are now handled in LocalApplicationRunner.
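A minimal sketch of the two outcomes described above. `SessionCallbacks` is a hypothetical interface that only mirrors the two callback names mentioned in the comment (it is not ZkClient's listener interface), and the recorded action strings are placeholders for the runner's real behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of the two callbacks named in the comment; not a ZkClient type.
interface SessionCallbacks {
  void handleNewSession();

  void handleSessionEstablishmentError(Throwable error);
}

// Hypothetical runner-side handler: re-register after a successful reconnect,
// or shut down if a new session could not be established.
final class RunnerSessionHandler implements SessionCallbacks {
  final List<String> actions = new ArrayList<>();

  @Override
  public void handleNewSession() {
    // Ephemeral nodes from the expired session are gone, so recreate this processor's node.
    actions.add("recreate-ephemeral-node");
  }

  @Override
  public void handleSessionEstablishmentError(Throwable error) {
    // Reconnect failed: stop processors and mark the run as failed.
    actions.add("shutdown: " + error.getMessage());
  }
}
```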
[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
lakshmi-manasa-g commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r263661144 ## File path: samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java ## @@ -39,6 +42,12 @@ DistributedLockWithState getLockWithState(String lockId); + DistributedReadWriteLock getReadWriteLock(); Review comment: Added lock id.
[GitHub] [samza] rmatharu commented on a change in pull request #942: Bugfix: Recent CSM refactor was causing some metrics to not be emitted. Fixed -restore-time metric.
rmatharu commented on a change in pull request #942: Bugfix: Recent CSM refactor was causing some metrics to not be emitted. Fixed -restore-time metric. URL: https://github.com/apache/samza/pull/942#discussion_r263657205 ## File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java ## @@ -573,6 +576,25 @@ private StorageEngine createStore(String storeName, TaskName taskName, TaskModel return this.sideInputStorageManagers.values().stream().collect(Collectors.toSet()); } + /** + * Registers any CSM created metrics such as side-inputs related metrics, and standby-task related metrics. + * @param metricsReporters metrics reporters to use + */ + public void registerMetrics(Map metricsReporters) { Review comment: Done. It did not require a group-name change, so all existing metrics remain as is.
[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264387433 ## File path: samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java ## @@ -52,6 +52,16 @@ public DistributedLockWithState getLockWithState(String lockId) { return new AzureLock(blob); } + @Override + public DistributedReadWriteLock getReadWriteLock(String lockId) throws UnsupportedOperationException { Review comment: You need to throw the exception here (instead of returning null). Since UnsupportedOperationException is unchecked, you can also remove 'throws UOE' from the method signature.
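A sketch of the suggested fix with hypothetical, trimmed-down types (`ReadWriteLockSketch`, `CoordinationSketch`, `AzureLikeCoordination`), not Samza's real interfaces: because `UnsupportedOperationException` is an unchecked exception, the method can throw it without declaring it in a `throws` clause, and callers fail fast instead of receiving `null`.

```java
// Hypothetical placeholder for a distributed read-write lock type.
interface ReadWriteLockSketch {}

// Hypothetical, trimmed-down coordination interface (not Samza's CoordinationUtils).
interface CoordinationSketch {
  ReadWriteLockSketch getReadWriteLock(String lockId);
}

// An implementation without read-write lock support throws instead of returning null.
// UnsupportedOperationException is unchecked, so no `throws` clause is required.
final class AzureLikeCoordination implements CoordinationSketch {
  @Override
  public ReadWriteLockSketch getReadWriteLock(String lockId) {
    throw new UnsupportedOperationException("Read-write lock is not supported (lockId=" + lockId + ")");
  }
}
```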
[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264475902 ## File path: samza-core/src/main/java/org/apache/samza/zk/ZkDistributedReadWriteLock.java ## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.I0Itec.zkclient.IZkStateListener; +import org.apache.samza.SamzaException; +import org.apache.samza.coordinator.DistributedReadWriteLock; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Distributed lock primitive for Zookeeper. 
+ */ +public class ZkDistributedReadWriteLock implements DistributedReadWriteLock { + + public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedReadWriteLock.class); + private static final String PARTICIPANTS_PATH = "participants"; + private static final String PROCESSORS_PATH = "processors"; + private final ZkUtils zkUtils; + private final String lockPath; + private final String particpantsPath; + private final String processorsPath; + private final String participantId; + private final ZkKeyBuilder keyBuilder; + private final Random random = new Random(); + private String activeParticipantPath = null; + private String activeProcessorPath = null; + private Object mutex; + private Boolean isInCriticalSection = false; + private Boolean isStateLost = false; + + public ZkDistributedReadWriteLock(String participantId, ZkUtils zkUtils, String lockId) { +if (zkUtils == null) { + throw new RuntimeException("Cannot operate ZkDistributedReadWriteLock without ZkUtils."); +} +this.zkUtils = zkUtils; +this.participantId = participantId; +this.keyBuilder = zkUtils.getKeyBuilder(); +lockPath = String.format("%s/readWriteLock-%s", keyBuilder.getRootPath(),lockId); +particpantsPath = String.format("%s/%s", lockPath, PARTICIPANTS_PATH); +processorsPath = String.format("%s/%s", lockPath, PROCESSORS_PATH); +zkUtils.validatePaths(new String[] {lockPath, particpantsPath, processorsPath}); +mutex = new Object(); +zkUtils.getZkClient().subscribeChildChanges(particpantsPath, new ParticipantChangeHandler(zkUtils)); +zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener()); + } + + /** + * Tries to acquire a lock in order to generate run.id. On failure to acquire lock, it keeps trying until the lock times out. + * Creates a sequential ephemeral node under "participants" to acquire the lock. 
+ * If the path of this node has the lowest sequence number, + * it creates a sequential ephemeral node under "processors" and checks the number of nodes under "processors" + * if there is only one node under "processors", a WRITE access lock is acquired + * else a READ access lock is acquired + * @param timeout Duration of lock acquiring timeout. + * @param unit Unit of the timeout defined above. + * @return AccessType.READ/WRITE if lock is acquired successfully, AccessType.NONE if it times out. + */ + @Override + public AccessType lock(long timeout, TimeUnit unit) + throws TimeoutException { + +activeParticipantPath = zkUtils.getZkClient().createEphemeralSequential(particpantsPath + "/", participantId); + +//Start timer for timeout +long startTime = System.currentTimeMillis(); +long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit); + +while((System.currentTimeMillis() - startTime) < lockTimeout) { + synchronized (mutex) { +AccessType accessType = checkAndAcquireLock(); +if(accessType != AccessType.NONE) { + isInCriticalSection = true; + if(isStateLost) { +throw new SamzaException("Lock's state lost due to connection expiry"); + } + return accessType; +} else { Review comment: When does this happen?
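The decision rule in the `lock()` javadoc above can be sketched as an in-memory simulation. This is not ZooKeeper code: sequential ephemeral znodes are modelled as an increasing counter in a `TreeMap`, node deletion on unlock is an explicit `release` call, and all class and method names are hypothetical. It shows only which participant may proceed and when it receives WRITE versus READ access.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// In-memory simulation of the lock() javadoc's rule; no ZooKeeper involved.
class ReadWriteLockDecisionSketch {
  enum AccessType { READ, WRITE, NONE }

  private final TreeMap<Long, String> participants = new TreeMap<>();
  private final List<String> processors = new ArrayList<>();
  private long nextSeq = 0;

  // Models createEphemeralSequential under "participants"; returns the sequence number.
  long joinParticipants(String participantId) {
    long seq = nextSeq++;
    participants.put(seq, participantId);
    return seq;
  }

  // One attempt: only the participant holding the lowest sequence number proceeds.
  AccessType tryAcquire(long mySeq, String participantId) {
    if (participants.isEmpty() || participants.firstKey() != mySeq) {
      return AccessType.NONE; // someone is ahead of us; the caller retries until timeout
    }
    processors.add(participantId);
    // Only one node under "processors" means we are first: WRITE (generate run.id).
    // Otherwise a run.id already exists, so we only READ it.
    return processors.size() == 1 ? AccessType.WRITE : AccessType.READ;
  }

  // Models deleting our participant node on unlock(), letting the next one in.
  void release(long mySeq) {
    participants.remove(mySeq);
  }
}
```

In this model, the `NONE` result corresponds to the `else` branch the reviewer is asking about: another participant still holds a lower sequence number.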
[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264387509 ## File path: samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java ## @@ -52,6 +52,16 @@ public DistributedLockWithState getLockWithState(String lockId) { return new AzureLock(blob); } + @Override + public DistributedReadWriteLock getReadWriteLock(String lockId) throws UnsupportedOperationException { +return null; + } + + @Override + public DistributedDataAccess getDataAccess(DistributedDataStateListener listener) throws UnsupportedOperationException { Review comment: See above.
[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264473744 ## File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ## @@ -280,4 +376,68 @@ private void setApplicationFinalStatus() { } } } + + + /** + * Defines a specific implementation of {@link DistributedDataStateListener} for local {@link DistributedDataAccess} + */ + private final class LocalDistributedDataStateListener implements DistributedDataStateListener { + +/** + * upon reconnect check if global runid differs from local runid + */ +@Override +public void handleReconnect() { + if(coordinationUtils == null || runIdLock == null || runIdAccess == null ) { +LOG.warn("Stopping processor {} and shutting down due to failure reading global runid after reconnect", uid); Review comment: Where is it reading the global runid?
[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264468919 ## File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ## @@ -59,13 +67,23 @@ public class LocalApplicationRunner implements ApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class); + private static final String RUNID_PATH = "runId"; + private static final String APPLICATION_RUNNER_PATH_SUFFIX = "/ApplicationRunnerData"; + private static final String RUNID_LOCK_ID = "runId"; + private static final int LOCK_TIMEOUT = 10; + private static final TimeUnit LOCK_TIMEOUT_UNIT = TimeUnit.MINUTES; private final ApplicationDescriptorImpl appDesc; private final LocalJobPlanner planner; private final Set processors = ConcurrentHashMap.newKeySet(); private final CountDownLatch shutdownLatch = new CountDownLatch(1); private final AtomicInteger numProcessorsToStart = new AtomicInteger(); private final AtomicReference failure = new AtomicReference<>(); + private final String uid = UUID.randomUUID().toString(); + private CoordinationUtils coordinationUtils = null; Review comment: Make this final.
[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264474066 ## File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ## @@ -280,4 +376,68 @@ private void setApplicationFinalStatus() { } } } + + + /** + * Defines a specific implementation of {@link DistributedDataStateListener} for local {@link DistributedDataAccess} + */ + private final class LocalDistributedDataStateListener implements DistributedDataStateListener { + +/** + * upon reconnect check if global runid differs from local runid + */ +@Override +public void handleReconnect() { + if(coordinationUtils == null || runIdLock == null || runIdAccess == null ) { +LOG.warn("Stopping processor {} and shutting down due to failure reading global runid after reconnect", uid); +stopProcessingAndShutDown(); +return; + } + try { +// acquire lock to write or read run.id +DistributedReadWriteLock.AccessType lockAccess = runIdLock.lock(LOCK_TIMEOUT, LOCK_TIMEOUT_UNIT); +if(lockAccess != DistributedReadWriteLock.AccessType.NONE) { + String globalRunId = (String) runIdAccess.readData(RUNID_PATH, new LocalDistributedDataWatcher()); + runIdLock.unlock(); + if(runId != globalRunId) { +LOG.warn("Stopping processor {} and shutting down as local runid {} differs from global runid {} after session reconnect.", uid, runId, +globalRunId); +stopProcessingAndShutDown(); + } +} else { + LOG.warn("Stopping processor {} and shutting down due to failure reading global runid after reconnect", uid); + stopProcessingAndShutDown(); +} + } catch (TimeoutException e) { +LOG.warn("Stopping processor {} and shutting down due to failure reading global runid after reconnect", uid); +stopProcessingAndShutDown(); + } +} + +@Override +public void handleReconnectFailedError() { + LOG.warn("Stopping processor {} and shutting down due to failure to reconnect", uid); + stopProcessingAndShutDown(); +} + 
} + + /** + * Defines a specific implementation of {@link DistributedDataWatcher} for local {@link DistributedDataAccess} + */ + private final class LocalDistributedDataWatcher implements DistributedDataWatcher { +@Override +public void handleDataChange(Object newData) { + if(runId != (String) newData) { Review comment: You should use `equals()`; `!=` compares object references, not string contents.
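A small sketch of why `!=` is the wrong check for run ids (the same pattern also appears as `runId != globalRunId` in `handleReconnect` above). The class and method names are hypothetical. `==`/`!=` on strings compare object identity, so a run id deserialized from ZooKeeper will generally be a different object from the local one even when the text matches. `java.util.Objects.equals` compares values and also tolerates a `null` on either side, which matters here because `runId` starts out as `null`.

```java
import java.util.Objects;

class RunIdComparison {
  // Identity check: true only if both references point at the same object.
  static boolean sameReference(String localRunId, Object globalRunId) {
    return localRunId == globalRunId;
  }

  // Value check that also handles a null local or global run id.
  static boolean sameRunId(String localRunId, Object globalRunId) {
    return Objects.equals(localRunId, globalRunId);
  }
}
```

The watcher condition would then read `if (!Objects.equals(runId, newData))`, which also removes the need for the `(String)` cast.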
[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264469347 ## File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ## @@ -77,23 +95,80 @@ */ public LocalApplicationRunner(SamzaApplication app, Config config) { this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config); -this.planner = new LocalJobPlanner(appDesc); +this.coordinationUtils = getCoordinationUtils(config); +getRunId(); +this.planner = new LocalJobPlanner(appDesc, coordinationUtils, uid, runId); } /** * Constructor only used in unit test to allow injection of {@link LocalJobPlanner} */ @VisibleForTesting - LocalApplicationRunner(ApplicationDescriptorImpl appDesc, LocalJobPlanner planner) { + LocalApplicationRunner(ApplicationDescriptorImpl appDesc, LocalJobPlanner planner, CoordinationUtils coordinationUtils) { this.appDesc = appDesc; this.planner = planner; +this.coordinationUtils = coordinationUtils; + } + + private CoordinationUtils getCoordinationUtils(Config config) { +JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config); +String coordinationId = new ApplicationConfig(config).getGlobalAppId() + APPLICATION_RUNNER_PATH_SUFFIX; +return jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(coordinationId, uid, config); + } + + private void getRunId(){ +Boolean isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; +if(coordinationUtils == null || !isAppModeBatch) { + return; +} + +runIdLock = coordinationUtils.getReadWriteLock(RUNID_LOCK_ID); +if(runIdLock == null) { + LOG.warn("Processor {} failed to create the lock for run.id generation", uid); Review comment: It would be better to log something more specific than the UID; the uid alone is not very helpful.
[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264474872 ## File path: samza-core/src/main/java/org/apache/samza/zk/ZkDistributedDataAccess.java ## @@ -0,0 +1,61 @@ +package org.apache.samza.zk; + +import org.apache.samza.coordinator.DistributedDataAccess; +import org.apache.samza.coordinator.DistributedDataWatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ZkDistributedDataAccess implements DistributedDataAccess { + public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedDataAccess.class); + private final ZkUtils zkUtils; + private final ZkKeyBuilder keyBuilder; + private DistributedDataWatcher watcher; + + public ZkDistributedDataAccess(ZkUtils zkUtils) { +if (zkUtils == null) { + throw new RuntimeException("Cannot operate ZkDistributedDataAccess without ZkUtils."); +} +this.zkUtils = zkUtils; +this.keyBuilder = zkUtils.getKeyBuilder(); + } + + public Object readData(String key, DistributedDataWatcher watcher) { +this.watcher = watcher; +String absolutePath = keyBuilder.getRootPath() + "/" + key; +zkUtils.getZkClient().subscribeDataChanges(absolutePath, new ZkDistributedDataChangeHandler(zkUtils)); +if(!zkUtils.getZkClient().exists(absolutePath)) { + return null; +} +return zkUtils.getZkClient().readData(absolutePath); + } + + public void writeData(String key, Object data, DistributedDataWatcher watcher) { +this.watcher = watcher; +String absolutePath = keyBuilder.getRootPath() + "/" + key; +zkUtils.getZkClient().subscribeDataChanges(absolutePath, new ZkDistributedDataChangeHandler(zkUtils)); +zkUtils.validatePaths(new String[]{absolutePath}); +zkUtils.writeData(absolutePath, data); +return; + } + + class ZkDistributedDataChangeHandler extends ZkUtils.GenerationAwareZkDataListener { + +public ZkDistributedDataChangeHandler(ZkUtils zkUtils) { + super(zkUtils, "ZkDistributedDataChangeHandler"); 
+} + + /** + * Invoked when there is a change to the data z-node. Review comment: Nit: extra space.
[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264474987 ## File path: samza-core/src/main/java/org/apache/samza/zk/ZkDistributedReadWriteLock.java ## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.I0Itec.zkclient.IZkStateListener; +import org.apache.samza.SamzaException; +import org.apache.samza.coordinator.DistributedReadWriteLock; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Distributed lock primitive for Zookeeper. 
+ */ +public class ZkDistributedReadWriteLock implements DistributedReadWriteLock { + + public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedReadWriteLock.class); + private static final String PARTICIPANTS_PATH = "participants"; + private static final String PROCESSORS_PATH = "processors"; + private final ZkUtils zkUtils; + private final String lockPath; + private final String particpantsPath; + private final String processorsPath; + private final String participantId; + private final ZkKeyBuilder keyBuilder; + private final Random random = new Random(); + private String activeParticipantPath = null; + private String activeProcessorPath = null; + private Object mutex; Review comment: Make this final.
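A sketch of why the monitor object should be `final`, using a hypothetical `CriticalSectionSketch` class rather than the lock itself. If the field used in `synchronized (mutex)` could be reassigned, two threads could end up locking different objects and the critical section would no longer be exclusive. Declaring it `final` (here initialized at the declaration, though assigning it once in the constructor as the PR does also works) rules that out and guarantees every thread sees the same fully constructed object.

```java
import java.util.ArrayList;
import java.util.List;

class CriticalSectionSketch {
  // final: the monitor can never be swapped, so all threads lock the same object.
  private final Object mutex = new Object();
  private int counter = 0;

  void increment() {
    synchronized (mutex) {
      counter++;
    }
  }

  int get() {
    synchronized (mutex) {
      return counter;
    }
  }

  // Starts `threads` threads that each call increment() `perThread` times,
  // waits for all of them, and returns the final count.
  static int runConcurrently(int threads, int perThread) {
    CriticalSectionSketch s = new CriticalSectionSketch();
    List<Thread> workers = new ArrayList<>();
    for (int t = 0; t < threads; t++) {
      Thread w = new Thread(() -> {
        for (int i = 0; i < perThread; i++) {
          s.increment();
        }
      });
      workers.add(w);
      w.start();
    }
    for (Thread w : workers) {
      try {
        w.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return s.get();
  }
}
```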
[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264414547 ## File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ## @@ -59,13 +67,23 @@ public class LocalApplicationRunner implements ApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class); + private static final String RUNID_PATH = "runId"; + private static final String APPLICATION_RUNNER_PATH_SUFFIX = "/ApplicationRunnerData"; + private static final String RUNID_LOCK_ID = "runId"; + private static final int LOCK_TIMEOUT = 10; Review comment: I prefer a `LOCK_TIMEOUT_MS` constant here, computed directly with `TimeUnit.MINUTES` (no need for a separate unit constant).
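One plausible reading of this suggestion, sketched with a hypothetical holder class: fold the unit into the constant's name and value, using `TimeUnit.MINUTES.toMillis(10)` at the definition so the original "10 minutes" intent stays visible, and drop `LOCK_TIMEOUT_UNIT`. Since `lock(long timeout, TimeUnit unit)` in this PR takes a timeout and a unit, the call site would become `runIdLock.lock(LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)`.

```java
import java.util.concurrent.TimeUnit;

class LockTimeoutConstants {
  // Value and unit travel together in the name; the conversion is explicit here.
  static final long LOCK_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(10);

  // Convenience for log messages: convert back to whole minutes.
  static long timeoutInMinutes() {
    return TimeUnit.MILLISECONDS.toMinutes(LOCK_TIMEOUT_MS);
  }
}
```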
[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264474790 ## File path: samza-core/src/main/java/org/apache/samza/zk/ZkDistributedDataAccess.java ## @@ -0,0 +1,61 @@ +package org.apache.samza.zk; + +import org.apache.samza.coordinator.DistributedDataAccess; +import org.apache.samza.coordinator.DistributedDataWatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ZkDistributedDataAccess implements DistributedDataAccess { + public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedDataAccess.class); + private final ZkUtils zkUtils; + private final ZkKeyBuilder keyBuilder; + private DistributedDataWatcher watcher; + + public ZkDistributedDataAccess(ZkUtils zkUtils) { +if (zkUtils == null) { + throw new RuntimeException("Cannot operate ZkDistributedDataAccess without ZkUtils."); +} +this.zkUtils = zkUtils; +this.keyBuilder = zkUtils.getKeyBuilder(); + } + + public Object readData(String key, DistributedDataWatcher watcher) { Review comment: Won't this `watcher` override the one set in `writeData`? If they are the same (or there should only ever be one), make it final and pass it in through the constructor.
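A sketch of the constructor-injection shape the reviewer suggests. It is deliberately not ZooKeeper-backed: `DataWatcher` is a hypothetical stand-in for `DistributedDataWatcher`, a `HashMap` replaces the znodes, and the change callback is invoked directly from `writeData`. The point is structural: with the watcher as a `final` field set once in the constructor, `readData` and `writeData` no longer take a watcher argument and cannot silently replace each other's.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for org.apache.samza.coordinator.DistributedDataWatcher.
interface DataWatcher {
  void handleDataChange(Object newData);
}

// Map-backed sketch: the watcher is fixed for the object's lifetime.
class DataAccessWithFinalWatcher {
  private final DataWatcher watcher;
  private final Map<String, Object> store = new HashMap<>();

  DataAccessWithFinalWatcher(DataWatcher watcher) {
    if (watcher == null) {
      throw new IllegalArgumentException("watcher must not be null");
    }
    this.watcher = watcher;
  }

  Object readData(String key) {
    return store.get(key);
  }

  void writeData(String key, Object data) {
    Object previous = store.put(key, data);
    // Stand-in for the ZooKeeper data-change notification: fire only on real changes.
    if (!Objects.equals(previous, data)) {
      watcher.handleDataChange(data);
    }
  }
}
```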
[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264472956 ## File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ## @@ -200,6 +279,22 @@ static String createProcessorId(ApplicationConfig appConfig) { } } + private void cleanup() { +if(runIdLock != null) { + runIdLock.cleanState(); +} +if(coordinationUtils != null) { + coordinationUtils.close(); +} + } + + private void stopProcessingAndShutDown() { +processors.forEach(StreamProcessor::stop); +cleanup(); +appStatus = ApplicationStatus.UnsuccessfulFinish; Review comment: Quick question: why is it `UnsuccessfulFinish`?
[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264469001 ## File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ## @@ -59,13 +67,23 @@ public class LocalApplicationRunner implements ApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class); + private static final String RUNID_PATH = "runId"; + private static final String APPLICATION_RUNNER_PATH_SUFFIX = "/ApplicationRunnerData"; + private static final String RUNID_LOCK_ID = "runId"; + private static final int LOCK_TIMEOUT = 10; + private static final TimeUnit LOCK_TIMEOUT_UNIT = TimeUnit.MINUTES; private final ApplicationDescriptorImpl appDesc; private final LocalJobPlanner planner; private final Set processors = ConcurrentHashMap.newKeySet(); private final CountDownLatch shutdownLatch = new CountDownLatch(1); private final AtomicInteger numProcessorsToStart = new AtomicInteger(); private final AtomicReference failure = new AtomicReference<>(); + private final String uid = UUID.randomUUID().toString(); + private CoordinationUtils coordinationUtils = null; + private DistributedReadWriteLock runIdLock = null; + private DistributedDataAccess runIdAccess = null; + private String runId = null; Review comment: Make these final, and assign null in the constructor where needed.
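A sketch of the "final, assigned in the constructor" pattern, using a hypothetical `RunnerFieldsSketch` with a hypothetical `coordinatedRunId` parameter in place of the real run id generation. The compiler's definite-assignment check requires a `final` field to be assigned exactly once on every constructor path, so the non-batch path must assign `null` explicitly. One consequence for this PR: a helper like `getRunId()` that currently assigns fields as a side effect would need to return its result so the constructor can do the assignment.

```java
import java.util.UUID;

class RunnerFieldsSketch {
  // Each final field must be assigned exactly once on every constructor path.
  private final String uid = UUID.randomUUID().toString();
  private final String runId;

  RunnerFieldsSketch(boolean isBatchMode, String coordinatedRunId) {
    if (isBatchMode) {
      this.runId = coordinatedRunId;
    } else {
      // Explicit null on this path keeps the field final and the compiler happy.
      this.runId = null;
    }
  }

  String getRunId() {
    return runId;
  }

  String getUid() {
    return uid;
  }
}
```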
[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264388397 ## File path: samza-core/src/main/java/org/apache/samza/coordinator/DistributedDataStateListener.java ## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator; + +/** + * Listen to changes in state of distributed data connection + */ +public interface DistributedDataStateListener { + /** + * Called when the connected has reconnected after a disconnect. Review comment: 'connected'? Please rephrase.
[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264470992 ## File path: samza-core/src/main/java/org/apache/samza/zk/ZkDistributedReadWriteLock.java ## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.I0Itec.zkclient.IZkStateListener; +import org.apache.samza.SamzaException; +import org.apache.samza.coordinator.DistributedReadWriteLock; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Distributed lock primitive for Zookeeper. + */ +public class ZkDistributedReadWriteLock implements DistributedReadWriteLock { Review comment: Is this new code, or was it moved from somewhere?
[GitHub] [samza] sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264467907 ## File path: samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java ## @@ -119,7 +119,12 @@ private void createStreams(String planId, List intStreams, StreamMan return; } -DistributedLockWithState lockWithState = coordinationUtils.getLockWithState(planId); +Boolean isAppModeBatch = new ApplicationConfig(userConfig).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; +String lockId = planId; +if(isAppModeBatch && runId != null) { Review comment: Please add a comment explaining this.
[GitHub] [samza] dnishimura opened a new pull request #964: SAMZA-2138: Incorrect logging when task to container mapping is written.
dnishimura opened a new pull request #964: SAMZA-2138: Incorrect logging when task to container mapping is written. URL: https://github.com/apache/samza/pull/964 Example before this fix:
```
2019-03-21 08:36:34.810 [main] JobModelManager$ [INFO] Storing ssp: Partition 1 and task: 0 into metadata store
```
[GitHub] [samza] dnishimura commented on issue #964: SAMZA-2138: Incorrect logging when task to container mapping is written.
dnishimura commented on issue #964: SAMZA-2138: Incorrect logging when task to container mapping is written. URL: https://github.com/apache/samza/pull/964#issuecomment-475287206 @shanthoosh please take a look.
[GitHub] [samza] mynameborat commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0
mynameborat commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0 URL: https://github.com/apache/samza/pull/951#discussion_r267873046 ## File path: samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ## @@ -301,12 +260,6 @@ class TestKafkaSystemAdmin { val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka") systemAdmin.createStream(spec) validateTopic(topic, 1) -val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo) -assertTrue(topicMetadataMap.contains(topic)) -val topicMetadata = topicMetadataMap(topic) -val partitionMetadata = topicMetadata.partitionsMetadata.head -assertEquals(0, partitionMetadata.partitionId) -assertEquals(3, partitionMetadata.replicas.size) Review comment: Agreed. I'll do it the Mockito way.
[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0 URL: https://github.com/apache/samza/pull/951#discussion_r267885428 ## File path: samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala ## @@ -133,43 +130,36 @@ object StreamTaskTestUtil { config.put(ProducerConfig.LINGER_MS_CONFIG, "0") val producerConfig = new KafkaProducerConfig("kafka", "i001", config) +adminClient = AdminClient.create(config.asInstanceOf[util.Map[String, Object]]) producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) createTopics validateTopics } def createTopics { -AdminUtils.createTopic( - zkUtils, - INPUT_TOPIC, - TOTAL_TASK_NAMES, - REPLICATION_FACTOR) +adminClient.createTopics(Collections.singleton(new NewTopic(INPUT_TOPIC, TOTAL_TASK_NAMES, REPLICATION_FACTOR.shortValue( } def validateTopics { Review comment: Not sure if this is feasible/practical to combine, but it looks like some of this code is similar to code from `TestKafkaSystemAdmin`, such as `validateTopics`.
[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0 URL: https://github.com/apache/samza/pull/951#discussion_r267876940 ## File path: samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ## @@ -89,49 +91,49 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness { @AfterClass override def tearDown() { +adminClient.close() systemAdmin.stop() producer.close() super.tearDown() } def createTopic(topicName: String, partitionCount: Int) { -createTopic(topicName, partitionCount, REPLICATION_FACTOR) +val topicCreationFutures = adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, partitionCount, REPLICATION_FACTOR.shortValue( +topicCreationFutures.all().get() // wait on the future to make sure create topic call finishes } def validateTopic(topic: String, expectedPartitionCount: Int) { var done = false var retries = 0 -while (!done && retries < 100) { +while (!done && retries < 10) { try { - if (!zkClient.topicExists(topic)) { -System.err.println("Test topic %s not found. Waiting and retrying." format topic) -retries += 1 -Thread.sleep(500) - } - - val topicDescription = - adminClient.describeTopics(JavaConverters.asJavaCollectionConverter(Set(topic)).asJavaCollection) - .all().get().get(topic) +val topicDescriptionFutures = adminClient.describeTopics( + JavaConverters.asJavaCollectionConverter(Set(topic)).asJavaCollection).all() +val topicDescription = topicDescriptionFutures.get(500, TimeUnit.MILLISECONDS) +.get(topic) done = expectedPartitionCount == topicDescription.partitions().size() Review comment: 1. Can `topicDescription` be null? 2. If done is true, then can we break out of this loop?
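A minimal sketch of the loop shape these two questions point toward, assuming the topic lookup is wrapped in a `Supplier<Integer>` that returns the partition count, or `null` when no description is available yet. `TopicValidation` and `waitForPartitionCount` are hypothetical names; the real test is Scala and calls Kafka's `AdminClient.describeTopics`, which this Java sketch does not use.

```java
import java.util.function.Supplier;

// Hypothetical helper sketching the two review suggestions for validateTopic.
final class TopicValidation {
  private TopicValidation() {}

  // partitionCount stands in for describing the topic: it returns null when the
  // topic has no description yet, and may throw while the broker catches up.
  // Returns true as soon as the expected count is seen, false after maxRetries attempts.
  static boolean waitForPartitionCount(Supplier<Integer> partitionCount, int expected,
      int maxRetries, long sleepMs) {
    for (int attempt = 0; attempt < maxRetries; attempt++) {
      try {
        Integer count = partitionCount.get();
        // 1. Null guard: a missing description means "not ready yet", not a crash.
        if (count != null && count == expected) {
          return true; // 2. Done: leave the loop immediately instead of iterating again.
        }
      } catch (RuntimeException e) {
        // Treat transient lookup failures (e.g. a timeout) as "not ready yet".
      }
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt flag and give up
        return false;
      }
    }
    return false;
  }
}
```

Returning from inside the loop replaces the `done` flag, so the loop condition only has to track the retry budget.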
[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0 URL: https://github.com/apache/samza/pull/951#discussion_r267882631 ## File path: samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ## @@ -159,42 +152,25 @@ public void setUp() { zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, ZK_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); zkUtils.connect(); -for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, outputKafkaTopic)) { - LOGGER.info("Creating kafka topic: {}.", kafkaTopic); - TestUtils.createTopic(kafkaZkClient(), kafkaTopic, 5, 1, servers(), new Properties()); - if (AdminUtils.topicExists(zkUtils(), kafkaTopic)) { -LOGGER.info("Topic: {} was created", kafkaTopic); - } else { -Assert.fail(String.format("Unable to create kafka topic: %s.", kafkaTopic)); - } -} -for (String kafkaTopic : ImmutableList.of(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic)) { - LOGGER.info("Creating kafka topic: {}.", kafkaTopic); - TestUtils.createTopic(kafkaZkClient(), kafkaTopic, 1, 1, servers(), new Properties()); - if (AdminUtils.topicExists(zkUtils(), kafkaTopic)) { -LOGGER.info("Topic: {} was created", kafkaTopic); - } else { -Assert.fail(String.format("Unable to create kafka topic: %s.", kafkaTopic)); - } -} +List newTopics = +ImmutableList.of(inputKafkaTopic, outputKafkaTopic, inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic) +.stream() +.map(topic -> new NewTopic(topic, 5, (short) 1)) +.collect(Collectors.toList()); + +createTopics(newTopics); Review comment: Should this fail if `createTopics` returns false?
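A possible fail-fast shape for this setup step, assuming the test's `createTopics` helper returns `false` when topic creation did not succeed. `TopicSetup` and `createTopicsOrFail` are hypothetical names, and an `IllegalStateException` stands in for JUnit's `Assert.fail` to keep the sketch dependency-free.

```java
import java.util.List;
import java.util.function.Predicate;

final class TopicSetup {
  private TopicSetup() {}

  // createTopics stands in for the test's own helper, assumed to return false
  // when any requested topic could not be created.
  static void createTopicsOrFail(Predicate<List<String>> createTopics, List<String> topics) {
    if (!createTopics.test(topics)) {
      // Fail setup loudly instead of letting later assertions fail for an unclear reason.
      throw new IllegalStateException("Unable to create kafka topics: " + topics);
    }
  }
}
```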
[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0 URL: https://github.com/apache/samza/pull/951#discussion_r267881671 ## File path: samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java ## @@ -295,4 +227,20 @@ public Config getConfig() { return this.config; } } + + @Override + protected KafkaProducer createProducer() { Review comment: It seems a bit odd that this test harness needs slightly different serialization/deserialization for createProducer/createConsumer compared to `IntegrationTestHarness`. If it is difficult to make them consistent, could you please put a comment about why you have these overrides? You could also consider having helpers in `IntegrationTestHarness` to build the configs, just in case someone needs to add something in the future.
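One way the suggested config helpers could look, assuming they live in a base harness and subclasses only pick serializer class names. `HarnessConfigs`, `producerConfigs`, and `consumerConfigs` are hypothetical; the property keys (`bootstrap.servers`, `key.serializer`, `value.serializer`, `group.id`, `key.deserializer`, `value.deserializer`) are standard Kafka client config names.

```java
import java.util.Properties;

// Hypothetical base-harness helpers: common settings live in one place, so a
// subclass that needs different (de)serializers only passes different class names.
final class HarnessConfigs {
  private HarnessConfigs() {}

  static Properties producerConfigs(String bootstrapServers, String keySerializer,
      String valueSerializer) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("key.serializer", keySerializer);
    props.put("value.serializer", valueSerializer);
    return props;
  }

  static Properties consumerConfigs(String bootstrapServers, String groupId,
      String keyDeserializer, String valueDeserializer) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("group.id", groupId);
    props.put("key.deserializer", keyDeserializer);
    props.put("value.deserializer", valueDeserializer);
    return props;
  }
}
```

An override in a subclass would then be a one-line call with, for example, `org.apache.kafka.common.serialization.StringSerializer` in place of a byte-array serializer, and the reason for the difference can be documented next to that call.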
[GitHub] [samza] dnishimura commented on issue #964: SAMZA-2138: Incorrect logging when task to container mapping is written.
dnishimura commented on issue #964: SAMZA-2138: Incorrect logging when task to container mapping is written. URL: https://github.com/apache/samza/pull/964#issuecomment-475401665 It happened to me before. Not sure why. Thanks for merging it.
[GitHub] [samza] shanthoosh merged pull request #964: SAMZA-2138: Incorrect logging when task to container mapping is written.
shanthoosh merged pull request #964: SAMZA-2138: Incorrect logging when task to container mapping is written. URL: https://github.com/apache/samza/pull/964
[GitHub] [samza] shanthoosh edited a comment on issue #964: SAMZA-2138: Incorrect logging when task to container mapping is written.
shanthoosh edited a comment on issue #964: SAMZA-2138: Incorrect logging when task to container mapping is written. URL: https://github.com/apache/samza/pull/964#issuecomment-475401240 @dnishimura This build is green in Travis. Here's the successful Travis build associated with this PR: https://travis-ci.org/apache/samza/builds/509502538?utm_source=github_status_medium=notification . However, the status is not updated here in GitHub. I'm merging this PR to trunk.
[GitHub] [samza] rmatharu commented on issue #952: Improved standby-aware container allocation for active-containers on job redeploys
rmatharu commented on issue #952: Improved standby-aware container allocation for active-containers on job redeploys URL: https://github.com/apache/samza/pull/952#issuecomment-475692992 Addressed all comments. thumbsup => comment addressed.
[GitHub] [samza] vjagadish1989 opened a new pull request #965: Fix a typo in LinkedIn case-study
vjagadish1989 opened a new pull request #965: Fix a typo in LinkedIn case-study URL: https://github.com/apache/samza/pull/965
[GitHub] [samza] asfgit closed pull request #965: Minor: Fix a typo in LinkedIn case-study
asfgit closed pull request #965: Minor: Fix a typo in LinkedIn case-study URL: https://github.com/apache/samza/pull/965
[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0 URL: https://github.com/apache/samza/pull/951#discussion_r267871144 ## File path: samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ## @@ -301,12 +260,6 @@ class TestKafkaSystemAdmin { val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka") systemAdmin.createStream(spec) validateTopic(topic, 1) -val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo) -assertTrue(topicMetadataMap.contains(topic)) -val topicMetadata = topicMetadataMap(topic) -val partitionMetadata = topicMetadata.partitionsMetadata.head -assertEquals(0, partitionMetadata.partitionId) -assertEquals(3, partitionMetadata.replicas.size) Review comment: The end-to-end check seems unnecessary anyway for a unit test. However, since you are removing that check, it might be good to have a test which uses Mockito to verify that the correct configurations are passed to the admin client when creating topics. If I recall correctly, in the past, we may have had some issues which involved replication factors (not sure if they were this exact flow though).
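A dependency-free sketch of the kind of test suggested here. The reviewer proposed Mockito, where `Mockito.verify` would check the call; in this sketch a hand-rolled recording fake plays that role instead. `TopicCreator`, `RecordingTopicCreator`, and `CoordinatorStreamCreator` are hypothetical stand-ins for the admin-client seam and the code under test, and the replication factor of 3 mirrors the `replicas.size` assertion removed in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical seam: code under test creates topics only through this interface.
interface TopicCreator {
  void create(String topic, int partitions, short replicationFactor);
}

// Records every create call so a test can assert on the arguments that were passed.
final class RecordingTopicCreator implements TopicCreator {
  static final class Call {
    final String topic;
    final int partitions;
    final short replicationFactor;

    Call(String topic, int partitions, short replicationFactor) {
      this.topic = topic;
      this.partitions = partitions;
      this.replicationFactor = replicationFactor;
    }
  }

  final List<Call> calls = new ArrayList<>();

  @Override
  public void create(String topic, int partitions, short replicationFactor) {
    calls.add(new Call(topic, partitions, replicationFactor));
  }
}

// Hypothetical code under test: a coordinator stream is a single-partition topic.
final class CoordinatorStreamCreator {
  private final TopicCreator creator;
  private final short replicationFactor;

  CoordinatorStreamCreator(TopicCreator creator, short replicationFactor) {
    this.creator = creator;
    this.replicationFactor = replicationFactor;
  }

  void createCoordinatorStream(String topic) {
    creator.create(topic, 1, replicationFactor);
  }
}
```

A test would construct `CoordinatorStreamCreator` with the fake, call `createCoordinatorStream`, and assert that exactly one call was recorded with one partition and the configured replication factor.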
[GitHub] [samza] dnishimura commented on issue #964: SAMZA-2138: Incorrect logging when task to container mapping is written.
dnishimura commented on issue #964: SAMZA-2138: Incorrect logging when task to container mapping is written. URL: https://github.com/apache/samza/pull/964#issuecomment-475350150 Thanks @shanthoosh. Please merge when you have some time.
[GitHub] [samza] rmatharu commented on a change in pull request #952: Improved standby-aware container allocation for active-containers on job redeploys
rmatharu commented on a change in pull request #952: Improved standby-aware container allocation for active-containers on job redeploys URL: https://github.com/apache/samza/pull/952#discussion_r268030497 ## File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java ## @@ -226,15 +244,35 @@ private void initiateActiveContainerFailover(String containerID, String resource // use this standby if there was no previous failover for which this standbyResource was used if (!(failoverMetadata.isPresent() && failoverMetadata.get().isStandbyResourceUsed(standbyContainerResource.getResourceID( { - log.info("Returning standby container {} in running state for active container {}", standbyContainerID, - activeContainerID); - return Optional.of(new Entry<>(standbyContainerID, standbyContainerResource)); + log.info("Returning standby container {} in running state on host {} for active container {}", standbyContainerID, standbyContainerResource.getHost(), activeContainerID); + return standbyContainerResource.getHost(); } } } - log.info("Did not find any running standby container for active container {}", activeContainerID); -return Optional.empty(); + +// Now, we iterate over the list of last-known standbyHosts to check if anyone of them has not already been tried +Map containerToHostMapping = this.samzaApplicationState.jobModelManager.jobModel().getAllContainerLocality(); + +for (String standbyContainerID : this.standbyContainerConstraints.get(activeContainerID)) { + String standbyHost = containerToHostMapping.get(standbyContainerID); + + // if there is no last-known host for the standby, continue + if (standbyHost == null) { +continue; Review comment: I agree, and I simplified it a bit. But we can return any-host only once we're done iterating over all standbyContainerIDs.
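A simplified sketch of the ordering described in this reply: every standby's last-known host is considered first, and `ANY_HOST` is returned only after the loop has finished. The method signature, the `alreadyTriedHosts` set, and the `ANY_HOST` string value are assumptions for illustration; the real `selectStandbyHost` in the diff also checks running standby containers and failover metadata before reaching this loop.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

final class StandbyHostSelector {
  // Placeholder mirroring ResourceRequestState.ANY_HOST; the value is an assumption.
  static final String ANY_HOST = "ANY_HOST";

  private StandbyHostSelector() {}

  static String selectStandbyHost(List<String> standbyContainerIds,
      Map<String, String> lastKnownHostByContainer, Set<String> alreadyTriedHosts) {
    for (String standbyId : standbyContainerIds) {
      String host = lastKnownHostByContainer.get(standbyId);
      if (host == null || alreadyTriedHosts.contains(host)) {
        continue; // no usable last-known host for this standby; try the next one
      }
      return host;
    }
    // Only after every standby has been considered do we fall back to any host.
    return ANY_HOST;
  }
}
```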
[GitHub] [samza] rmatharu commented on a change in pull request #952: Improved standby-aware container allocation for active-containers on job redeploys
rmatharu commented on a change in pull request #952: Improved standby-aware container allocation for active-containers on job redeploys URL: https://github.com/apache/samza/pull/952#discussion_r268030578 ## File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java ## @@ -161,54 +161,72 @@ private void handleStandbyContainerStop(String standbyContainerID, String resour } } - /** Method to handle failover for an active container. - * We try to find a standby for the active container, and issue a stop on it. - * If we do not find a standby container, we simply issue an anyhost request to place it. + /** Method to handle standby-aware allocation for an active container. + * We try to find a standby host for the active container, and issue a stop on any standby-containers running on it, + * request resource to place the active on the standby's host, and one to place the standby elsewhere. * - * @param containerID the samzaContainerID of the active-container + * @param activeContainerID the samzaContainerID of the active-container * @param resourceID the samza-resource-ID of the container when it failed (used to index failover-state) */ - private void initiateActiveContainerFailover(String containerID, String resourceID, + private void initiateStandbyAwareAllocation(String activeContainerID, String resourceID, AbstractContainerAllocator containerAllocator) { -Optional> standbyContainer = this.selectStandby(containerID, resourceID); +String standbyHost = this.selectStandbyHost(activeContainerID, resourceID); -// If we find a standbyContainer, we initiate a failover -if (standbyContainer.isPresent()) { +// Check if there is a running standby-container on that host that needs to be stopped +List standbyContainers = this.standbyContainerConstraints.get(activeContainerID); +Map runningStandbyContainersOnHost = this.samzaApplicationState.runningContainers.entrySet().stream().filter(x -> standbyContainers.contains(x.getKey())) +.filter(x -> x.getValue().getHost().equals(standbyHost)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - String standbyContainerId = standbyContainer.get().getKey(); - SamzaResource standbyResource = standbyContainer.get().getValue(); - String standbyResourceID = standbyResource.getResourceID(); - String standbyHost = standbyResource.getHost(); +// if the standbyHost returned is anyhost, we proceed with the request directly +if (standbyHost.equals(ResourceRequestState.ANY_HOST)) { + log.info("No standby container found for active container {}, making a resource-request for placing {} on {}", activeContainerID, activeContainerID, ResourceRequestState.ANY_HOST); + samzaApplicationState.failoversToAnyHost.incrementAndGet(); + containerAllocator.requestResource(activeContainerID, ResourceRequestState.ANY_HOST); + +} else if (runningStandbyContainersOnHost.isEmpty()) { + // if there are no running standby-containers on the standbyHost, we proceed to directly to make a resource request - // update the state - FailoverMetadata failoverMetadata = this.registerActiveContainerFailure(containerID, resourceID); - failoverMetadata.updateStandbyContainer(standbyResourceID, standbyHost); + log.info("No running standby container to stop on host {}, making a resource-request for placing {} on {}", standbyHost, activeContainerID, standbyHost); + FailoverMetadata failoverMetadata = this.registerActiveContainerFailure(activeContainerID, resourceID); - log.info("Initiating failover and stopping standby container, found standbyContainer {} = resource {}, " - + "for active container {}", standbyContainerId, standbyResourceID, containerID); + // record the resource request, before issuing it to avoid race with allocation-thread + SamzaResourceRequest resourceRequestForActive = containerAllocator.getResourceRequest(activeContainerID, standbyHost); + failoverMetadata.recordResourceRequest(resourceRequestForActive); + containerAllocator.issueResourceRequest(resourceRequestForActive); Review comment: I need to record resource requests to handle expired requests, so I create one and then use issueResourceRequest to issue it.
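A sketch of the record-then-issue ordering from the diff and this reply, assuming outstanding requests are tracked in a `ConcurrentHashMap` keyed by container ID so that expiry can be checked later. `FailoverRequests`, `Request`, `recordAndIssue`, and `isExpired` are hypothetical names; in the PR the equivalent steps are `failoverMetadata.recordResourceRequest(...)` followed by `containerAllocator.issueResourceRequest(...)`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical bookkeeping for failover resource requests.
final class FailoverRequests {
  static final class Request {
    final String containerId;
    final String preferredHost;
    final long requestedAtMs;

    Request(String containerId, String preferredHost, long requestedAtMs) {
      this.containerId = containerId;
      this.preferredHost = preferredHost;
      this.requestedAtMs = requestedAtMs;
    }
  }

  // Read by the allocation thread as well, hence a concurrent map.
  private final Map<String, Request> outstanding = new ConcurrentHashMap<>();

  // Record first, then issue: by the time the allocator can act on the request,
  // the bookkeeping for it already exists.
  void recordAndIssue(Request request, Consumer<Request> issue) {
    outstanding.put(request.containerId, request);
    issue.accept(request);
  }

  boolean isRecorded(String containerId) {
    return outstanding.containsKey(containerId);
  }

  // A recorded request older than timeoutMs is treated as expired.
  boolean isExpired(String containerId, long nowMs, long timeoutMs) {
    Request r = outstanding.get(containerId);
    return r != null && nowMs - r.requestedAtMs > timeoutMs;
  }
}
```

Issuing first and recording afterwards would leave a window in which the allocation thread could satisfy or expire a request that the failover state does not yet know about.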
[GitHub] [samza] mynameborat commented on a change in pull request #955: SAMZA-2131: [Scala cleanup] Convert FileSystemCheckpointManagerConfig.scala and SystemConfig.scala to Java
mynameborat commented on a change in pull request #955: SAMZA-2131: [Scala cleanup] Convert FileSystemCheckpointManagerConfig.scala and SystemConfig.scala to Java URL: https://github.com/apache/samza/pull/955#discussion_r267968434 ## File path: samza-core/src/main/java/org/apache/samza/config/FileSystemCheckpointManagerConfig.java ## @@ -16,16 +16,22 @@ * specific language governing permissions and limitations * under the License. */ +package org.apache.samza.config; -package org.apache.samza.config +import java.util.Optional; -object FileSystemCheckpointManagerConfig { - // file system checkpoint manager config constants - val CHECKPOINT_MANAGER_ROOT = "task.checkpoint.path" // system name to use when sending offset checkpoints - implicit def Config2FSCP(config: Config) = new FileSystemCheckpointManagerConfig(config) -} +public class FileSystemCheckpointManagerConfig extends MapConfig { + /** + * Path on local file system where checkpoints should be stored. + */ + private static final String CHECKPOINT_MANAGER_ROOT = "task.checkpoint.path"; Review comment: Can we make this package-private and use it in the test below?
[GitHub] [samza] cameronlee314 commented on a change in pull request #955: SAMZA-2131: [Scala cleanup] Convert FileSystemCheckpointManagerConfig.scala and SystemConfig.scala to Java
cameronlee314 commented on a change in pull request #955: SAMZA-2131: [Scala cleanup] Convert FileSystemCheckpointManagerConfig.scala and SystemConfig.scala to Java URL: https://github.com/apache/samza/pull/955#discussion_r268000138 ## File path: samza-core/src/main/java/org/apache/samza/config/FileSystemCheckpointManagerConfig.java ## @@ -16,16 +16,22 @@ * specific language governing permissions and limitations * under the License. */ +package org.apache.samza.config; -package org.apache.samza.config +import java.util.Optional; -object FileSystemCheckpointManagerConfig { - // file system checkpoint manager config constants - val CHECKPOINT_MANAGER_ROOT = "task.checkpoint.path" // system name to use when sending offset checkpoints - implicit def Config2FSCP(config: Config) = new FileSystemCheckpointManagerConfig(config) -} +public class FileSystemCheckpointManagerConfig extends MapConfig { + /** + * Path on local file system where checkpoints should be stored. + */ + private static final String CHECKPOINT_MANAGER_ROOT = "task.checkpoint.path"; Review comment: I asked myself this question too. I did it this way since then the test can help double check a change to a config key (which is kind of a public API) and make sure it is intentional. Do you think it is still better to just use the variable in the test?
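A simplified sketch of the two options being weighed, assuming a plain `Map<String, String>` in place of Samza's `MapConfig`. `CheckpointPathConfig` and `getFileSystemCheckpointRoot` are hypothetical names. A package-private constant lets a same-package test reuse it, while a test that spells out the literal `"task.checkpoint.path"` fails if the user-facing key is renamed by accident.

```java
import java.util.Map;
import java.util.Optional;

// Simplified stand-in for the converted config class (the real one extends MapConfig).
final class CheckpointPathConfig {
  // Package-private, so a test in the same package could reference it directly.
  static final String CHECKPOINT_MANAGER_ROOT = "task.checkpoint.path";

  private final Map<String, String> config;

  CheckpointPathConfig(Map<String, String> config) {
    this.config = config;
  }

  // Empty when the key is not configured.
  Optional<String> getFileSystemCheckpointRoot() {
    return Optional.ofNullable(config.get(CHECKPOINT_MANAGER_ROOT));
  }
}
```

Writing the config map in the test with the string literal, rather than the constant, is what turns the test into a guard on the key itself.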
[GitHub] [samza] shanthoosh merged pull request #899: SAMZA-2091: Update ZkClient in samza-standalone to use string serializer
shanthoosh merged pull request #899: SAMZA-2091: Update ZkClient in samza-standalone to use string serializer URL: https://github.com/apache/samza/pull/899
[GitHub] [samza] shanthoosh commented on issue #899: SAMZA-2091: Update ZkClient in samza-standalone to use string serializer
shanthoosh commented on issue #899: SAMZA-2091: Update ZkClient in samza-standalone to use string serializer URL: https://github.com/apache/samza/pull/899#issuecomment-475051108 @prateekm Apologies for the delay in merging this in. I was sidetracked with savepoints and standalone prod issues while onboarding a new user.
[GitHub] [samza] dxichen opened a new pull request #963: SAMZA-2137: Update blog pages, remove unreleased version
dxichen opened a new pull request #963: SAMZA-2137: Update blog pages, remove unreleased version URL: https://github.com/apache/samza/pull/963 @prateekm @jagadish-v0 please take a look. It would be nice to deploy this before the meetup talking about Samza 1.0.
[GitHub] [samza] asfgit closed pull request #963: SAMZA-2137: Update blog pages, remove unreleased version
asfgit closed pull request #963: SAMZA-2137: Update blog pages, remove unreleased version URL: https://github.com/apache/samza/pull/963
[GitHub] [samza] asfgit closed pull request #921: SAMZA-2105: [elasticsearch, hdfs, kafka] code cleanup and refactoring
asfgit closed pull request #921: SAMZA-2105: [elasticsearch, hdfs, kafka] code cleanup and refactoring URL: https://github.com/apache/samza/pull/921
[GitHub] srinipunuru merged pull request #909: Support for empty records
srinipunuru merged pull request #909: Support for empty records URL: https://github.com/apache/samza/pull/909
[GitHub] srinipunuru opened a new pull request #911: Support for types in Samza SQL UDF
srinipunuru opened a new pull request #911: Support for types in Samza SQL UDF URL: https://github.com/apache/samza/pull/911 This PR adds the following capabilities to UDFs: 1. Adds typed UDFs: previously, Samza SQL UDFs were untyped and all the params were Objects. Although this allowed polymorphism without having to implement multiple methods, it was fragile: we couldn't do validations during the initialization phase. This change adds types to the UDFs. 2. Decouples the name of the method from the UDF: previously, all UDF methods had to be named "execute". Now UDF methods can be named whatever the developer prefers; the UDF methods are identified using the annotation. 3. Adds polymorphism to UDFs: adds the capability to have multiple methods for the same UDF name, i.e. multiple methods within the class can be annotated as SamzaSQLUDFMethod.
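A minimal reflection-based sketch of annotation-driven, typed method lookup as described in this PR, assuming a hypothetical `@UdfMethod` marker in place of the `SamzaSQLUDFMethod` annotation the PR mentions. `RegexMatchUdf` and `UdfResolver` are illustrative names, not Samza APIs; resolving by exact parameter types is one way typed UDFs let mismatches surface at initialization rather than per record.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical marker annotation; RUNTIME retention so reflection can see it.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface UdfMethod {}

// Illustrative UDF: free-form method names and two typed overloads under one UDF.
class RegexMatchUdf {
  @UdfMethod
  public Boolean match(String regex, String input) {
    return input.matches(regex);
  }

  @UdfMethod
  public Boolean match(String regex, Integer input) {
    return String.valueOf(input).matches(regex);
  }

  // Not annotated, so the resolver never treats it as a UDF method.
  public Boolean isBlank(String s) {
    return s.trim().isEmpty();
  }
}

final class UdfResolver {
  private UdfResolver() {}

  // Returns the single annotated method whose parameter types exactly match argTypes.
  static Method resolve(Class<?> udfClass, Class<?>... argTypes) {
    List<Method> candidates = new ArrayList<>();
    for (Method m : udfClass.getMethods()) {
      if (m.isAnnotationPresent(UdfMethod.class)
          && Arrays.equals(m.getParameterTypes(), argTypes)) {
        candidates.add(m);
      }
    }
    if (candidates.size() != 1) {
      // Failing here surfaces a type mismatch at initialization, not per record.
      throw new IllegalArgumentException("Expected one UDF method in " + udfClass.getName()
          + " for " + Arrays.toString(argTypes) + " but found " + candidates.size());
    }
    return candidates.get(0);
  }
}
```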
[GitHub] shenodaguirguis commented on a change in pull request #911: Support for types in Samza SQL UDF
shenodaguirguis commented on a change in pull request #911: Support for types in Samza SQL UDF URL: https://github.com/apache/samza/pull/911#discussion_r254915133 ## File path: samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java ## @@ -52,16 +54,19 @@ private SamzaSqlExecutionContext(SamzaSqlExecutionContext other) { public SamzaSqlExecutionContext(SamzaSqlApplicationConfig config) { this.sqlConfig = config; -udfMetadata = - this.sqlConfig.getUdfMetadata().stream().collect(Collectors.toMap(UdfMetadata::getName, Function.identity())); +udfMetadata = new HashMap<>(); +for(UdfMetadata udf : this.sqlConfig.getUdfMetadata()) { + udfMetadata.putIfAbsent(udf.getName(), new ArrayList<>()); + udfMetadata.get(udf.getName()).add(udf); +} } public ScalarUdf getOrCreateUdf(String clazz, String udfName) { return udfInstances.computeIfAbsent(udfName, s -> createInstance(clazz, udfName)); } public ScalarUdf createInstance(String clazz, String udfName) { -Config udfConfig = udfMetadata.get(udfName).getUdfConfig(); +Config udfConfig = udfMetadata.get(udfName).get(0).getUdfConfig(); Review comment: I see. Thanks for the clarification and the comment. Don't we still need to guard the code against an erroneously empty list?
[GitHub] srinipunuru commented on a change in pull request #911: Support for types in Samza SQL UDF
srinipunuru commented on a change in pull request #911: Support for types in Samza SQL UDF URL: https://github.com/apache/samza/pull/911#discussion_r254886952 ## File path: samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java ## @@ -38,7 +40,7 @@ * The variables that are shared among all cloned instance of {@link SamzaSqlExecutionContext} */ private final SamzaSqlApplicationConfig sqlConfig; - private final Map udfMetadata; + private final Map> udfMetadata; Review comment: Yes, that is correct. I will add comments to clarify that.
[GitHub] srinipunuru commented on a change in pull request #911: Support for types in Samza SQL UDF
srinipunuru commented on a change in pull request #911: Support for types in Samza SQL UDF URL: https://github.com/apache/samza/pull/911#discussion_r254888242 ## File path: samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java ## @@ -52,16 +54,19 @@ private SamzaSqlExecutionContext(SamzaSqlExecutionContext other) { public SamzaSqlExecutionContext(SamzaSqlApplicationConfig config) { this.sqlConfig = config; -udfMetadata = - this.sqlConfig.getUdfMetadata().stream().collect(Collectors.toMap(UdfMetadata::getName, Function.identity())); +udfMetadata = new HashMap<>(); +for(UdfMetadata udf : this.sqlConfig.getUdfMetadata()) { + udfMetadata.putIfAbsent(udf.getName(), new ArrayList<>()); + udfMetadata.get(udf.getName()).add(udf); +} } public ScalarUdf getOrCreateUdf(String clazz, String udfName) { return udfInstances.computeIfAbsent(udfName, s -> createInstance(clazz, udfName)); } public ScalarUdf createInstance(String clazz, String udfName) { -Config udfConfig = udfMetadata.get(udfName).getUdfConfig(); +Config udfConfig = udfMetadata.get(udfName).get(0).getUdfConfig(); Review comment: Please see @weiqingy 's comment and the corresponding answer. There can be several UDF methods within a UDF class, and the config should be associated at the class level.
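A sketch of the name-to-list index discussed in this thread, including the empty-list guard raised in review. `UdfMetadataIndex` and `Meta` are simplified, hypothetical stand-ins for `UdfMetadata`; `computeIfAbsent` replaces the `putIfAbsent`/`get` pair from the diff with the same grouping effect.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class UdfMetadataIndex {
  // Simplified stand-in for UdfMetadata: a UDF name plus its class-level config.
  static final class Meta {
    final String name;
    final Map<String, String> udfConfig;

    Meta(String name, Map<String, String> udfConfig) {
      this.name = name;
      this.udfConfig = udfConfig;
    }
  }

  private final Map<String, List<Meta>> byName = new HashMap<>();

  UdfMetadataIndex(List<Meta> all) {
    for (Meta udf : all) {
      // Several annotated methods of one UDF class share a name, so group them.
      byName.computeIfAbsent(udf.name, k -> new ArrayList<>()).add(udf);
    }
  }

  // Config is associated with the class, so the first entry is representative.
  // A missing or empty list is reported explicitly instead of failing inside get(0).
  Map<String, String> configFor(String udfName) {
    List<Meta> entries = byName.getOrDefault(udfName, Collections.emptyList());
    if (entries.isEmpty()) {
      throw new IllegalArgumentException("No UDF metadata registered for " + udfName);
    }
    return entries.get(0).udfConfig;
  }

  int methodCount(String udfName) {
    return byName.getOrDefault(udfName, Collections.emptyList()).size();
  }
}
```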
[GitHub] rmatharu opened a new pull request #912: Test sideinputrefactor
rmatharu opened a new pull request #912: Test sideinputrefactor URL: https://github.com/apache/samza/pull/912 DO NOT REVIEW
[GitHub] strkkk opened a new pull request #913: SAMZA-2102: [samza-azure] code cleanup and refactoring
strkkk opened a new pull request #913: SAMZA-2102: [samza-azure] code cleanup and refactoring URL: https://github.com/apache/samza/pull/913 1. Removed deprecated junit.framework.Assert 2. Added final modifiers 3. Removed redundant things (L literals, modifiers, local variables) 4. Optional.of -> Optional.ofNullable 5. Simplified a few methods
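Item 4 changes behavior for `null` inputs, so it is not purely stylistic. `Optional.of` throws a `NullPointerException` when given `null`, while `Optional.ofNullable` returns an empty `Optional`. A small illustration follows; the `lookup` helper is hypothetical and not code from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrates item 4: Optional.of rejects null, Optional.ofNullable maps it to empty.
final class OptionalDemo {
  // Hypothetical helper: Map.get returns null for a missing key.
  static Optional<String> lookup(Map<String, String> config, String key) {
    return Optional.ofNullable(config.get(key));
  }

  // Returns true because Optional.of throws NullPointerException on a null argument.
  static boolean ofThrowsOnNull() {
    try {
      Optional.of((String) null);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("present", "value");
    System.out.println(lookup(config, "present").isPresent()); // prints true
    System.out.println(lookup(config, "missing").isPresent()); // prints false
    System.out.println(ofThrowsOnNull());                      // prints true
  }
}
```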
[GitHub] strkkk opened a new pull request #914: SAMZA-2103: [samza-aws] code cleanup and refactoring
strkkk opened a new pull request #914: SAMZA-2103: [samza-aws] code cleanup and refactoring URL: https://github.com/apache/samza/pull/914 1. Removed redundant things 2. Simplified several methods 3. Removed the condition ``if (numRecordsPerShard > 0)`` in ``TestKinesisSystemConsumer.java`` because it is always true (the same check exists at line 130, and the else block returns when it is false). 4. Replaced the deprecated ``onCheckpoint`` method with ``afterCheckpoint``
[GitHub] rmatharu commented on issue #876: SEP-19 : Basic Standby task/container implementation
rmatharu commented on issue #876: SEP-19 : Basic Standby task/container implementation URL: https://github.com/apache/samza/pull/876#issuecomment-461914315 Closing this in favor of https://github.com/apache/samza/pull/912 and a follow-up to that.
[GitHub] rmatharu closed pull request #876: SEP-19 : Basic Standby task/container implementation
rmatharu closed pull request #876: SEP-19 : Basic Standby task/container implementation URL: https://github.com/apache/samza/pull/876
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r257074119 ## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ## @@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset) metrics.registerTopicAndPartition(toTopicAndPartition(tp)); } + @Override + public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) { +if (started.get()) { + String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint); + throw new SamzaException(exceptionMessage); +} + +if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) { + LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition); + return; +} + +LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint); + +super.register(systemStreamPartition, startpoint); + +TopicPartition topicPartition = toTopicPartition(systemStreamPartition); +topicPartitionToStartpointMap.put(topicPartition, startpoint); Review comment: We discussed this offline earlier: we need a startpoint comparator that chooses the lowest startpoint when multiple startpoints are defined for an SSP. Example: in a broadcast stream, Task-1 may define a lower startpoint and Task-2 a higher startpoint for the same SSP. This is already tracked in SAMZA-2088.
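Below is a sketch of the comparator idea tracked in SAMZA-2088. It uses a hypothetical, simplified startpoint model in which `OLDEST` sorts before any specific offset, and specific offsets sort before `UPCOMING`. Timestamp startpoints are left out on purpose: they can only be ordered after being resolved to offsets, for example with `KafkaConsumer#offsetsForTimes`. None of these names are Samza's API.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: choose the lowest startpoint requested by several tasks for one SSP.
final class StartpointOrdering {
  // Declaration order defines stream position: OLDEST < SPECIFIC < UPCOMING.
  enum Kind { OLDEST, SPECIFIC, UPCOMING }

  static final class SimpleStartpoint {
    final Kind kind;
    final long offset; // only meaningful when kind == SPECIFIC

    SimpleStartpoint(Kind kind, long offset) {
      this.kind = kind;
      this.offset = offset;
    }
  }

  // Compare by kind first, then by offset among SPECIFIC startpoints.
  static final Comparator<SimpleStartpoint> EARLIEST_FIRST =
      Comparator.<SimpleStartpoint>comparingInt(sp -> sp.kind.ordinal())
          .thenComparingLong(sp -> sp.offset);

  static SimpleStartpoint lowest(List<SimpleStartpoint> requested) {
    return requested.stream()
        .min(EARLIEST_FIRST)
        .orElseThrow(() -> new IllegalArgumentException("no startpoints requested"));
  }

  public static void main(String[] args) {
    // Broadcast-stream case: two tasks ask for different offsets on the same SSP.
    SimpleStartpoint task1 = new SimpleStartpoint(Kind.SPECIFIC, 42L);
    SimpleStartpoint task2 = new SimpleStartpoint(Kind.SPECIFIC, 100L);
    System.out.println(lowest(Arrays.asList(task2, task1)).offset); // prints 42
  }
}
```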
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r257522508 ## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ## @@ -330,6 +387,73 @@ public String getSystemName() { return systemName; } + @VisibleForTesting + static class KafkaStartpointRegistrationHandler implements StartpointVisitor { + +private final Consumer kafkaConsumer; + +KafkaStartpointRegistrationHandler(Consumer kafkaConsumer) { + this.kafkaConsumer = kafkaConsumer; +} + +@Override +public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) { + TopicPartition topicPartition = toTopicPartition(systemStreamPartition); + long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset()); + LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint); + + // KafkaConsumer is not thread-safe. + synchronized (kafkaConsumer) { +kafkaConsumer.seek(topicPartition, offsetInStartpoint); Review comment: 1. The upper layers that call this API already log any exception that is thrown back. 2. IMHO, wrapping the exception in a SamzaException and rethrowing it doesn't add much value. What do you think?
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r257523191 ## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ## @@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset) metrics.registerTopicAndPartition(toTopicAndPartition(tp)); } + @Override + public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) { Review comment: Yes. The newly introduced API `register(ssp, startpoint)` provides guarantees that are a superset of those of the existing `register(ssp, offset)` API in SystemConsumer. After 1.0.1, we intend to remove the existing register(ssp, offset) method.
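The reply treats `register(ssp, offset)` as a special case of `register(ssp, startpoint)`. The handler in the diff then uses a visitor to turn each startpoint type into a consumer seek. Below is a minimal sketch of that delegation. It assumes hypothetical types (`Startpoint`, `Visitor`, `RecordingConsumer`, and SSPs as plain strings) rather than Samza's actual interfaces, and it records positions instead of calling a real `KafkaConsumer`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: register(ssp, offset) delegating to register(ssp, startpoint) via a visitor.
final class StartpointDelegation {
  // One callback per startpoint kind (hypothetical, loosely modeled on StartpointVisitor).
  interface Visitor {
    void visitSpecific(String ssp, long offset);

    void visitOldest(String ssp);
  }

  interface Startpoint {
    void apply(String ssp, Visitor visitor);
  }

  static final class Specific implements Startpoint {
    final long offset;

    Specific(long offset) {
      this.offset = offset;
    }

    @Override
    public void apply(String ssp, Visitor visitor) {
      visitor.visitSpecific(ssp, offset);
    }
  }

  static final class Oldest implements Startpoint {
    @Override
    public void apply(String ssp, Visitor visitor) {
      visitor.visitOldest(ssp);
    }
  }

  // Records where a real consumer would seek; -1 stands in for "seek to beginning".
  static final class RecordingConsumer implements Visitor {
    final Map<String, Long> positions = new HashMap<>();

    @Override
    public void visitSpecific(String ssp, long offset) {
      positions.put(ssp, offset);
    }

    @Override
    public void visitOldest(String ssp) {
      positions.put(ssp, -1L);
    }

    // Legacy API: wrap the offset in a specific startpoint and reuse the startpoint path.
    void register(String ssp, String offset) {
      register(ssp, new Specific(Long.parseLong(offset)));
    }

    void register(String ssp, Startpoint startpoint) {
      startpoint.apply(ssp, this);
    }
  }

  public static void main(String[] args) {
    RecordingConsumer consumer = new RecordingConsumer();
    consumer.register("kafka.topic.0", "17");
    consumer.register("kafka.topic.1", new Oldest());
    System.out.println(consumer.positions.get("kafka.topic.0")); // prints 17
    System.out.println(consumer.positions.get("kafka.topic.1")); // prints -1
  }
}
```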
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r257523873 ## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ## @@ -60,16 +74,22 @@ private final Config config; private final boolean fetchThresholdBytesEnabled; private final KafkaSystemConsumerMetrics metrics; + private final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler; // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap. final KafkaConsumerMessageSink messageSink; // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates // BlockingEnvelopMap's buffers. - final private KafkaConsumerProxy proxy; + @VisibleForTesting + KafkaConsumerProxy proxy; // keep registration data until the start - mapping between registered SSPs and topicPartitions, and their offsets final Map topicPartitionsToOffset = new HashMap<>(); Review comment: I'm aware that we had to make this change. My initial plan was to do it after migrating all the system consumers to implement at least the startpoint-specific visitor. We discussed it offline and agreed to make this change in this patch itself. 1. Removed `topicPartitionsToOffset` and delegated `register(ssp, offset)` API calls to the startpoint registration API. 2. Moved the offset comparator logic to the `SystemConsumers` layer. A. Only a few implementations of the SystemConsumer.register API currently use systemAdmin.offsetComparator to compare offsets, yet the comparator logic applies to all implementations of the SystemConsumer.register API. Moving it to the upper layer (`SystemConsumers`) ensures functional correctness. B. All upper layers other than `SystemConsumers` that invoke the register(ssp, offset) API already pass the lowest offset for an SSP. We don't have a startpoint comparator abstraction yet.
To delegate calls from register(ssp, offset) to the register-startpoint API, we had to get the lowest offset for an SSP from the SystemConsumers layer.
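Point 2 moves the "pick the lowest offset per SSP" decision into `SystemConsumers`. Below is a minimal sketch of that step. It assumes SSPs are plain strings and offsets are decimal strings, and it uses a numeric comparator as a stand-in for `systemAdmin.offsetComparator`. All names are hypothetical, and none of this is the PR's code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: resolve the lowest requested offset per SSP before registering it.
final class LowestOffsetPerSsp {
  // Assumption: offsets are decimal strings, compared numerically rather than lexically.
  static final Comparator<String> NUMERIC_OFFSETS =
      Comparator.comparingLong((String offset) -> Long.parseLong(offset));

  // For each SSP, keep the lowest of the offsets requested by the tasks reading it.
  // Assumes every list is non-empty (Collections.min throws on an empty collection).
  static Map<String, String> lowestOffsets(Map<String, List<String>> requestedOffsets,
      Comparator<String> offsetComparator) {
    Map<String, String> lowest = new HashMap<>();
    requestedOffsets.forEach((ssp, offsets) -> lowest.put(ssp, Collections.min(offsets, offsetComparator)));
    return lowest;
  }

  public static void main(String[] args) {
    Map<String, List<String>> requested = new HashMap<>();
    // A broadcast SSP read by two tasks that checkpointed different offsets.
    requested.put("kafka.broadcast.0", Arrays.asList("100", "42"));
    requested.put("kafka.input.3", Arrays.asList("7"));
    System.out.println(lowestOffsets(requested, NUMERIC_OFFSETS).get("kafka.broadcast.0")); // prints 42
  }
}
```

A plain string comparison would wrongly pick "100" over "42" here, which is why the comparator has to come from the system rather than from `String` ordering.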
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r257524023 ## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ## @@ -141,11 +166,14 @@ public void start() { private void startSubscription() { //subscribe to all the registered TopicPartitions -LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToSSP.keySet()); +Set<TopicPartition> registeredTopicPartitions = new HashSet<>(); +registeredTopicPartitions.addAll(topicPartitionsToSSP.keySet()); +registeredTopicPartitions.addAll(topicPartitionToStartpointMap.keySet()); Review comment: This is no longer necessary, since we agreed offline to make the register(ssp, offset) API delegate to the startpoint-register API.
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r257072998 ## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ## @@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset) metrics.registerTopicAndPartition(toTopicAndPartition(tp)); } + @Override + public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) { +if (started.get()) { + String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint); + throw new SamzaException(exceptionMessage); +} + +if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) { + LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition); + return; +} + +LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint); + +super.register(systemStreamPartition, startpoint); Review comment: It already does; that was added as part of this PR.
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r257523115 ## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ## @@ -205,6 +215,156 @@ public void testFetchThresholdBytesDiabled() { consumer.stop(); } + @Test + public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() { +// Define dummy variables for testing. +final Integer testPartitionId = 0; +final String offset = "0"; +final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId); +final Partition testPartition = new Partition(testPartitionId); +final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition); + +final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class); +KafkaStartpointRegistrationHandler +kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer); + +final StartpointSpecific testStartpointSpecific = new StartpointSpecific(offset); + +// Mock the consumer interactions. +Mockito.doNothing().when(consumer).seek(testTopicPartition, Long.valueOf(offset)); + +// Invoke the consumer with startpoint. +kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, testStartpointSpecific); + +// Mock verifications. +Mockito.verify(consumer).seek(testTopicPartition, Long.valueOf(offset)); + } + + @Test + public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() { +// Define dummy variables for testing. 
+final Integer testPartitionId = 0; +final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId); +final Partition testPartition = new Partition(testPartitionId); +final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition); +final Long testTimeStamp = 10L; +final String testOffset = "10"; + +final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class); +KafkaStartpointRegistrationHandler +kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer); + +final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp); +final Map offsetForTimesResult = ImmutableMap.of(testTopicPartition, new OffsetAndTimestamp(Long.valueOf(testOffset), testTimeStamp)); + +// Mock the consumer interactions. + Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult); +Mockito.doNothing().when(consumer).seek(testTopicPartition, Long.valueOf(testOffset)); + +kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, startpointTimestamp); + +// Mock verifications. +Mockito.verify(consumer).seek(testTopicPartition, Long.valueOf(testOffset)); +Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap()); + } + + @Test + public void testStartpointOldestVisitorShouldUpdateTheFetchOffsetInConsumer() { +// Define dummy variables for testing. 
+final Integer testPartitionId = 0; +final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId); +final Partition testPartition = new Partition(testPartitionId); +final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition); + +final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class); +final KafkaStartpointRegistrationHandler +kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer); + +final StartpointOldest testStartpointSpecific = new StartpointOldest(); + +// Mock the consumer interactions. + Mockito.doNothing().when(consumer).seekToBeginning(ImmutableList.of(testTopicPartition)); + +// Invoke the consumer with startpoint. +kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, testStartpointSpecific); + +// Mock verifications. + Mockito.verify(consumer).seekToBeginning(ImmutableList.of(testTopicPartition)); + } + + @Test + public void testStartpointUpcomingVisitorShouldUpdateTheFetchOffsetInConsumer() { +// Define dummy variables for testing. +final Integer testPartitionId = 0; +final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId); +final Partition testPartition = new Partition(testPartitionId); +final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition); + +final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class); +final KafkaSystemConsumer.KafkaStartpointRegistrationHandler +
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r257523065 ## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ## @@ -205,6 +215,156 @@ public void testFetchThresholdBytesDiabled() { consumer.stop(); } + @Test + public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() { +// Define dummy variables for testing. +final Integer testPartitionId = 0; Review comment: Done.
[GitHub] atoomula commented on a change in pull request #911: Support for types in Samza SQL UDF
atoomula commented on a change in pull request #911: Support for types in Samza SQL UDF URL: https://github.com/apache/samza/pull/911#discussion_r255300421 ## File path: samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java ## @@ -61,17 +63,23 @@ public String getUdfName() { return udfName; } + public int numberArguments() { Review comment: typo: numberOfArguments
[GitHub] atoomula commented on a change in pull request #911: Support for types in Samza SQL UDF
atoomula commented on a change in pull request #911: Support for types in Samza SQL UDF URL: https://github.com/apache/samza/pull/911#discussion_r255300436 ## File path: samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java ## @@ -38,7 +40,10 @@ * The variables that are shared among all cloned instance of {@link SamzaSqlExecutionContext} */ private final SamzaSqlApplicationConfig sqlConfig; - private final Map<String, UdfMetadata> udfMetadata; + + // Maps the UDF name to list of all UDF methods associated with the name. + // Since we support polymorphism there can multiple udfMetadata associated with the single name. Review comment: How can these different methods be invoked through a UDF in a SQL statement? Is there an example or a test? I see one that uses overloading, but can we have different method names and still make it work?
[GitHub] atoomula commented on a change in pull request #911: Support for types in Samza SQL UDF
atoomula commented on a change in pull request #911: Support for types in Samza SQL UDF URL: https://github.com/apache/samza/pull/911#discussion_r255300768 ## File path: samza-sql/src/main/java/org/apache/samza/sql/fn/GetSqlFieldUdf.java ## @@ -51,22 +52,21 @@ * - sessionKey (Scalar) * */ -@SamzaSqlUdf(name = "GetSqlField") +@SamzaSqlUdf(name = "GetSqlField", description = "Get an element from complex Sql field.") Review comment: GetSqlField is somewhat limiting because it returns only a String. For instance, we cannot get a sub-record, an array, or any other type. Would the other cases work if execute returned an Object? Alternatively, now that we can add more methods to a UDF, is there another way to make it return different types? (I'm still not clear on how these different methods can be invoked from a UDF unless they are function overloads.)
[GitHub] atoomula commented on a change in pull request #911: Support for types in Samza SQL UDF
atoomula commented on a change in pull request #911: Support for types in Samza SQL UDF URL: https://github.com/apache/samza/pull/911#discussion_r255299964 ## File path: samza-sql/src/main/java/org/apache/samza/sql/fn/BuildOutputRecordUdf.java ## @@ -61,13 +61,13 @@ * If no args is provided, it returns an empty SamzaSqlRelRecord (with empty field names and values list). */ -@SamzaSqlUdf(name="BuildOutputRecord") +@SamzaSqlUdf(name="BuildOutputRecord" , description = "Creates an Output record.") Review comment: typo: missing spaces after name; should be name = "BuildOutputRecord".
[GitHub] atoomula commented on a change in pull request #911: Support for types in Samza SQL UDF
atoomula commented on a change in pull request #911: Support for types in Samza SQL UDF URL: https://github.com/apache/samza/pull/911#discussion_r255277549 ## File path: samza-api/src/main/java/org/apache/samza/sql/udfs/ScalarUdf.java ## @@ -36,5 +36,4 @@ * @param udfConfig Config specific to the udf. Review comment: The ScalarUdf class comments need to be updated to remove any references to "execute".
[GitHub] rmatharu commented on issue #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatbility
rmatharu commented on issue #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatbility URL: https://github.com/apache/samza/pull/915#issuecomment-462886896 Addressed all comments.
[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover URL: https://github.com/apache/samza/pull/903#discussion_r256082808 ## File path: samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java ## @@ -119,4 +153,219 @@ private void updateExpiryMetrics(SamzaResourceRequest request) { state.expiredPreferredHostRequests.incrementAndGet(); } } -} + + // Method to run a container on the given resource if it meets all standby constraints. If not, we re-request resource + // for the container (similar to the case when we re-request for a launch-fail or request expiry). + private boolean checkStandbyTaskConstraintsAndRunStreamProcessor(SamzaResourceRequest request, String preferredHost, + SamzaResource samzaResource, SamzaApplicationState state) { + +// If standby tasks are not enabled run streamprocessor and return true +if (!new JobConfig(config).getStandbyTasksEnabled()) { + runStreamProcessor(request, preferredHost); + return true; +} + +String containerID = request.getContainerID(); + +if (checkStandbyConstraints(request, samzaResource, state)) { + // This resource can be used to launch this container + log.info("Running container {} on preferred host {} meets standby constraints, launching on {}", containerID, + preferredHost, samzaResource.getHost()); + runStreamProcessor(request, preferredHost); + state.successfulStandbyAllocations.incrementAndGet(); + return true; +} else { + // This resource cannot be used to launch this container, so we treat it like a launch fail, and issue an ANY_HOST request + log.info("Running container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request", + containerID, samzaResource.getHost()); + resourceRequestState.releaseUnstartableContainer(samzaResource, preferredHost); + resourceRequestState.cancelResourceRequest(request); + 
requestResourceDueToLaunchFailOrExpiredRequest(containerID); + state.failedStandbyAllocations.incrementAndGet(); + return false; +} + } + + // Helper method to check if this SamzaResourceRequest for a container can be met on this resource, given standby + // container constraints, and the current set of pending and running containers + private boolean checkStandbyConstraints(SamzaResourceRequest request, SamzaResource samzaResource, + SamzaApplicationState samzaApplicationState) { +String containerIDToStart = request.getContainerID(); +String host = samzaResource.getHost(); +List containerIDsForStandbyConstraints = this.standbyContainerConstraints.get(containerIDToStart); + +// Check if any of these conflicting containers are running/launching on host +for (String containerID : containerIDsForStandbyConstraints) { + SamzaResource resource = samzaApplicationState.pendingContainers.get(containerID); + + // return false if a conflicting container is pending for launch on the host + if (resource != null && resource.getHost().equals(host)) { +log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host", +containerIDToStart, samzaResource.getHost(), containerID); +return false; + } + + // return false if a conflicting container is running on the host + resource = samzaApplicationState.runningContainers.get(containerID); + if (resource != null && resource.getHost().equals(host)) { +log.info("Container {} cannot be started on host {} because container {} is already running on this host", +containerIDToStart, samzaResource.getHost(), containerID); +return false; + } +} + +return true; + } + + /** + * Intercept resource requests, which are due to either a launch-failure or resource-request expired or standby + * 1. a standby container, we proceed to make a anyhost request + * 2. 
an activeContainer, we try to fail-it-over to a standby + * @param containerID Identifier of the container that will be run when a resource is allocated + */ + @Override + public void requestResourceDueToLaunchFailOrExpiredRequest(String containerID) { +if (StandbyTaskUtil.isStandbyContainer(containerID)) { + log.info("Handling rerequesting for container {} using an any host request"); + super.requestResource(containerID, ResourceRequestState.ANY_HOST); // proceed with a the anyhost request +} else { + requestResource(containerID, ResourceRequestState.ANY_HOST); // invoke local method & select a new standby if possible +} + } + + // Intercept issuing of resource requests from the CPM + // 1. for ActiveContainers and instead choose a StandbyContainer to stop + // 2. for a
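At its core, `checkStandbyConstraints` is a host-collision test. A container may not start on a host where any of its conflicting active or standby counterparts is already pending or running. Below is a minimal sketch with resources reduced to host names; the container IDs and map shapes are hypothetical. Unlike the diff, which iterates over `standbyContainerConstraints.get(...)` directly, the sketch treats a missing constraints entry as "no conflicts".

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of checkStandbyConstraints: resources are reduced to host names.
final class StandbyConstraintCheck {
  // Returns false if any container that conflicts with containerId is pending or running on host.
  static boolean meetsStandbyConstraints(String containerId, String host,
      Map<String, List<String>> constraints,
      Map<String, String> pendingHosts,
      Map<String, String> runningHosts) {
    // A missing constraints entry means "no conflicts" instead of a NullPointerException.
    for (String other : constraints.getOrDefault(containerId, Collections.emptyList())) {
      if (host.equals(pendingHosts.get(other)) || host.equals(runningHosts.get(other))) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Hypothetical IDs: active "0" with a single standby "0-0"; each conflicts with the other.
    Map<String, List<String>> constraints = new HashMap<>();
    constraints.put("0", Arrays.asList("0-0"));
    constraints.put("0-0", Arrays.asList("0"));
    Map<String, String> pending = new HashMap<>();
    Map<String, String> running = new HashMap<>();
    running.put("0", "host-a");

    System.out.println(meetsStandbyConstraints("0-0", "host-a", constraints, pending, running)); // prints false
    System.out.println(meetsStandbyConstraints("0-0", "host-b", constraints, pending, running)); // prints true
  }
}
```

When the check fails, the diff releases the resource and issues a new ANY_HOST request instead of launching the container.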
[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r255777131

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerFailoverState.java
## @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Encapsulates metadata concerning the failover of an active container.
+ */
+public class ContainerFailoverState {
+  public final String activeContainerID;
+  public final String activeContainerResourceID;
+
+  // Map of samza-container-resource ID to host, for each standby container selected for failover
+  public final Map selectedStandbyContainers;

Review comment:
Make this class store and manage the mapping activeResource -> List[standbyResource]. Any reassignments or changes to this mapping should update state in this class.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
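The reviewer's suggestion can be sketched as a class that owns the activeResource -> List[standbyResource] mapping outright, so every reassignment goes through its methods instead of callers editing a public map. This is a minimal sketch, not the PR's code: the class name `FailoverMappings`, its methods, and the use of plain `String` resource IDs are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: sole owner of the activeResourceID -> standbyResourceIDs mapping.
 * Resource IDs are plain Strings here; the real Samza types may differ.
 */
final class FailoverMappings {
  // activeResourceID -> IDs of the standby resources selected for its failover
  private final Map<String, List<String>> standbysByActive = new HashMap<>();

  /** Records that standbyResourceID was selected for the failover of activeResourceID. */
  synchronized void addStandby(String activeResourceID, String standbyResourceID) {
    standbysByActive.computeIfAbsent(activeResourceID, k -> new ArrayList<>()).add(standbyResourceID);
  }

  /** Moves a standby from one active resource to another; the update happens only inside this class. */
  synchronized void reassignStandby(String standbyResourceID, String fromActive, String toActive) {
    List<String> current = standbysByActive.get(fromActive);
    if (current != null) {
      current.remove(standbyResourceID);
      if (current.isEmpty()) {
        standbysByActive.remove(fromActive);
      }
    }
    addStandby(toActive, standbyResourceID);
  }

  /** Returns a read-only snapshot, so callers cannot mutate the mapping directly. */
  synchronized List<String> getStandbys(String activeResourceID) {
    List<String> standbys = standbysByActive.getOrDefault(activeResourceID, Collections.emptyList());
    return Collections.unmodifiableList(new ArrayList<>(standbys));
  }

  /** Drops all state for an active resource once its failover has finished. */
  synchronized void completeFailover(String activeResourceID) {
    standbysByActive.remove(activeResourceID);
  }
}
```

Synchronizing every method is the simplest choice here because the archived diffs note that this kind of state is written by the AMRMCallbackThread and read by the ContainerAllocator thread; a `ConcurrentHashMap` alone would not make the remove-then-add in `reassignStandby` atomic.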
[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r255771228

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
## @@ -141,6 +143,36 @@
    */
   public final AtomicInteger redundantNotifications = new AtomicInteger(0);
+  /**
+   * Number of container allocations that proceeded because they met standby container constraints.

Review comment:
I would pick the two metrics we care about most; more metrics often just add noise. For example, I am not sure the standbyStopsComplete metric is necessary.
[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r256070415

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
## @@ -45,10 +53,22 @@
    */
   private final int requestTimeout;
+  // Map of activeContainerIDs to its corresponding standby containers, and standbyContainerIDs to its corresponding
+  // active container and other corresponding standbyContainers
+  private final Map> standbyContainerConstraints = new HashMap<>();

Review comment:
This map is unnecessary; it appears to conflate too many roles. You can simplify and consolidate the state in a single class with the following APIs:
- List getStandbyContainers(activeResourceId)
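A minimal sketch of the consolidated class suggested in this comment, assuming container IDs are plain Strings and each standby backs exactly one active container. The class name and every method other than `getStandbyContainers` are hypothetical; `canRunOnHost` mirrors the pending/running host check in the PR's `checkStandbyConstraints`, with the pending and running container maps simplified to containerID -> host.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch: one class that owns all active <-> standby container relationships,
 * replacing a single map that mixes both directions.
 */
final class StandbyContainerConstraints {
  // activeContainerID -> its standby container IDs
  private final Map<String, List<String>> standbysByActive = new HashMap<>();
  // standbyContainerID -> the active container it backs up
  private final Map<String, String> activeByStandby = new HashMap<>();

  /** Registers standbyID as a standby of activeID. */
  void register(String activeID, String standbyID) {
    standbysByActive.computeIfAbsent(activeID, k -> new ArrayList<>()).add(standbyID);
    activeByStandby.put(standbyID, activeID);
  }

  /** The API named in the review: standbys of a given active container (empty if none). */
  List<String> getStandbyContainers(String activeID) {
    return Collections.unmodifiableList(standbysByActive.getOrDefault(activeID, Collections.emptyList()));
  }

  /**
   * Containers that must not share a host with containerID: for an active container, its standbys;
   * for a standby, its active container plus the other standbys of that active.
   */
  Set<String> getConflictingContainers(String containerID) {
    String activeID = activeByStandby.getOrDefault(containerID, containerID);
    Set<String> conflicts = new HashSet<>(getStandbyContainers(activeID));
    conflicts.add(activeID);
    conflicts.remove(containerID);
    return conflicts;
  }

  /** Returns false if any conflicting container is pending or running on host. */
  boolean canRunOnHost(String containerID, String host,
      Map<String, String> pendingHostByContainer, Map<String, String> runningHostByContainer) {
    for (String other : getConflictingContainers(containerID)) {
      if (host.equals(pendingHostByContainer.get(other)) || host.equals(runningHostByContainer.get(other))) {
        return false;
      }
    }
    return true;
  }
}
```

Keeping both directions of the relationship in one class means a registration updates them together, and the conflict set is derived on demand instead of being stored per container. The sketch is not thread-safe.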
[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r256081802

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
## @@ -119,4 +153,219 @@ private void updateExpiryMetrics(SamzaResourceRequest request) {
       state.expiredPreferredHostRequests.incrementAndGet();
     }
   }
-}
+
+  // Method to run a container on the given resource if it meets all standby constraints. If not, we re-request resource
+  // for the container (similar to the case when we re-request for a launch-fail or request expiry).
+  private boolean checkStandbyTaskConstraintsAndRunStreamProcessor(SamzaResourceRequest request, String preferredHost,
+      SamzaResource samzaResource, SamzaApplicationState state) {
+
+    // If standby tasks are not enabled run streamprocessor and return true
+    if (!new JobConfig(config).getStandbyTasksEnabled()) {
+      runStreamProcessor(request, preferredHost);
+      return true;
+    }
+
+    String containerID = request.getContainerID();
+
+    if (checkStandbyConstraints(request, samzaResource, state)) {
+      // This resource can be used to launch this container
+      log.info("Running container {} on preferred host {} meets standby constraints, launching on {}", containerID,
+          preferredHost, samzaResource.getHost());
+      runStreamProcessor(request, preferredHost);
+      state.successfulStandbyAllocations.incrementAndGet();
+      return true;
+    } else {
+      // This resource cannot be used to launch this container, so we treat it like a launch fail, and issue an ANY_HOST request
+      log.info("Running container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request",
+          containerID, samzaResource.getHost());
+      resourceRequestState.releaseUnstartableContainer(samzaResource, preferredHost);
+      resourceRequestState.cancelResourceRequest(request);
+      requestResourceDueToLaunchFailOrExpiredRequest(containerID);
+      state.failedStandbyAllocations.incrementAndGet();
+      return false;
+    }
+  }
+
+  // Helper method to check if this SamzaResourceRequest for a container can be met on this resource, given standby
+  // container constraints, and the current set of pending and running containers
+  private boolean checkStandbyConstraints(SamzaResourceRequest request, SamzaResource samzaResource,
+      SamzaApplicationState samzaApplicationState) {
+    String containerIDToStart = request.getContainerID();
+    String host = samzaResource.getHost();
+    List containerIDsForStandbyConstraints = this.standbyContainerConstraints.get(containerIDToStart);
+
+    // Check if any of these conflicting containers are running/launching on host
+    for (String containerID : containerIDsForStandbyConstraints) {
+      SamzaResource resource = samzaApplicationState.pendingContainers.get(containerID);
+
+      // return false if a conflicting container is pending for launch on the host
+      if (resource != null && resource.getHost().equals(host)) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+            containerIDToStart, samzaResource.getHost(), containerID);
+        return false;
+      }
+
+      // return false if a conflicting container is running on the host
+      resource = samzaApplicationState.runningContainers.get(containerID);
+      if (resource != null && resource.getHost().equals(host)) {
+        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
+            containerIDToStart, samzaResource.getHost(), containerID);
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Intercept resource requests, which are due to either a launch-failure or resource-request expired or standby
+   * 1. a standby container, we proceed to make a anyhost request
+   * 2. an activeContainer, we try to fail-it-over to a standby
+   * @param containerID Identifier of the container that will be run when a resource is allocated
+   */
+  @Override
+  public void requestResourceDueToLaunchFailOrExpiredRequest(String containerID) {

Review comment:
You shouldn't have to override this method (or expose it in the parent class). It is cleaner to expose the most general API and special-case it where it is used.
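One reading of "expose the most general API and special-case it where it is used" can be sketched as below: a single general request method, with the standby/active branch moved to the call site that handles a launch failure or expired request. Everything here is hypothetical. `requestResource` only stands in for the allocator's general method, `initiateFailover` is an invented placeholder for the failover path, and the standby check is passed in as a predicate (the PR uses `StandbyTaskUtil.isStandbyContainer`) so the sketch does not assume any container-ID format.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/**
 * Hypothetical sketch: no overridden re-request method; the special case lives at the call site.
 */
final class CallSiteSpecialization {
  static final String ANY_HOST = "ANY_HOST";

  private final Predicate<String> isStandbyContainer;
  // Records what was issued, so the behavior is observable in this sketch
  final List<String> actions = new ArrayList<>();

  CallSiteSpecialization(Predicate<String> isStandbyContainer) {
    this.isStandbyContainer = isStandbyContainer;
  }

  // The one general API every caller uses (stand-in for requestResource(containerID, host)).
  void requestResource(String containerID, String preferredHost) {
    actions.add("request " + containerID + " on " + preferredHost);
  }

  // Placeholder for choosing a standby to take over for a failed active container.
  void initiateFailover(String activeContainerID) {
    actions.add("failover " + activeContainerID);
  }

  // Call site reached on a launch failure or an expired request.
  void onLaunchFailOrExpiredRequest(String containerID) {
    if (isStandbyContainer.test(containerID)) {
      requestResource(containerID, ANY_HOST);   // standbys simply go to any host
    } else {
      initiateFailover(containerID);            // actives try to fail over to a standby
    }
  }
}
```

With this shape the parent class needs no extra public method, which is also what the reviewer asks for on `AbstractContainerAllocator`.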
[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r256084515

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
## @@ -141,6 +143,36 @@
    */
   public final AtomicInteger redundantNotifications = new AtomicInteger(0);
+  /**
+   * Number of container allocations that proceeded because they met standby container constraints.
+   */
+  public final AtomicInteger successfulStandbyAllocations = new AtomicInteger(0);
+
+  /**
+   * Number of container allocations that were dis-allowed because they
+   * did not meet standby container constraints.
+   */
+  public final AtomicInteger failedStandbyAllocations = new AtomicInteger(0);
+
+  /**
+   * Number of active container failovers initiated in which a standby container was found.
+   * If two standby containers had to selected for one failing active, it counts as two.
+   */
+  public final AtomicInteger failoversToStandby = new AtomicInteger(0);
+
+  /**
+   * Number of active container failovers initiated in which a standby container was NOT found.
+   */
+  public final AtomicInteger failoversToAnyHost = new AtomicInteger(0);
+
+  /**
+   * Number of stops of standby containers that completed.
+   */
+  public final AtomicInteger standbyStopsComplete = new AtomicInteger(0);
+
+  // Map of active containers that are in failover, indexed by the active container's resourceID (at the time of failure)
+  public final ConcurrentMap failovers = new ConcurrentHashMap(0);

Review comment:
Move the state that pertains to failover into a new class.
[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r255766345

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
## @@ -195,14 +195,25 @@ protected final SamzaResourceRequest peekPendingRequest() {
     return resourceRequestState.peekPendingRequest();
   }
+  /**
+   * Method to handle re-requesting of resource for a given container, in cases where
+   * a prior resource request for the container, expired or the launch failed (due to a startup-fail, or
+   * standby constraints not being met).
+   * @param containerID Identifier of the container that will be run when a resource is allocated
+   *
+   */
+  public void requestResourceDueToLaunchFailOrExpiredRequest(String containerID) {

Review comment:
Remove this method, since it simply delegates to requestResource(containerID, ANY_HOST). Instead, invoke requestResource(containerID, ANY_HOST) directly at the call site.
[GitHub] mynameborat commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatbility
mynameborat commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatbility
URL: https://github.com/apache/samza/pull/915#discussion_r256068006

## File path: samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
## @@ -129,26 +173,35 @@ public static boolean storeExists(File storeDir) {
    * Read and return the contents of the offset file.
    *
    * @param storagePartitionDir the base directory of the store
-   * @param offsetFileName name of the offset file
    * @return the content of the offset file if it exists for the store, null otherwise.
    */
-  public static String readOffsetFile(File storagePartitionDir, String offsetFileName) {
-    String offset = null;
-    File offsetFileRef = new File(storagePartitionDir, offsetFileName);
+  public static Map readOffsetFile(File storagePartitionDir, Set storeSSPs) {
+    Map offsets = null;
+    String fileContents;
+    File offsetFileRef = new File(storagePartitionDir, OFFSET_FILE_NAME);
     String storePath = storagePartitionDir.getPath();
     if (offsetFileRef.exists()) {
       LOG.info("Found offset file in storage partition directory: {}", storePath);
       try {
-        offset = FileUtil.readWithChecksum(offsetFileRef);
+        fileContents = FileUtil.readWithChecksum(offsetFileRef);
+        try {
+          offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE);
+        } catch (JsonParseException | JsonMappingException e) {
+          LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+          offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;
+        } catch (IOException e) {
+          LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+          offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;

Review comment:
If the contents are not valid JSON, wouldn't we get a JsonParseException? I was referring to the IOException block, since reaching it would mean either that we couldn't read the file successfully or that the file was not found (FileNotFoundException extends IOException). It is sufficient to have only two catch blocks: one for the parse exception and another to catch all other failure scenarios.
[GitHub] mynameborat commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatbility
mynameborat commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatbility
URL: https://github.com/apache/samza/pull/915#discussion_r256068006

## File path: samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
## @@ -129,26 +173,35 @@ public static boolean storeExists(File storeDir) {
    * Read and return the contents of the offset file.
    *
    * @param storagePartitionDir the base directory of the store
-   * @param offsetFileName name of the offset file
    * @return the content of the offset file if it exists for the store, null otherwise.
    */
-  public static String readOffsetFile(File storagePartitionDir, String offsetFileName) {
-    String offset = null;
-    File offsetFileRef = new File(storagePartitionDir, offsetFileName);
+  public static Map readOffsetFile(File storagePartitionDir, Set storeSSPs) {
+    Map offsets = null;
+    String fileContents;
+    File offsetFileRef = new File(storagePartitionDir, OFFSET_FILE_NAME);
     String storePath = storagePartitionDir.getPath();
     if (offsetFileRef.exists()) {
       LOG.info("Found offset file in storage partition directory: {}", storePath);
       try {
-        offset = FileUtil.readWithChecksum(offsetFileRef);
+        fileContents = FileUtil.readWithChecksum(offsetFileRef);
+        try {
+          offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE);
+        } catch (JsonParseException | JsonMappingException e) {
+          LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+          offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;
+        } catch (IOException e) {
+          LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+          offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;

Review comment:
If the contents are not valid JSON, wouldn't we get a JsonParseException? I was referring to the IOException block, since reaching it would mean either that we couldn't read the file successfully or that the file was not found (FileNotFoundException extends IOException). It is sufficient to have only two catch blocks: one for JSON parse failures and another to catch all other failure scenarios.
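The two-catch-block structure the reviewer describes can be sketched with the JDK alone. This is a hypothetical stand-in, not Samza's `StorageManagerUtil`: `OffsetParseException` plays the role of Jackson's parse exceptions, and a one-`ssp=offset`-per-line format stands in for the JSON offsets map. Because Jackson's `JsonParseException` and `JsonMappingException` are themselves subclasses of `IOException` (via `JsonProcessingException`), the parse catch must come before the general `IOException` catch; Java rejects the reverse order at compile time.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Stand-in for Jackson's parse exceptions, which also extend IOException.
class OffsetParseException extends IOException {
  OffsetParseException(String message) {
    super(message);
  }
}

final class OffsetFileReader {
  // Hypothetical "new" format: one "ssp=offset" pair per line (stands in for the JSON map).
  static Map<String, String> parse(String contents) throws OffsetParseException {
    Map<String, String> offsets = new HashMap<>();
    for (String line : contents.split("\n")) {
      int eq = line.indexOf('=');
      if (eq <= 0) {
        throw new OffsetParseException("Not an ssp=offset line: " + line);
      }
      offsets.put(line.substring(0, eq), line.substring(eq + 1).trim());
    }
    return offsets;
  }

  /** Returns the offsets in the file, or null if the file cannot be read or interpreted. */
  static Map<String, String> readOffsetFile(Path offsetFile, Set<String> storeSSPs) {
    String contents = null;
    try {
      contents = new String(Files.readAllBytes(offsetFile), StandardCharsets.UTF_8);
      return parse(contents);
    } catch (OffsetParseException e) {
      // Catch 1: the file was read but is not in the new format. Treat it as a legacy
      // single-offset file, which is only meaningful when the store has exactly one SSP.
      return storeSSPs.size() == 1
          ? Collections.singletonMap(storeSSPs.iterator().next(), contents.trim())
          : null;
    } catch (IOException e) {
      // Catch 2: every other failure, e.g. a missing file (NoSuchFileException) or a read error.
      return null;
    }
  }
}
```

The legacy fallback reuses the rule from the quoted diff (only when `storeSSPs.size() == 1`), so the two catch blocks cover "readable but old format" and "not readable at all" without duplicating the fallback logic.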
[GitHub] rmatharu commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatbility
rmatharu commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatbility
URL: https://github.com/apache/samza/pull/915#discussion_r256096861

## File path: samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
## @@ -129,26 +173,35 @@ public static boolean storeExists(File storeDir) {
    * Read and return the contents of the offset file.
    *
    * @param storagePartitionDir the base directory of the store
-   * @param offsetFileName name of the offset file
    * @return the content of the offset file if it exists for the store, null otherwise.
    */
-  public static String readOffsetFile(File storagePartitionDir, String offsetFileName) {
-    String offset = null;
-    File offsetFileRef = new File(storagePartitionDir, offsetFileName);
+  public static Map readOffsetFile(File storagePartitionDir, Set storeSSPs) {
+    Map offsets = null;
+    String fileContents;
+    File offsetFileRef = new File(storagePartitionDir, OFFSET_FILE_NAME);
     String storePath = storagePartitionDir.getPath();
     if (offsetFileRef.exists()) {
       LOG.info("Found offset file in storage partition directory: {}", storePath);
       try {
-        offset = FileUtil.readWithChecksum(offsetFileRef);
+        fileContents = FileUtil.readWithChecksum(offsetFileRef);
+        try {
+          offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE);
+        } catch (JsonParseException | JsonMappingException e) {
+          LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+          offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;
+        } catch (IOException e) {
+          LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+          offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;

Review comment:
done
[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r256072645

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
## @@ -81,12 +102,24 @@ public void assignResourceRequests() {
         if (expired) {
           updateExpiryMetrics(request);
           if (resourceAvailableOnAnyHost) {
-            log.info("Request for container: {} on {} has expired. Running on ANY_HOST", request.getContainerID(), request.getPreferredHost());
-            runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+            // if standby is not enabled, request a anyhost request
+            if (!new JobConfig(config).getStandbyTasksEnabled()) {
+              log.info("Request for container: {} on {} has expired. Running on ANY_HOST", request.getContainerID(),
+                  request.getPreferredHost());
+              runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+            } else if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+              // only standby resources can be on anyhost rightaway
+              checkStandbyTaskConstraintsAndRunStreamProcessor(request, ResourceRequestState.ANY_HOST,
+                  peekAllocatedResource(ResourceRequestState.ANY_HOST), state);
+            } else {
+              // re-requesting resource for active container

Review comment:
This looks a bit complex. Is this else block needed, especially since we are doing something similar in L:120-122?
[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r255769800

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
## @@ -78,9 +79,10 @@
   public final AtomicInteger releasedContainers = new AtomicInteger(0);
   /**
-   * ContainerStatus of failed containers.
+   * ContainerStatuses of failed containers indexed by samzaContainerId.
+   * Written by the AMRMCallbackThread, read by the ContainerAllocator thread when performing active-standby container failover.
    */
-  public final ConcurrentMap failedContainersStatus = new ConcurrentHashMap();
+  public final ConcurrentMap> failedContainersStatus = new ConcurrentHashMap>();

Review comment:
Why do you need this ConcurrentMap>? The usage seems to be determining the physical ID that corresponds to a given logical ID. If so, shouldn't the existing data structures be sufficient?
[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r256083123

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
## @@ -119,4 +153,219 @@ private void updateExpiryMetrics(SamzaResourceRequest request) {
       state.expiredPreferredHostRequests.incrementAndGet();
     }
   }
-}
+
+  // Method to run a container on the given resource if it meets all standby constraints. If not, we re-request resource
+  // for the container (similar to the case when we re-request for a launch-fail or request expiry).
+  private boolean checkStandbyTaskConstraintsAndRunStreamProcessor(SamzaResourceRequest request, String preferredHost,
+      SamzaResource samzaResource, SamzaApplicationState state) {
+
+    // If standby tasks are not enabled run streamprocessor and return true
+    if (!new JobConfig(config).getStandbyTasksEnabled()) {
+      runStreamProcessor(request, preferredHost);
+      return true;
+    }
+
+    String containerID = request.getContainerID();
+
+    if (checkStandbyConstraints(request, samzaResource, state)) {
+      // This resource can be used to launch this container
+      log.info("Running container {} on preferred host {} meets standby constraints, launching on {}", containerID,
+          preferredHost, samzaResource.getHost());
+      runStreamProcessor(request, preferredHost);
+      state.successfulStandbyAllocations.incrementAndGet();
+      return true;
+    } else {
+      // This resource cannot be used to launch this container, so we treat it like a launch fail, and issue an ANY_HOST request
+      log.info("Running container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request",
+          containerID, samzaResource.getHost());
+      resourceRequestState.releaseUnstartableContainer(samzaResource, preferredHost);
+      resourceRequestState.cancelResourceRequest(request);
+      requestResourceDueToLaunchFailOrExpiredRequest(containerID);
+      state.failedStandbyAllocations.incrementAndGet();
+      return false;
+    }
+  }
+
+  // Helper method to check if this SamzaResourceRequest for a container can be met on this resource, given standby
+  // container constraints, and the current set of pending and running containers
+  private boolean checkStandbyConstraints(SamzaResourceRequest request, SamzaResource samzaResource,
+      SamzaApplicationState samzaApplicationState) {
+    String containerIDToStart = request.getContainerID();
+    String host = samzaResource.getHost();
+    List containerIDsForStandbyConstraints = this.standbyContainerConstraints.get(containerIDToStart);
+
+    // Check if any of these conflicting containers are running/launching on host
+    for (String containerID : containerIDsForStandbyConstraints) {
+      SamzaResource resource = samzaApplicationState.pendingContainers.get(containerID);
+
+      // return false if a conflicting container is pending for launch on the host
+      if (resource != null && resource.getHost().equals(host)) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+            containerIDToStart, samzaResource.getHost(), containerID);
+        return false;
+      }
+
+      // return false if a conflicting container is running on the host
+      resource = samzaApplicationState.runningContainers.get(containerID);
+      if (resource != null && resource.getHost().equals(host)) {
+        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
+            containerIDToStart, samzaResource.getHost(), containerID);
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Intercept resource requests, which are due to either a launch-failure or resource-request expired or standby
+   * 1. a standby container, we proceed to make a anyhost request
+   * 2. an activeContainer, we try to fail-it-over to a standby
+   * @param containerID Identifier of the container that will be run when a resource is allocated
+   */
+  @Override
+  public void requestResourceDueToLaunchFailOrExpiredRequest(String containerID) {
+    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      log.info("Handling rerequesting for container {} using an any host request");
+      super.requestResource(containerID, ResourceRequestState.ANY_HOST); // proceed with a the anyhost request
+    } else {
+      requestResource(containerID, ResourceRequestState.ANY_HOST); // invoke local method & select a new standby if possible
+    }
+  }
+
+  // Intercept issuing of resource requests from the CPM
+  // 1. for ActiveContainers and instead choose a StandbyContainer to stop
+  // 2. for a
[GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r254810012

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
## @@ -122,6 +122,10 @@ public ClusterResourceManager(Callback callback) {
   public abstract void launchStreamProcessor(SamzaResource resource, CommandBuilder builder);
+  // Requests the stopping of a StreamProcessor, identified by the given resource
+  public abstract void stopStreamProcessor(SamzaResource resource);
+
+  public abstract void stop(SamzaApplicationState.SamzaAppStatus status);

Review comment:
Tie this interface to the onProcessorComplete() callback in the docs.
[GitHub] shanthoosh merged pull request #874: SAMZA-2058: Integrate the input stream expansion aware SystemStreamGrouper to JobModel generation flow.
shanthoosh merged pull request #874: SAMZA-2058: Integrate the input stream expansion aware SystemStreamGrouper to JobModel generation flow. URL: https://github.com/apache/samza/pull/874
[GitHub] rmatharu commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatbility
rmatharu commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatbility
URL: https://github.com/apache/samza/pull/915#discussion_r256213346

## File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
## @@ -562,9 +566,14 @@ private boolean isLoggedStoreValid(String storeName, File loggedStoreDir) {
           (long) new StorageConfig(config).getChangeLogDeleteRetentionsInMs().get(storeName).get();
     }
+    // if the store has no changelogSSP, simply use null
+    SystemStreamPartition changelogSSP = null;

Review comment:
done