Re: MessageStream with multiple concurrent operation stacks

2019-10-22 Thread Eric Shieh
Thank you Bharath and Boris, I really appreciate your response and guidance!

Regards,

Eric

On Tue, Oct 22, 2019 at 6:00 PM Boris S  wrote:

> Yes, to answer your second question, you should be able to fork the
> streams. MessageStreams are idempotent and can be used in different
> pipelines.
>
> With regard to the group id, Samza does set group ids, but I think they
> are the same for the whole job. The idea behind this is that Samza does not
> use Kafka-level partitioning; Samza itself distributes the partitions
> directly to its tasks.
>
> Boris.
>
> On Tue, Oct 22, 2019 at 11:55 AM Bharath Kumara Subramanian <
> codin.mart...@gmail.com> wrote:
>
> > Hi Eric,
> >
> > Based on the source code, it appears that each job designates a unique
> > > group id when subscribing to kafka topic, is my understanding correct?
> > >
> >
> > Yes. Samza uses a combination of job name and job id to generate the
> group
> > id.
> >
> >
> > > is it possible to have 2 independent stack
> >
> > of operations applied on the same InputStream?
> >
> >
> > Yes. The code snippet provided in the email should work as expected.
> >
> > Hope that helps.
> >
> > Thanks,
> > Bharath
> >
> > On Fri, Oct 18, 2019 at 5:55 AM Eric Shieh  wrote:
> >
> > > Hi,
> > >
> > > Based on the source code, it appears that each job designates a unique
> > > group id when subscribing to a Kafka topic; is my understanding correct?
> > > If so, since one cannot call appDescriptor.getInputStream with the
> > > same KafkaInputDescriptor twice, is it possible to have 2 independent
> > > stacks of operations applied on the same InputStream?  In essence, I have
> > > a requirement to process a message from 1 InputStream and write to 2
> > > OutputStreams or sinks after 2 different independent stacks of operations
> > > have been applied.  One way to solve this is to deploy 2 independent
> > > jobs, but the downside is that it would be difficult to synchronize the
> > > 2 jobs.  Is it possible to do the following:
> > >
> > > MessageStream ms = appDescriptor.getInputStream(kid);
> > > MessageStream msForkPoint = ms.map(mapping_logic1);
> > > msForkPoint.filter(filter_logic_1).sendTo(outputStream1);
> > > msForkPoint.map(mapping_logic2).sink(write_to_DB);
> > >
> > > Based on the source code, each operation instantiates a new instance of
> > > MessageStream and registers the new StreamOperatorSpec with the previous
> > > MessageStream instance's StreamOperatorSpec, essentially forming a
> > > "linked list" of parent-child StreamOperatorSpecs.  Since each parent
> > > OperatorSpec maintains a LinkedHashSet of next OperatorSpecs, the above
> > > code of forking 2 independent operation stacks after the initial map
> > > seems to be feasible.
> > >
> > > Regards,
> > >
> > > Eric
> > >
> >
>


Re: MessageStream with multiple concurrent operation stacks

2019-10-22 Thread Boris S
Yes, to answer your second question, you should be able to fork the
streams. MessageStreams are idempotent and can be used in different
pipelines.

With regard to the group id, Samza does set group ids, but I think they
are the same for the whole job. The idea behind this is that Samza does not
use Kafka-level partitioning; Samza itself distributes the partitions
directly to its tasks.

Boris.

On Tue, Oct 22, 2019 at 11:55 AM Bharath Kumara Subramanian <
codin.mart...@gmail.com> wrote:

> Hi Eric,
>
> Based on the source code, it appears that each job designates a unique
> > group id when subscribing to kafka topic, is my understanding correct?
> >
>
> Yes. Samza uses a combination of job name and job id to generate the group
> id.
>
>
> > is it possible to have 2 independent stack
>
> of operations applied on the same InputStream?
>
>
> Yes. The code snippet provided in the email should work as expected.
>
> Hope that helps.
>
> Thanks,
> Bharath
>
> On Fri, Oct 18, 2019 at 5:55 AM Eric Shieh  wrote:
>
> > Hi,
> >
> > Based on the source code, it appears that each job designates a unique
> > group id when subscribing to a Kafka topic; is my understanding correct?
> > If so, since one cannot call appDescriptor.getInputStream with the
> > same KafkaInputDescriptor twice, is it possible to have 2 independent
> > stacks of operations applied on the same InputStream?  In essence, I have a
> > requirement to process a message from 1 InputStream and write to 2
> > OutputStreams or sinks after 2 different independent stacks of operations
> > have been applied.  One way to solve this is to deploy 2 independent jobs,
> > but the downside is that it would be difficult to synchronize the 2 jobs.
> > Is it possible to do the following:
> >
> > MessageStream ms = appDescriptor.getInputStream(kid);
> > MessageStream msForkPoint = ms.map(mapping_logic1);
> > msForkPoint.filter(filter_logic_1).sendTo(outputStream1);
> > msForkPoint.map(mapping_logic2).sink(write_to_DB);
> >
> > Based on the source code, each operation instantiates a new instance of
> > MessageStream and registers the new StreamOperatorSpec with the previous
> > MessageStream instance's StreamOperatorSpec, essentially forming a "linked
> > list" of parent-child StreamOperatorSpecs.  Since each parent OperatorSpec
> > maintains a LinkedHashSet of next OperatorSpecs, the above code of forking
> > 2 independent operation stacks after the initial map seems to be feasible.
> >
> > Regards,
> >
> > Eric
> >
>


Re: MessageStream with multiple concurrent operation stacks

2019-10-22 Thread Bharath Kumara Subramanian
Hi Eric,

Based on the source code, it appears that each job designates a unique
> group id when subscribing to kafka topic, is my understanding correct?
>

Yes. Samza uses a combination of job name and job id to generate the group
id.
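
For illustration, a minimal config sketch, assuming the group id follows the
<job.name>-<job.id> convention in Samza's Kafka consumer config (worth
verifying against your Samza version); the values below are hypothetical:

# Hypothetical job identity; both values feed into the Kafka consumer group id.
job.name=page-view-enricher
job.id=1
# Consumers for this job would then join the group "page-view-enricher-1"
# (assumed format: <job.name>-<job.id>).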


> is it possible to have 2 independent stack

of operations applied on the same InputStream?


Yes. The code snippet provided in the email should work as expected.
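
To make the fork concrete, here is a minimal, self-contained sketch using the
high-level StreamApplication API; the topic names, serdes, and lambdas below
are hypothetical placeholders rather than anything from the original job:

import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;

public class ForkedPipelineApp implements StreamApplication {
  @Override
  public void describe(StreamApplicationDescriptor appDescriptor) {
    KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("kafka");
    KafkaInputDescriptor<String> kid =
        kafka.getInputDescriptor("input-topic", new StringSerde());
    KafkaOutputDescriptor<String> kod =
        kafka.getOutputDescriptor("output-topic-1", new StringSerde());

    OutputStream<String> outputStream1 = appDescriptor.getOutputStream(kod);

    // Shared prefix: one input stream and one common map (the fork point).
    MessageStream<String> msForkPoint =
        appDescriptor.getInputStream(kid).map(msg -> msg.trim());

    // Fork 1: filter, then send to a Kafka output stream.
    msForkPoint.filter(msg -> !msg.isEmpty()).sendTo(outputStream1);

    // Fork 2: an independent map followed by a custom sink (e.g. a DB write).
    msForkPoint.map(msg -> msg.toUpperCase())
        .sink((msg, collector, coordinator) -> {
          // write_to_DB placeholder: call your DB client here.
        });
  }
}

Both forks hang off the same msForkPoint, so the shared map runs once per
message and each downstream stack is registered with the fork point
independently, matching the parent/child OperatorSpec registration Eric
describes below.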

Hope that helps.

Thanks,
Bharath

On Fri, Oct 18, 2019 at 5:55 AM Eric Shieh  wrote:

> Hi,
>
> Based on the source code, it appears that each job designates a unique
> group id when subscribing to a Kafka topic; is my understanding correct?  If
> so, since one cannot call appDescriptor.getInputStream with the
> same KafkaInputDescriptor twice, is it possible to have 2 independent stacks
> of operations applied on the same InputStream?  In essence, I have a
> requirement to process a message from 1 InputStream and write to 2
> OutputStreams or sinks after 2 different independent stacks of operations
> have been applied.  One way to solve this is to deploy 2 independent jobs,
> but the downside is that it would be difficult to synchronize the 2 jobs.
> Is it possible to do the following:
>
> MessageStream ms = appDescriptor.getInputStream(kid);
> MessageStream msForkPoint = ms.map(mapping_logic1);
> msForkPoint.filter(filter_logic_1).sendTo(outputStream1);
> msForkPoint.map(mapping_logic2).sink(write_to_DB);
>
> Based on the source code, each operation instantiates a new instance of
> MessageStream and registers the new StreamOperatorSpec with the previous
> MessageStream instance's StreamOperatorSpec, essentially forming a "linked
> list" of parent-child StreamOperatorSpecs.  Since each parent OperatorSpec
> maintains a LinkedHashSet of next OperatorSpecs, the above code of forking
> 2 independent operation stacks after the initial map seems to be feasible.
>
> Regards,
>
> Eric
>


Re: Samza App Deployment Topology - Best Practices

2019-10-22 Thread Eric Shieh
Hi Bharath,

Thank you very much, this is extremely helpful!  I had not originally
contemplated using Beam but will definitely look into it following your
recommendation.

Based on your response and upon further reading of the source code, I
realized my original understanding of the samza-elasticsearch
system/connector and the overall Samza "system/connector" was incorrect.
The Elasticsearch system/connector, unlike the Kafka or HDFS systems, does
not have input or output descriptors; furthermore, since it maps the stream
name to an Elasticsearch index and doc_type name, I incorrectly assumed that
Samza internally used Kafka topics as the sources to the Elasticsearch
"system/connector" so that writes to Elasticsearch could be buffered through
Kafka.  I now understand the Elasticsearch "system/connector" is simply a
lower-level API that can be called by jobs as a message stream sink, which
means I need to implement a generic job that consumes 1 or more Kafka topics
and uses the API to write to Elasticsearch.
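
As a rough sketch of such a relay job, assuming the Elasticsearch connector is
wired in as a generic system via its ElasticsearchSystemFactory and that the
physical stream name carries the index/doc_type mapping; the system name,
topic, index, and serde below are hypothetical, and the Elasticsearch client
settings (client factory, host, port) would still come from job config:

import java.util.Map;

import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.system.descriptors.GenericOutputDescriptor;
import org.apache.samza.system.descriptors.GenericSystemDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;

public class KafkaToElasticsearchRelay implements StreamApplication {
  @Override
  public void describe(StreamApplicationDescriptor app) {
    // Source: the Kafka topic that the other jobs write to (topic B).
    KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("kafka");
    KafkaInputDescriptor<Map<String, Object>> topicB =
        kafka.getInputDescriptor("topic-B", new JsonSerdeV2<Map<String, Object>>());

    // Sink: the Elasticsearch connector exposed as a generic system; the
    // physical stream name is assumed to encode "index/doc_type".
    GenericSystemDescriptor es = new GenericSystemDescriptor(
        "es", "org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory");
    GenericOutputDescriptor<Map<String, Object>> esOut =
        es.getOutputDescriptor("es-out", new JsonSerdeV2<Map<String, Object>>())
            .withPhysicalName("my-index/doc");

    // No transformation logic: relay every message from topic B to the index.
    app.getInputStream(topicB).sendTo(app.getOutputStream(esOut));
  }
}

Job 1, Job 3, or any non-Samza Kafka producer can then write to topic B
without each of them carrying the Elasticsearch connector configuration.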

Regards,

Eric

On Tue, Oct 22, 2019 at 9:31 AM Bharath Kumara Subramanian <
codin.mart...@gmail.com> wrote:

> Hi Eric,
>
> Thanks for the additional clarification. I now have a better understanding of
> your use case.
> Your use case can be accomplished using various approaches.
>
>
>    1. One Samza application that consumes all of your input streams (A and
>    D) and also your intermediate output streams (B) and routes them to HDFS
>    or Elasticsearch. With this approach,
>       1. You may have to scale up or scale down your entire job to react to
>       changes to any of your inputs (A, D, and B as well). It might
>       potentially result in underutilization of resources.
>       2. A failure in one component could impact other components. For
>       example, it is possible that there are other writers to output stream
>       B and you don't want to disrupt them because of a bug in the
>       transformation logic of input stream A.
>       3. Along the same lines as 2, it is also possible that, due to an HDFS
>       or Elasticsearch outage, back pressure causes output B to grow, which
>       might potentially impact the time to catch up and also impact your
>       job's performance (e.g. this can happen if you have priorities set up
>       across your streams).
>    2. Another approach is to split them into multiple jobs; one for
>    consuming the input sources (A and D) and routing to the appropriate
>    destinations (B & C), another for consuming the output from the previous
>    job (B) and routing it to HDFS or Elasticsearch. With this approach,
>       1. You isolate the common/shared logic to write to HDFS or
>       Elasticsearch into its own job, which allows you to manage it
>       independently, including scale up/down.
>       2. During HDFS/Elasticsearch outages, other components are not
>       impacted, and back pressure causes your stream B to grow, which Kafka
>       can handle well.
>
>
> Our recommended API for writing Samza applications is Apache Beam.
> Examples of how to write a sample application using the Beam API and run it
> with Samza can be found at:
> https://github.com/apache/samza-beam-examples
>
> Please reach out to us if you have more questions.
>
> Thanks,
> Bharath
>
> On Wed, Oct 16, 2019 at 7:31 AM Eric Shieh  wrote:
>
> > Thank you Bharath.  Regarding my 2nd question, perhaps the following
> > scenario can help to illustrate what I am looking to achieve:
> >
> > Input stream A -> Job 1 -> Output stream B (Kafka Topic B)
> > Input stream A -> Job 2 -> Output stream C
> > Input stream D -> Job 3 -> Output stream B (Kafka Topic B)
> > Input stream B (Kafka Topic B) -> Elasticsearch (or write to HDFS)
> >
> > In the case of "Input stream B (Kafka Topic B) -> Elasticsearch (or write
> > to HDFS)" this is what I was referring to as "Common/shared system
> > services" that does not have any transformation logic except message sink
> > to either Elasticsearch or HDFS using Samza's systems/connectors.  In
> other
> > words, Job 1 and Job 3 both output to "Output stream B" expecting
> messages
> > will be persisted in Elasticsearch or HDFS, would I need to specify the
> > system/connector configuration separately in Job 1 and Job 3?  Is there a
> > way to have "Input stream B  (Kafka Topic B) -> Elasticsearch (or write
> to
> > HDFS)" as its own stand-alone job so I can have the following:
> >
> > RESTful web services (or other non-Samza services/applications) as Kafka
> > producer ->  Input stream B (Kafka Topic B) -> Elasticsearch (or write to
> > HDFS)
> >
> > Regards,
> >
> > Eric
> >
> > On Mon, Oct 14, 2019 at 8:35 PM Bharath Kumara Subramanian <
> > codin.mart...@gmail.com> wrote:
> >
> > > Hi Eric,
> > >
> > > Answers to your questions are as follows
> > >
> > >
> > > >
> > > >
> > > >
> > > > *Can I, or is it recommended to, package multiple jobs as 1 deployment
> > > > with 1 properties file or keep each app separated?  Based on
> > 

Re: Samza App Deployment Topology - Best Practices

2019-10-22 Thread Bharath Kumara Subramanian
Hi Eric,

Thanks for the additional clarification. I now have a better understanding of
your use case.
Your use case can be accomplished using various approaches.


   1. One Samza application that consumes all of your input streams (A and
   D) and also your intermediate output streams (B) and routes them to HDFS
   or Elasticsearch. With this approach,
      1. You may have to scale up or scale down your entire job to react to
      changes to any of your inputs (A, D, and B as well). It might
      potentially result in underutilization of resources.
      2. A failure in one component could impact other components. For
      example, it is possible that there are other writers to output stream B
      and you don't want to disrupt them because of a bug in the
      transformation logic of input stream A.
      3. Along the same lines as 2, it is also possible that, due to an HDFS
      or Elasticsearch outage, back pressure causes output B to grow, which
      might potentially impact the time to catch up and also impact your
      job's performance (e.g. this can happen if you have priorities set up
      across your streams).
   2. Another approach is to split them into multiple jobs; one for
   consuming the input sources (A and D) and routing to the appropriate
   destinations (B & C), another for consuming the output from the previous
   job (B) and routing it to HDFS or Elasticsearch. With this approach,
      1. You isolate the common/shared logic to write to HDFS or
      Elasticsearch into its own job, which allows you to manage it
      independently, including scale up/down.
      2. During HDFS/Elasticsearch outages, other components are not
      impacted, and back pressure causes your stream B to grow, which Kafka
      can handle well.


Our recommended API for writing Samza applications is Apache Beam.
Examples of how to write a sample application using the Beam API and run it
with Samza can be found at:
https://github.com/apache/samza-beam-examples
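
For a rough idea of what that looks like, here is a minimal Beam pipeline
sketch in the spirit of that repo; the broker address and topic names are
hypothetical, and running it on Samza means selecting the Samza runner (e.g.
--runner=SamzaRunner) as the examples show:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class StreamBRelayPipeline {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply("ReadStreamB", KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")
            .withTopic("stream-B")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        // Pass-through transform; a real sink job would write to HDFS or
        // Elasticsearch here instead of another Kafka topic.
        .apply("PassThrough", MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via((KV<String, String> kv) -> KV.of(kv.getKey(), kv.getValue())))
        .apply("WriteOut", KafkaIO.<String, String>write()
            .withBootstrapServers("localhost:9092")
            .withTopic("stream-B-archive")
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class));

    p.run().waitUntilFinish();
  }
}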

Please reach out to us if you have more questions.

Thanks,
Bharath

On Wed, Oct 16, 2019 at 7:31 AM Eric Shieh  wrote:

> Thank you Bharath.  Regarding my 2nd question, perhaps the following
> scenario can help to illustrate what I am looking to achieve:
>
> Input stream A -> Job 1 -> Output stream B (Kafka Topic B)
> Input stream A -> Job 2 -> Output stream C
> Input stream D -> Job 3 -> Output stream B (Kafka Topic B)
> Input stream B (Kafka Topic B) -> Elasticsearch (or write to HDFS)
>
> In the case of "Input stream B (Kafka Topic B) -> Elasticsearch (or write
> to HDFS)" this is what I was referring to as "Common/shared system
> services" that does not have any transformation logic except message sink
> to either Elasticsearch or HDFS using Samza's systems/connectors.  In other
> words, Job 1 and Job 3 both output to "Output stream B" expecting messages
> will be persisted in Elasticsearch or HDFS, would I need to specify the
> system/connector configuration separately in Job 1 and Job 3?  Is there a
> way to have "Input stream B  (Kafka Topic B) -> Elasticsearch (or write to
> HDFS)" as its own stand-alone job so I can have the following:
>
> RESTful web services (or other non-Samza services/applications) as Kafka
> producer ->  Input stream B (Kafka Topic B) -> Elasticsearch (or write to
> HDFS)
>
> Regards,
>
> Eric
>
> On Mon, Oct 14, 2019 at 8:35 PM Bharath Kumara Subramanian <
> codin.mart...@gmail.com> wrote:
>
> > Hi Eric,
> >
> > Answers to your questions are as follows
> >
> >
> > >
> > >
> > >
> > > *Can I, or is it recommended to, package multiple jobs as 1 deployment
> > > with 1 properties file or keep each app separated?  Based on the
> > > documentation, it appears to support 1 app/job within a single
> > > configuration as there is no mechanism to assign multiple app classes
> > > and given each a name unless I am mistaken*
> > >
> >
> >  *app.class* is a single-valued configuration, and your understanding
> > about it based on the documentation is correct.
> >
> >
> > >
> > >
> > > *If only 1 app per config+deployment, what is the best way to handle
> > > requirement #3 - common/shared system services, as there is no app or
> > > job per se; I just need to specify the streams and output system (i.e.
> > > org.apache.samza.system.hdfs.writer*
> > >
> >
> > There are a couple of options to achieve your #3 requirement.
> >
> >    1. If there is enough commonality between your jobs, you could have
> >    one application class that describes the logic and have different
> >    configurations to modify the behavior of the application logic. This
> >    does come with some of the following considerations:
> >       1. Your deployment system needs to support deploying the same
> >       application with different configs.
> >       2. Potential duplication of configuration if your configuration
> >       system doesn't support hierarchies and overrides.
> >       3. Potentially unmanageable for evolution, since