Re: Question about custom StreamJob/Factory
As promised, here's the link to the repository: https://github.com/sonian/samza-kubernetes

The section "Your Job Image" covers my remaining questions on the low-level API. We use Clojure on the backend, so I'm using that to sanity-check the example high-level app. I will update the example if it turns out I made any goofs!

After looking at more code, I believe I better understand how the high-level API works. It essentially creates a StreamTask-equivalent object for every operator (map, etc.). Those objects are eventually run by a JCL (via the Container) that StreamProcessor creates, and they execute in a single-thread pool. There doesn't seem to be an `AsyncStreamTask` equivalent for these operators, though. `LocalApplicationRunner#createStreamProcessor` can handle an `AsyncTaskFactory`, but `TaskFactoryUtil#createTaskFactory` only returns a `StreamTaskFactory` when passed a `StreamApplication`. The crux: I don't see a great story for operators that need to, e.g., make a database call for each message or perform some other blocking operation.

Any clarification on these two topics would be much appreciated!

Thanks,
Tom
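The thread doesn't settle how the high-level API should handle blocking work. The sketch below shows the general pattern that a callback-based `AsyncStreamTask` model enables, without depending on any library. The blocking call (for example, a per-message database lookup) goes to a bounded thread pool. A semaphore provides back-pressure, and a callback reports completion. `CompletionCallback` and `BlockingCallOffloader` are hypothetical names, not Samza classes. The sketch also ignores the ordering and checkpointing concerns a real task would have to handle.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical completion callback, modelled loosely on a callback-style task API.
// This is NOT Samza's TaskCallback interface.
interface CompletionCallback {
    void complete();
    void failure(Throwable error);
}

// Hypothetical helper: runs a blocking per-message call (e.g. a database lookup)
// on a bounded pool so the single processing thread never blocks on I/O.
final class BlockingCallOffloader<I, O> implements AutoCloseable {
    private final ExecutorService pool;
    private final Semaphore inFlight; // caps the number of outstanding calls
    private final Function<I, O> blockingCall;

    BlockingCallOffloader(int maxConcurrency, Function<I, O> blockingCall) {
        this.pool = Executors.newFixedThreadPool(maxConcurrency);
        this.inFlight = new Semaphore(maxConcurrency);
        this.blockingCall = blockingCall;
    }

    // Called on the processing thread; returns as soon as the call is queued.
    // Blocks only when maxConcurrency calls are already in flight (back-pressure).
    void processAsync(I message, Consumer<O> emit, CompletionCallback callback)
            throws InterruptedException {
        inFlight.acquire();
        CompletableFuture
            .supplyAsync(() -> blockingCall.apply(message), pool)
            .whenComplete((result, error) -> {
                inFlight.release();
                if (error != null) {
                    callback.failure(error);
                } else {
                    // Runs on a pool thread, so `emit` must be thread-safe.
                    emit.accept(result);
                    callback.complete();
                }
            });
    }

    // Stops accepting work and waits briefly for in-flight calls to finish.
    @Override
    public void close() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

Because `emit` runs on a pool thread, whatever it writes to must be safe for concurrent use. A real message collector may need its own synchronization or a hand-off back to the processing thread.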
Re: Question about custom StreamJob/Factory
Thanks for the timely and thorough reply! Based on your explanation, it sounds like when using the high-level API I don't need to go through the JobRunner or `run-job.sh` at all. Is that correct? Can I simply run as many instances of, e.g., `WikipediaZkLocalApplication` as I want, and Samza will take care of assigning partitions, responding to changes via ZK, etc.?

As for contributing back, I absolutely plan to do so! I will probably end up doing a series of blog posts. This is (management willing) the start of a large undertaking to fix a lot of thorny problems we have with our backend apps and to unify on Samza for batch/stream operations. I am wrapping up a small repo now that has:

1. A KubernetesJob/Factory implementation that can be used with JobRunner to launch an appropriately sized stateful deployment
2. An example Gradle-based app that generates a Docker image capable of executing the packaged StreamApplication (plus example k8s resources)

On the topic of (2), I fear I mostly just rehashed the application part of hello-samza with different Gradle plugins, but it has been instructive as a learning exercise. My initial questions were about (1). After reading your explanation, I'm not clear on the way forward there, and it's certainly more complex than the StreamApplication approach. My thought right now is:

1. Create an image that executes a derived `LocalContainerRunner`
2. Ensure both CONTAINER_ID and COORDINATOR_URL are set: the former to the stable ID generated by Kubernetes (e.g. "app-0", "app-1", etc.) and the latter to, e.g., "file:///serialized/job/model.json"

However, this would require writing a LocalContainerRunner that doesn't assume it is running at the pleasure of a YARN/Mesos-style resource negotiator (it can't exactly heartbeat with a JSON file, etc.). Overall, the lower-level Task API seems more tied to the notion of resource management than the higher-level Application API.
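Step 2 needs a stable container ID. The sketch below is one hedged way to derive it. It assumes the pods belong to a Kubernetes StatefulSet, whose pods are named `<statefulset-name>-<ordinal>` (e.g. "app-0"). It also assumes the pod name is passed in, for example through the Downward API's `metadata.name` field. The sketch uses the numeric ordinal as CONTAINER_ID, on the assumption that the serialized job model keys containers by numeric ID. If yours keys them by pod name, pass the name through unchanged. `StatefulSetIdentity` is a hypothetical helper. Its `replicas` method encodes the rule from the original question: the pod count should not exceed the number of input partitions.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: derives container settings from a StatefulSet pod name.
final class StatefulSetIdentity {
    // StatefulSet pods are named "<statefulset-name>-<ordinal>", e.g. "app-0".
    private static final Pattern POD_NAME = Pattern.compile("^(.+)-(\\d+)$");

    private StatefulSetIdentity() {}

    // Extracts the ordinal suffix, e.g. "app-12" -> 12.
    static int ordinal(String podName) {
        Matcher m = POD_NAME.matcher(podName);
        if (!m.matches()) {
            throw new IllegalArgumentException("not a StatefulSet pod name: " + podName);
        }
        return Integer.parseInt(m.group(2));
    }

    // Builds the two environment variables from the plan above.
    // Using the ordinal (not the full pod name) as CONTAINER_ID is an assumption.
    static Map<String, String> containerEnv(String podName, String jobModelUrl) {
        return Map.of(
            "CONTAINER_ID", String.valueOf(ordinal(podName)),
            "COORDINATOR_URL", jobModelUrl);
    }

    // Never run more pods than input partitions; extra pods would get no work.
    static int replicas(int requested, int partitions) {
        if (requested < 1 || partitions < 1) {
            throw new IllegalArgumentException("requested and partitions must be positive");
        }
        return Math.min(requested, partitions);
    }
}
```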
Is it fair to say that an Application is effectively a single Task? That is, it runs in a single thread, reading messages from one or more input streams and writing messages to zero or more output streams. And should I compose more complex topologies by running multiple applications in my cluster?

Okay, enough questions for now! I hope to publish the repository tomorrow and would love to get some more experienced eyes on it to learn all the ways I screwed up. I'll post to this thread again with a link.

Thanks,
Tom
Re: Question about custom StreamJob/Factory
+Yi

Hi Tom,

Thank you for your feedback on Samza's architecture. Pluggability has been a differentiator that has enabled us to support a wide range of use cases: from stand-alone deployments to managed services, from streaming to batch inputs, and integrations with systems ranging from Kafka, Kinesis and Azure to ElasticSearch.

Thanks for your ideas on integrating Samza and Kubernetes. Let me formalize your intuition a bit more. The following four aspects are key to running Samza in any environment:

1. Liveness detection/monitoring: This provides a means of discovering the processors currently available in the group and detecting when a processor is no longer running. Our different JobCoordinator (JC) implementations rely on ZooKeeper, YARN or AzureBlobStore for liveness detection.
2. Partition assignment/coordination: Once there is agreement on the available processors, this is just a matter of computing assignments. Usually, (1) and (2) require you to identify each processor and to agree on the set of available processors in the group. For example, when the ClusterBasedJC starts a container, the container is assigned a durable ID.
3. Resource management: This concerns whether your containers are managed and started by Samza itself or by something external to Samza. The former allows you to run a managed service, while the latter gives you more flexibility in your deployment environment. We use both models heavily at LinkedIn. For example, the ClusterBasedJC requests resources from YARN and starts the containers itself. The ZkBasedJC assumes a more general deployment model: containers are started externally, and it relies on Samza only for (1) and (2).
4. Auto-scaling: Here again, you can build auto-scaling right into Samza if there's support for resource management, or you can do it externally.

Having said this, you can implement this integration with Kubernetes at multiple levels, depending on how we choose to tackle the above aspects.
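Here is a minimal, hypothetical sketch that makes (1) and (2) concrete. Liveness is judged from each processor's last heartbeat timestamp. Once every member sees the same sorted live set, partitions are assigned round-robin. This is not how the ZkBasedJC or Samza's groupers are implemented. The class name, the heartbeat map and the timeout are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of aspects (1) and (2); not Samza's JC or grouper code.
final class GroupMembership {
    private GroupMembership() {}

    // (1) Liveness: a processor counts as alive if it heartbeated within the timeout.
    static List<String> liveProcessors(Map<String, Long> lastHeartbeatMillis,
                                       long nowMillis, long timeoutMillis) {
        List<String> live = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastHeartbeatMillis.entrySet()) {
            if (nowMillis - e.getValue() <= timeoutMillis) {
                live.add(e.getKey());
            }
        }
        // Sort so that every member derives the same order from the same set.
        Collections.sort(live);
        return live;
    }

    // (2) Assignment: given the agreed, sorted live set, assign partitions round-robin.
    static Map<String, List<Integer>> assign(List<String> sortedProcessors, int partitionCount) {
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (String processor : sortedProcessors) {
            assignment.put(processor, new ArrayList<>());
        }
        if (sortedProcessors.isEmpty()) {
            return assignment; // nobody alive: nothing can be assigned
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = sortedProcessors.get(partition % sortedProcessors.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }
}
```

The assignment is a pure function of the agreed live set and the partition count. Every member that agrees on membership therefore computes the same result. When a processor joins or stops heartbeating, rerunning both steps yields the new assignment.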
> My intuition is that I need a JobCoordinator/Factory in the form of a server that sets up Watches on the appropriate Kubernetes resources so that when I perform the action in [4.1] *something* happens.

This alternative does seem more complex, so I would not go down this path as the first step. For a start, I would err on the side of simplicity and recommend the following solution:

- Configure your Samza job to leverage the existing ZkBasedJC.
- Start multiple instances of your job by running the *run-app.sh* script. I believe Kubernetes has good support for this as well.
- Configure Kubernetes to auto-scale your instances on demand depending on load.
- As new instances join and leave, Samza will automatically redistribute partitions among them.

Additionally, we would be thrilled if you could contribute your learnings back to the community, in the form of a blog post or documentation in Samza itself on running with Kubernetes. Please let us know should you need any further help. Here's an example to get you started: https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/wikipedia/application

Best,
Jagdish

On Sat, Jan 27, 2018 at 8:54 AM, Tom Davis wrote:
> Hi there! First off, thanks for the continued work on Samza -- I looked
> into many DC/stream processors and Samza was a real standout with its
> smart architecture and pluggable design.
>
> I'm working on a custom StreamJob/Factory for running Samza jobs on
> Kubernetes. Those two classes are pretty straightforward: I create a
> Deployment in Kubernetes with the appropriate number of Pods (a number
> <= the number of Kafka partitions I created the input topic with). Now
> I'm moving on to what executes in the actual Docker containers and I'm a
> bit confused.
>
> My plan was to mirror as much as possible what the YarnJob does,
> which is to set up an environment that will work with `run-jc.sh`. However,
> I don't need ClusterBasedJobCoordinator because Kubernetes is not an
> offer-based resource negotiator; if the JobCoordinator is running it
> means, by definition, it received the appropriate resources. So a
> PassThroughJobCoordinator with an appropriate main() method seemed like the
> ticket. Unfortunately, the PTJC doesn't actually seem to *do* anything
> -- unlike the CBJC, which has a run-loop and presumably schedules
> containers and the like.
>
> I saw the preview documentation on flexible deployment, but it didn't
> totally click for me. Perhaps because it was also my first introduction
> to the high-level API? (I've just been writing StreamTask impls)
>
> Here's a brief description of the workflow I'm envisioning. Perhaps
> someone could tell me the classes I should implement and what sorts of
> containers I might need running in the environment to coordinate
> everything?
>
> 1. I create a topic in Kafka with N partitions
> 2. I start a job configured to run N-X containers
> 2.1. If my topic has 4 partitions and I have low