Re: I can't deploy my application package

2017-06-27 Thread Amol Kekre
Guilherme,
This is a DataTorrent-specific question. You should take it up on the
DataTorrent user list.

Thks,
Amol



E: a...@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre

www.datatorrent.com


On Tue, Jun 27, 2017 at 2:07 PM, Guilherme Hott 
wrote:

> Hi guys, I am having this problem when I try to deploy my package using
> the web interface.
>
> File "databuilding-1.0-SNAPSHOT.apa" upload failed. -1
> 6/27/2017, 1:33:59 PM
>
> I tested many times and there is nothing about this in the log files
>
> --
> *Guilherme Hott*
> *Software Engineer*
> Skype: guilhermehott
> @guilhermehott
> https://www.linkedin.com/in/guilhermehott
>
>


Re: Is there a way to schedule an operator?

2017-06-14 Thread Amol Kekre
The only thing missing is kicking off a job, in case the ask is to use
resources the batch way ("use and terminate once done"). An operator that
keeps watch and has the ability to kick off a job suffices. Kicking off a
batch job can be done via any of the following:

1. Files
   -> Start after all data has arrived. Usually a .done file in a dir, which
triggers the entire dir to be processed
   -> Start ASAP and end on .done
2. Message (a start message)

I think batch use cases are mainly #1. Technically this is not a
batch-vs-stream question, just the scheduler (Oozie-like) part of batch.
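A minimal sketch of the first trigger style (the .done marker), assuming a
file path visible to the operator; the directory, port type, and class name
are placeholders, not an existing Malhar operator:

import java.io.File;

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.common.util.BaseOperator;

// Sketch: watch a directory and emit one start signal once the .done marker
// lands; downstream then processes the entire directory as a batch.
public class DoneFileWatcher extends BaseOperator implements InputOperator
{
  public final transient DefaultOutputPort<String> start = new DefaultOutputPort<>();

  private String dir = "/data/incoming"; // placeholder: the directory being filled
  private transient boolean triggered;

  @Override
  public void emitTuples()
  {
    if (!triggered && new File(dir, ".done").exists()) {
      triggered = true;
      start.emit(dir); // the batch-job kick-off message
    }
  }
}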

Thks
Amol



E: a...@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre

www.datatorrent.com


On Tue, Jun 13, 2017 at 11:47 PM, Ganelin, Ilya  wrote:

> I think it's a very relevant use case. In the Apex formulation this would
> work as follows. An operator runs continuously and maintains an internal
> state that tracks processed files or an offset (e.g. in Kafka). As more data
> becomes available, the operator performs the appropriate operation and then
> returns to waiting. In this fashion, batched data is processed as soon as
> it becomes available but the process overall is still a batch process since
> it's limited by the production of the source batches.
>
> There are a couple of examples of this in Malhar, for example the
> AbstractFileInputOperator.
>
> Your earlier comment with regards to your motivation is interesting. Can
> you elaborate on the load reduction you get with your approach? A number of
> batched small writes to a DB may prove to be more efficient from a latency
> or database utilization standpoint when compared with infrequent large
> batch writes particularly if they involve index updates.
>
>
>
>
> --
> *From:* dashi...@yahoo.com 
> *Sent:* Tuesday, June 13, 2017 6:36:29 PM
> *To:* guilhermeh...@gmail.com; users@apex.apache.org
> *Subject:* Re: Is there a way to schedule an operator?
>
> I have input operators that reach out to Google, Facebook, Bing, Yahoo
> etc. once a day or an hour and download marketing spend statistics. Apex
> promises batch and streaming to be equal-class citizens. How is this
> equality achieved if there's no scheduler for batch jobs to rely on? I
> want the DAG to take a data stream from a batch pipeline and affect streaming
> pipelines running alongside. Do you not see this as a valid use case?
>
> Sent from Yahoo Mail on Android
> 
>
> On Tue, Jun 13, 2017 at 5:29 PM, Guilherme Hott
>  wrote:
> Hi guys,
>
> Is there a way to schedule an operator? I need an operator to start the DAG
> once a day at 12 AM.
>
> Best
>
> --
> *Guilherme Hott*
> *Software Engineer*
> Skype: guilhermehott
> @guilhermehott
> https://www.linkedin.com/in/guilhermehott
>
>


Re: how to decide no of operator partitions

2017-05-07 Thread Amol Kekre
Rishi,
How much time elapses between the old app and the new app? It may be an issue
of resources needing to catch up. Have you benchmarked your operators/app?

Thks
Amol



E: a...@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre

www.datatorrent.com


On Sun, May 7, 2017 at 11:02 AM, rishi  wrote:

> Hi,
>
> I am running an Apex application which contains multiple operators.
> For some operators I assigned more than one partition, as they have to do
> some heavy lifting compared to other operators; e.g., I have given the
> Kafka operator 2 partitions.
>
> The thing is, I restarted the running application (it got killed after 2-3
> days), and after that I see huge latency in some of the operators. Even
> after 2-3 days the latency number is huge and the application is not able
> to process today's records.
>
> So please suggest how to handle this kind of scenario, when we restart the
> application and see a huge latency number.
>
> Thanks
> Rishi
>
> --
> View this message in context: http://apache-apex-users-list.78494.x6.nabble.com/how-to-decide-no-of-operator-partitions-tp1580.html
> Sent from the Apache Apex Users list mailing list archive at Nabble.com.
>


Re: Wait until setup completes prior to dag execution

2017-04-25 Thread Amol Kekre
Yes, that will work too if they can be parallel partitioned.

Thks
Amol



E: a...@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre

www.datatorrent.com


On Tue, Apr 25, 2017 at 8:54 PM, Bhupesh Chawda <bhup...@datatorrent.com>
wrote:

> If the operators are not very heavy, perhaps thread local could work..
>
> ~ Bhupesh
>
>
> ___
>
> Bhupesh Chawda
>
> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
>
> www.datatorrent.com  |  apex.apache.org
>

Re: Wait until setup completes prior to dag execution

2017-04-22 Thread Amol Kekre
This could be solved by throttling ramp-up. We have cases in production
that do this. If throttling exists at the input operator, then it is simply
a matter of communicating with it in software.

Let's assume the input operator(s) can be throttled from 0% to 100%. All
they need to know is the throttle number. Since this comes from
downstream, you can figure out ways of communicating back. Some ideas (just
brainstorming):
1. Look at the commit window to know all operators are past setup (can an
operator know about the commit window?)
2. Communicate via another mechanism -> write to a file ("setup done, ramp up
now"); a sketch of this follows below
3. Use a StatsListener
4. Vlad's solution
etc.

I think #3 may work well. Additionally, it can be tuned to also help resolve
at run time how fast the input ingestion operator can go. Via #3 this can
be dynamic, and the DAG remains the same; it just fine-tunes live. #4 can also
do this by passing back metadata on the feedback port.
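For idea 2, here is a minimal sketch that needs no engine hooks; the marker
path and the tuple source are placeholders, and whichever process finishes
setup last would write the marker:

import java.io.File;

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.common.util.BaseOperator;

// Sketch: the input operator emits nothing until the agreed-on marker file
// appears, then opens up to full rate.
public class ThrottledInputOperator extends BaseOperator implements InputOperator
{
  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();

  private String markerPath = "/tmp/setup.done"; // placeholder: agreed on out of band
  private transient boolean rampedUp;

  @Override
  public void emitTuples()
  {
    if (!rampedUp) {
      rampedUp = new File(markerPath).exists(); // written once all setups complete
      if (!rampedUp) {
        return; // still throttled: emit nothing this cycle
      }
    }
    String tuple = nextTuple();
    if (tuple != null) {
      output.emit(tuple);
    }
  }

  private String nextTuple()
  {
    return null; // placeholder: pull from the real source here
  }
}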

Thks
Amol



E: a...@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre

www.datatorrent.com


On Fri, Apr 21, 2017 at 7:15 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

> One possible way is to introduce a delay operator. I am not 100% sure that
> it will work, but it is worth trying. Introduce a dummy output port on the
> operator that takes too long to setup. Connect it to an input port of a
> delay operator. Connect delay operator output port to a dummy input port of
> the upstream operator. In the upstream operator do not emit on the actual
> output port till a tuple is received on the dummy input port. In the
> downstream operator emit a tuple to the dummy output port in the first
> beginWindow().
>
> Thank you,
>
> Vlad
>
> On 4/21/17 12:36, Ganelin, Ilya wrote:
>
> Yes, Amol – basically wanted to know how to sensibly handle the case where
> there’s a big back-buffer to work through.
>
>
>
> - Ilya Ganelin
>
>
>
>
> *From:* Amol Kekre <a...@datatorrent.com>
> *Reply-To:* "users@apex.apache.org" <users@apex.apache.org>
> *Date:* Thursday, April 20, 2017 at 4:27 PM
> *To:* "users@apex.apache.org" <users@apex.apache.org>
> *Subject: *Re: Wait until setup completes prior to dag execution
>
>
>
>
>
> Ilya,
>
> Additionally, that particular operator will not process data till setup is
> done. As Pramod pointed out, there is no reason to wait; at worst the
> buffer servers will fill up. Are you worried about the operator code needing
> to ramp up its processing rate so that it does not get a deluge of data?
>
>
>
> Thks
>
> Amol
>
>
>
>
>
>
> E: a...@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre
>
> www.datatorrent.com
>
>
>
> On Thu, Apr 20, 2017 at 4:19 PM, Pramod Immaneni <pra...@datatorrent.com>
> wrote:
>
> Hi,
>
>
>
> I am assuming it is an operator other than the input operator that has
> the lengthy setup function? Any specific reason you don't want to let data
> in? Things don't get committed (like Kafka offsets) till all
> operators process the data. If you are worried that input operators are
> going to run away, there is back pressure which will put the brakes on when
> the buffers are full.
>
>
>
> Thanks
>
>
>
>
>
> On Thu, Apr 20, 2017 at 3:36 PM, Ganelin, Ilya <
> ilya.gane...@capitalone.com> wrote:
>
> Is there any way to wait for setup to complete prior to allowing data to
> flow through the DAG? I have an operator with a lengthy setup function, so
> I'm just wondering what the best way to handle this is. My best solution at
> the moment is to start with an emission rate of zero and toggle it
> on once the DAG is launched.
>
>
>
> - Ilya Ganelin
>
>

Re: JSONObject Error when attempting to configure properties in UI

2017-04-18 Thread Amol Kekre
Ilya,
This question should be on the DataTorrent users alias. I am moving users@apex
to bcc.

Thks
Amol


E: a...@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre

www.datatorrent.com


On Tue, Apr 18, 2017 at 3:19 PM, Ganelin, Ilya 
wrote:

> I get the following error when attempting to create a new app
> configuration. Any thoughts as to what’s happening?
>
>
>
>
>
>
>
> - Ilya Ganelin
>
>


Re: Rest call to Data Torrent package deploy failing

2017-03-30 Thread Amol Kekre
Surya,
Moving users@apex to bcc. Do take this up on the dt-users Google group.

Thks
Amol



E: a...@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre

www.datatorrent.com

*Join us at Apex Big Data World Mt View, April 4, 2017!*


On Thu, Mar 30, 2017 at 10:59 AM, Vlad Rozov 
wrote:

> Hi Surya,
>
> This mailing list covers only Apache Apex questions. For questions related
> to a specific vendor distribution or vendor products, please contact
> corresponding vendor. In this particular case, please raise Datatorrent
> support ticket or contact Datatorrent.
>
> Thank you,
>
> Vlad
>
> *Join us at Apex Big Data World-San Jose, April 4, 2017*
> On 3/30/17 10:48, Mukkamula, Suryavamshivardhan (CWM-NR) wrote:
>
> Hi,
>
> Can you please check when I would get the below error message (highlighted
> in yellow)? I am trying to log in to DataTorrent through a REST call and
> deploy the package.
>
> [XX@XX254 DEV-Titan]$ logCurlOut=$(curl -k -iX POST -H
> "Content-Type: application/json" -d @userpass.json
> https://XX1254.devfg.rbc.com:9090/ws/v2/login)
>   % Total% Received % Xferd  Average Speed   TimeTime Time
> Current
>  Dload  Upload   Total   SpentLeft
> Speed
> 100   135076  10059   1134880 --:--:-- --:--:-- --:--:--
> 1288
> [XX@XX254 DEV-Titan]$ substring1=$(echo $logCurlOut| cut -d'=' -f
> 2)
> [XX@XX254 DEV-Titan]$ substring2=$(echo $substring1| cut -d';' -f
> 1)
> [XX@XX254 DEV-Titan]$ postCurlResp=$(curl -k -iX POST -T
> modified.apa -H "Cookie: session=$substring2; Content-Type:
> application/octet-stream" https://XX1254.devfg.rbc.
> com:9090/ws/v2/appPackages?merge=replace)
>   % Total% Received % Xferd  Average Speed   TimeTime Time
> Current
>  Dload  Upload   Total   SpentLeft
> Speed
>   0 31.5M0 20 81920 62  2516k  0:00:12 --:--:--  0:00:12
> 3200k
> [XX@XX254 DEV-Titan]$ echo $postCurlResp
> {}ansfer-Encoding: chunkedjson9 GMT
>
> Regards,
> Surya Vamshi
>
>
>
>
>


Re: apex metrics integration into monitoring system

2017-03-02 Thread amol kekre
Mohammad,
Apex metrics/stats are available via a webservice. You can write another Apex
app that simply calls this webservice and writes to JMX or any external system.
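As a standalone sketch of that approach (the same calls could live inside an
operator): poll the stats webservice and forward a counter to Graphite's
plaintext port. The webservice URL is a placeholder; look up the actual Stram
REST path for your Apex version. JSON parsing is elided.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.HttpURLConnection;
import java.net.Socket;
import java.net.URL;

public class StatsToGraphite
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder URL: substitute the real app-master webservice endpoint.
    URL url = new URL("http://appmaster-host:8088/ws/v2/stram/info");
    HttpURLConnection conn = (HttpURLConnection)url.openConnection();
    StringBuilder body = new StringBuilder();
    try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
      for (String line; (line = in.readLine()) != null;) {
        body.append(line);
      }
    }
    long tuplesProcessed = parseTuplesProcessed(body.toString());

    // Graphite plaintext protocol: "<metric.path> <value> <epoch-seconds>\n"
    try (Socket graphite = new Socket("graphite-host", 2003);
        Writer w = new OutputStreamWriter(graphite.getOutputStream(), "UTF-8")) {
      w.write("apex.app.tuplesProcessed " + tuplesProcessed + " "
          + System.currentTimeMillis() / 1000 + "\n");
    }
  }

  private static long parseTuplesProcessed(String json)
  {
    return 0L; // placeholder: extract the counter from the JSON response
  }
}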

Thks
Amol


On Tue, Feb 28, 2017 at 5:06 PM, Mohammad Kargar  wrote:

> Is there any way to integrate Apex metrics/stats into an external
> monitoring system (e.g. graphite)? Also what's the best way for enabling
> jmx for a submitted job?
>
> Thanks
>


Re: High Availability of Data Torrent

2017-02-07 Thread Amol Kekre
Chiranjeevi,
Please take this up on dt-us...@googlegroups.com as it is DataTorrent
related. I have moved users@apex to bcc.

Thks
Amol


*Join us at Apex Big Data World-San Jose, April 4, 2017!*


On Tue, Feb 7, 2017 at 2:46 AM, chiranjeevi vasupilli 
wrote:

> Hi team,
>
> Please provide the data points to set up a secondary DT, which can become
> active and run jobs when the primary DT goes down for any reason.
>
> Looking for data points like:
> • How we can set it up
> • Complexity involved in it
> • Pros and cons
> • Whether anyone has already done this kind of setup
>
> Thanks
> Chiru
>


Re: One-time Initialization of in-memory data using a data file

2017-01-23 Thread Amol Kekre
Roger,
I am guessing here. The ask seems to be for an Apex app that would otherwise
query data from the mainframe to stop doing so, the aim being to offload
key-value lookups from the mainframe. Instead you are looking to see if the
query can be served by an operator that does what a key-value store would do
for you (via an input port).

In such a case HDHT/ManagedState would work. It under-utilizes what they do,
but it will work. You can also put in an IMDG that the Apex app can directly
query; in essence you would replace the mainframe with an IMDG. Functionally
the setup will work just as the mainframe does. Operationally, however, do
check whether you want to stand up another system (an IMDG in this case) for
your devops to support. From a TCO perspective you may get different answers
from your devops.

Thks,
Amol


On Mon, Jan 23, 2017 at 3:44 PM, Thomas Weise  wrote:

> First link without frame:
>
> https://ci.apache.org/projects/apex-malhar/apex-malhar-javadoc-release-3.6/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.html
>
>
> On Mon, Jan 23, 2017 at 3:33 PM, Thomas Weise  wrote:
> > Roger,
> >
> > An Apex operator typically holds state that it uses for processing, and
> > often that state is mutable. For large state: "managed state" in
> > Malhar (and its predecessor HDHT) was designed for large state that
> > can be mutated efficiently under a specific write pattern (semi-ordered
> > keys). However, there is no benefit in using these for
> > immutable data that is already in HDFS.
> >
> > In such a case it would be best to store the data (during migration/ingest)
> > in HDFS in a file format that allows fast random reads (block-structured
> > files like HFile or TFile, or any other indexed structure, provide that).
> >
> > Also, depending on how the data, once in memory, would be used, an
> > Apex operator may or may not be the right home. If the goal is to only
> > lookup data without further processing with a synchronous
> > request/response pattern, then an IMDG or similar system may be a more
> > appropriate solution.
> >
> > Here are pointers for managed state:
> >
> > https://ci.apache.org/projects/apex-malhar/apex-malhar-javadoc-release-3.6/index.html
> > https://github.com/apache/apex-malhar/blob/master/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
> >
> > Thanks,
> > Thomas
> >
> >
> > On Sun, Jan 22, 2017 at 11:43 PM, Ashwin Chandra Putta
> >  wrote:
> >> Roger,
> >>
> >> Depending on requirements such as expected latency, size of data, etc.,
> >> the operator's design will change.
> >>
> >> If latency needs to be lowest possible, meaning completely in-memory and
> >> not hitting the disk for read I/O, there are two scenarios:
> >> 1. If the lookup data size is small --> just load it into memory in the
> >> setup call and switch off checkpointing to get rid of checkpoint I/O
> >> latency in between. In case of operator restarts, the data should be
> >> reloaded in setup.
> >> 2. If the lookup data is large --> have many partitions of this operator
> >> to minimize the footprint of each partition. Still switch off
> >> checkpointing and reload in setup in case of operator restart. Having
> >> many partitions will ensure that the setup load is fast. The incoming
> >> query needs to be partitioned based on the lookup key.
> >>
> >> You can use the PojoEnricher with FSLoader for above design.
> >>
> >> Code:
> >> https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java
> >> Example:
> >> https://github.com/DataTorrent/examples/tree/master/tutorials/enricher
> >>
> >> If the lookup dataset is large and latency caused by disk read I/O is
> >> fine, then use HDHT or managed state as a backing mechanism for the
> >> in-memory data to decrease the checkpoint footprint. I could not find an
> >> example for managed state, but here are the links for HDHT:
> >>
> >> Code:
> >> https://github.com/DataTorrent/Megh/tree/master/contrib/src/main/java/com/datatorrent/contrib/hdht
> >> Example:
> >> https://github.com/DataTorrent/examples/blob/master/tutorials/hdht/src/test/java/com/example/HDHTAppTest.java
> >>
> >> Regards,
> >> Ashwin.
> >>
> >> On Sun, Jan 22, 2017 at 10:45 PM, Sanjay Pujare  >
> >> wrote:
> >>>
> >>> You may want to take a look at com.datatorrent.lib.fileaccess.DTFileReader
> >>> in the malhar-library – not sure whether it lets you read the whole file
> >>> into memory.
> >>>
> >>>
> >>>
> >>> Also there is a library called Megh at https://github.com/DataTorrent/Megh
> >>> where you might find some useful operators like
> >>> com.datatorrent.contrib.hdht.hfile.HFileImpl.
> >>>
> >>>
> >>>
> >>> From: Roger F 
> >>> Reply-To: 
> >>> Date: Sunday, January 22, 2017 at 9:32 PM
> >>> To: 
> >>> Subject: One-time 

Re: PARTITION_PARALLEL Vs Regular mode

2016-12-22 Thread Amol Kekre
Arvindan,
Based on what you have, it looks like a shuffle is not needed between
Kafka->ParquetWriter. The decision to use parallel partitioning should
ideally be based on the need to shuffle; if so, option [1] should not be
used per se. Why even bother to shuffle if you do not need to?

Assuming the ask is really between options [2] and [3], the bottleneck in
your mini-DAGs (each parallel partition) will dictate the number of
partitions. So here are the questions I would ask:
1. What is the minimum number of ParquetWriters you need to meet your SLA?
(Benchmark each to find out.)
2. Are you using 120 KafkaConsumers because there are 120 topics, or is it
N topics/120 (where N >> 120) to balance the load? I.e., do you have the
choice to create more Kafka partitions?

In general for parallel partitioning, the number of partitions should be set
per the bottleneck operator. Assuming in this case it is Parquet and the
answer is 200 partitions, why would 200 Kafka -> 200 Parquet with a
container-local setup not be OK? If they share containers, the memory may
actually be the same as or lower than 300 separate containers (100 Kafka +
200 Parquet). Kafka and Parquet most likely do not compete for the same
resource (I/O vs. CPU), so option [4] as follows should work (a configuration
sketch follows below):

4.
- Figure out the minimum number of Parquet writers to meet your SLA
- Bump up Kafka to that many partitions
- Use container local + parallel partition
- Experiment with container memory size to bring net memory down
In this case, you will save on the I/O of routing Kafka -> Parquet through
the NIC, and there is no serialization between Kafka -> Parquet. There is a
small chance that thread-local between Kafka -> Parquet may work, which will
move back-pressure (from topic skew, spikes, etc.) to the Kafka publisher.
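A configuration sketch of option [4]; KafkaInput and ParquetWriter are
compilable stand-ins for the real operators, and 200 is an illustrative
partition count:

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.BaseOperator;

public class KafkaToParquetApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    KafkaInput kafka = dag.addOperator("kafka", new KafkaInput());
    ParquetWriter writer = dag.addOperator("parquet", new ParquetWriter());

    // Partition the consumer up to the writers' benchmarked requirement.
    dag.setAttribute(kafka, OperatorContext.PARTITIONER,
        new StatelessPartitioner<KafkaInput>(200));

    // One writer per consumer partition, co-located in the same container:
    // no NIC hop and no serialization between each pair.
    dag.setInputPortAttribute(writer.input, PortContext.PARTITION_PARALLEL, true);
    dag.addStream("kafkaToParquet", kafka.output, writer.input)
        .setLocality(Locality.CONTAINER_LOCAL);
  }

  // Minimal stand-ins so the sketch compiles; replace with the real operators.
  public static class KafkaInput extends BaseOperator implements InputOperator
  {
    public final transient DefaultOutputPort<byte[]> output = new DefaultOutputPort<>();

    @Override
    public void emitTuples()
    {
    }
  }

  public static class ParquetWriter extends BaseOperator
  {
    public final transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>()
    {
      @Override
      public void process(byte[] tuple)
      {
      }
    };
  }
}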

Thks
Amol



On Thu, Dec 22, 2016 at 11:06 AM, Pramod Immaneni 
wrote:

> Arvindan,
>
> When you had the MxN case with 100 kafka consumers sending to 120 parquet
> writers what was the cpu utilization of the parquet containers. Was it
> close to 100% or did you have spare cycles? I am trying to determine if it
> is an IO bottleneck or processing.
>
> Thanks
>
> On Thu, Dec 22, 2016 at 10:50 AM, Arvindan Thulasinathan <
> aravindan.thulasinat...@oracle.com> wrote:
>
>> Hi,
>>   We have an Apex Application which has a DAG structure like this:
>>
>> KafkaConsumer —>  ParquetWriter
>>
>> The KafkaConsumer is running at a scale where we have 100 containers for
>> consumer consuming from a Kafka-Cluster with an incoming rate of 300K
>> msg/sec and each message is about 1KB (Each message is a highly nested Avro
>> message). We arrived at the 100 container number for the consumer in order
>> to keep up with the incoming traffic. The ParquetWriter is a bit CPU
>> intensive in our case and we thought we may require about 120-130
>> containers.
>>
>>
>> We have 3 different observations here:
>>   1. Using 100 KafkaConsumer and 120 ParquetWriter Without Partition
>> Parallel:
>>  In this case, Apex automatically introduces a
>> pass-through unifier.  In this scenario, we have seen that invariably
>> ParquetWriter processes tuples at a lesser rate than KafkaConsumer’s emit
>> rate. That is, if Consumer emits Tuples at the rate of 20 million/sec, the
>> ParquetWriter will only write at the rate of 17 million/sec. Also, it
>> invariably leads to backpressure and makes the consumer consume at a lower
>> rate. I have tried going beyond 120 containers as well and
>> I believe a possible reason could be - Unifier and Writer are in the same
>> container and presumably share the same core. And hence they are slower? Is
>> this observation correct? I tried tuning by increasing the
>> Writer.Inputport.QUEUE_SIZE to 10K. The queue is not even getting half
>> full, but still the back-pressure is created for the consumer. Is there any
>> additional tune-up that I can do, to:
>>A. Make the writer process tuples at almost the same pace as Consumer
>> without backpressure on the Consumer
>>
>>   2. Using Partition Parallel with 120 KafkaConsumer and 120
>> ParquetWriter:
>> In this scenario as well, we have seen that ParquetWriter
>> processes tuples at a lesser rate than KafkaConsumer’s emit rate. That is,
>> if Consumer emits Tuples at the rate of 20 million/sec, the ParquetWriter
>> will only write at the rate of 19 million/sec. This behavior is true even
>> if we keep increasing the Consumers and writers to 130 or 140 containers. I
>> believe this behavior is because we think the writer is a bit CPU intensive.
>>
>> 3. Using a different DAG structure like this:
>>
>>  KafkaConsumer —> ParquetWriter1
>>   |
>>   |——> ParquetWriter2
>>
>>   In this case, both ParquetWriter1 and ParquetWriter2 are same writers.
>> But each have a Partition_Parallel relationship with KafkaConsumer. Each is
>> of 100 containers. So 300 containers in total. The KafkaConsumer sends 1
>> half of messages to ParquetWriter1 and the other half to ParquetWriter2.
>> That is, out of 20 

Re: Ab initio vs Apex

2016-12-09 Thread Amol Kekre
Ananth,
Customers have talked about Ab Initio vs. Hadoop/Apex, but there is no data
available on any benchmarking or feature comparison that I know of. You may
want to search the internet for Hadoop vs. Ab Initio comparisons.

Thks
Amol


On Fri, Dec 9, 2016 at 2:19 AM, Ananth G  wrote:

> Hello all,
>
> I was wondering if anyone has done any tests comparing Apex with Ab
> initio. If yes could you please share your experiences.
>
> I guess Ab Initio is closed source and there is not much I could gather
> from the internet.
>
> Regards
> Ananth
>
>
>


Re: Apex Communication Protocols

2016-11-25 Thread Amol Kekre
Max,
Between two operators the communication depends on stream locality. For
thread-local, the tuple is passed via the thread stack; for container-local
there is a queue in between. For the rest there is a buffer server, which is
effectively a pub-sub mechanism built over TCP/IP. The default is Kryo
serialization, but you can add your own. Addressing is via pub-sub; i.e. the
sender does not bother about addressing, and the receiver connects to the
buffer server to register interest. Routing is set up during launch by the
master (STRAM) and is a launch-time or "redo physical plan"-time decision. A
redo will happen on an outage or on dynamic changes.
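For reference, a minimal sketch of how an application picks a locality when
wiring a stream (producer and consumer stand for real operators):

// Inside populateDAG(): the locality set on addStream picks the transport.
dag.addStream("tuples", producer.output, consumer.input)
    .setLocality(DAG.Locality.THREAD_LOCAL); // tuples passed on the thread stack

// DAG.Locality.CONTAINER_LOCAL -> in-process queue between operator threads
// no locality set (the default) -> buffer-server pub-sub over TCP/IP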

Thks
Amol


On Fri, Nov 25, 2016 at 8:33 AM, Max Bridgewater 
wrote:

> Hi Folks,
>
> I was giving an Apex demo the other day and people asked following
> questions:
>
> 1) What is the communication protocol between operators when they are on
> distant nodes. That means, how does Apex transport the tuples from one node
> to the other?
> Is it a custom protocol on top of TCP/IP or is it RPC?
> 2) What is the serialization algorithm used?
> 3) What is the addressing scheme between operators? That means how does
> Apex know where an operator is located and how to route data to it? Is
> there an operator registry? If so, where does it reside?
>
> Thoughts?
>
> Thanks,
> Max.
>
>


Re: balanced of Stream Codec

2016-10-15 Thread Amol Kekre
Sunil,
Round robin in an internal operator could be used in exactly-once writes to
an external system for certain operations. I do not know what your business
logic is, but in case it can be split into partitions and then unified (for
example, aggregates), you have a situation where you can use round robin and
then unify the result. The result can then be fed into an output operator
that handles exactly-once semantics with an external system like Cassandra.
The Apex engine guarantees that all the tuples in a window are the same
for the logical operator, so for the operator that you partition
round-robin, the post-unifier result should be identical (aka effectively
idempotent) even if each partition is not idempotent. Often making each
partition idempotent is not useful for internal operators. The case where
this would not work is if the order of tuples based on category_name is
important.

Thks
Amol


On Sat, Oct 15, 2016 at 6:03 PM, Sandesh Hegde 
wrote:

> Round robin is not idempotent, so you can't have exactly once.
>
> On Sat, Oct 15, 2016 at 4:49 PM Munagala Ramanath 
> wrote:
>
>> If you want round-robin distribution, which will give you uniform load
>> across all partitions, you can use a StreamCodec like this (provided the
>> number of partitions is known and static):
>>
>> public class CatagoryStreamCodec extends KryoSerializableStreamCodec {
>>   private int n = 0;
>>
>>   @Override
>>   public int getPartition(Object in) {
>>     return n++ % nPartitions; // nPartitions is the number of partitions
>>   }
>> }
>>
>> If you want certain category names to go to certain partitions, you can
>> create that mapping
>> within the StreamCodec (map category names to integers in the range
>> 0..nPartitions-1), and, for each tuple, look up the category name in
>> the map and return the corresponding value.
>>
>> Ram
>> 
>>
>> On Fri, Oct 14, 2016 at 1:17 PM, Sunil Parmar 
>> wrote:
>>
>> We're using a stream codec for consistent / parallel processing of the
>> data across the operator partitions. Our requirement is to serialize
>> processing of the data based on a particular tuple attribute, let's call it
>> 'catagory_name'. In order to achieve parallel processing of different
>> category names we've written our stream codec as follows.
>>
>> public class CatagoryStreamCodec extends KryoSerializableStreamCodec
>> {
>>     private static final long serialVersionUID = -687991492884005033L;
>>
>>     @Override
>>     public int getPartition(Object in) {
>>         try {
>>             InputTuple tuple = (InputTuple) in;
>>             String partitionKey = tuple.getName();
>>             if (partitionKey != null) {
>>                 return partitionKey.hashCode();
>>             }
>>         } catch (ClassCastException e) {
>>             // ignore and fall through to the default
>>         }
>>         return 0; // default partition when no key is available
>>     }
>> }
>>
>> It's working as expected, but we observed inconsistent partitioning when
>> we ran this in a production env with 20 partitions of the operator
>> following the codec in the DAG:
>>
>>    - Some operator instances didn't process any data
>>    - Some operator instances processed as many tuples as everybody else
>>    combined
>>
>>
>> Questions:
>>
>>    - Is the getPartition method supposed to return the actual partition, or
>>    just some lower bits used for deciding the partition?
>>    - The number of partitions is known to the application properties and
>>    can vary between deployments or environments. Is it best practice to use
>>    that property in the stream codec?
>>    - Any recommended hash function for getting consistent variation in the
>>    lower bits with less variety of data? We have ~100+ categories and I'm
>>    thinking of having 10+ operator partitions.
>>
>>
>> Thanks,
>> Sunil
>>
>>
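A sketch of Ram's second suggestion (InputTuple is the poster's class). One
caveat: if multiple upstream partitions each hold their own codec instance, a
stateful assignment would diverge across senders, so precompute the map or
make the assignment deterministic:

import java.util.HashMap;
import java.util.Map;

import com.datatorrent.lib.codec.KryoSerializableStreamCodec;

// Sketch: pin each category to a fixed partition index in 0..nPartitions-1
// so skewed hashCode() values cannot unbalance the load.
public class MappedCategoryCodec extends KryoSerializableStreamCodec<InputTuple>
{
  private final Map<String, Integer> categoryToPartition = new HashMap<>();
  private final int nPartitions;
  private int next;

  public MappedCategoryCodec(int nPartitions)
  {
    this.nPartitions = nPartitions;
  }

  @Override
  public int getPartition(InputTuple tuple)
  {
    String category = tuple.getName();
    Integer p = categoryToPartition.get(category);
    if (p == null) {
      p = next++ % nPartitions; // assign new categories round-robin, then pin
      categoryToPartition.put(category, p);
    }
    return p;
  }
}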


Re: Support for dynamic topology

2016-08-30 Thread Amol Kekre
Hyunseok,
From the DAG it looks like a single port. In case that is true, one way to
get around this is to introduce a dummy pass-through operator during design
of the app itself. This pass-through operator should have two input ports
and one output port. One of the ports gets data from Y; the other is left
open at launch time. At run time, try inserting the new operator P and
connecting it to the open port, and then disconnect Y. A little tricky, but
see if it works (a sketch follows below). The danger here is that the
connects and disconnects should be triggered on the same window boundary,
else you will have a window in which both old and new operators are active.
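A sketch of such a pass-through operator; both input ports are marked
optional so either upstream (old Y or new P) can be connected or disconnected
at plan-change time:

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;

public class PassThrough<T> extends BaseOperator
{
  public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>();

  // Port fed by Y at launch time.
  @InputPortFieldAnnotation(optional = true)
  public final transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
  {
    @Override
    public void process(T tuple)
    {
      output.emit(tuple);
    }
  };

  // Spare port left open at launch; P connects here at run time.
  @InputPortFieldAnnotation(optional = true)
  public final transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
  {
    @Override
    public void process(T tuple)
    {
      output.emit(tuple);
    }
  };
}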

Thks
Amol


On Tue, Aug 30, 2016 at 1:57 PM, Thomas Weise <thomas.we...@gmail.com>
wrote:

> Hi,
>
> This depends on the operator Z. If it has multiple input ports and those
> are optional, you can add P, then connect P to Z (and X), then remove Y.
>
> If Z has a single port, then Z and everything downstream would need to be
> removed or else the change won't result in a valid DAG and won't be
> accepted.
>
> Thomas
>
>
> On Tue, Aug 30, 2016 at 1:48 PM, Hyunseok Chang <hyunseok.ch...@gmail.com>
> wrote:
>
>> So if I replace Y in pipeline X -> Y -> Z -> U -> W -> V with P, then
>> what I would have is X -> P -> Z' -> U' -> W' -> V' ?   Where Z', U', W'
>> and V' are new operator instances that need to be deployed along with P.
>>
>> Is my understanding correct?   If so is there any reason why we cannot
>> re-use existing operators downstream?
>>
>> -hs
>>
>>
>> On Tue, Aug 30, 2016 at 2:46 PM, Amol Kekre <a...@datatorrent.com> wrote:
>>
>>>
>>> Hyunseok,
>>> The new route in the pipeline will have a new Z operator. If you want to
>>> use the old Z operator (state?) then things get tricky. Do confirm that you
>>> do not plan to use old Z operator.
>>>
>>> Thks,
>>> Amol
>>>
>>>
>>> On Tue, Aug 30, 2016 at 11:02 AM, Sandesh Hegde <sand...@datatorrent.com
>>> > wrote:
>>>
>>>> Hello hs,
>>>>
>>>> Yes, you can change the topology from the Apex CLI.
>>>>
>>>> One possible sequence of commands for your scenario is described below,
>>>>
>>>> connect appid
>>>> begin-logical-plan-change
>>>> create-operator 
>>>> add-stream-sink ...  ( for the input of P )
>>>> add-stream-sink ... ( for the output of P )
>>>> remove-operator ...
>>>> submit
>>>>
>>>> Note: All the required operators needs to be in the package.
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Aug 30, 2016 at 7:22 AM Hyunseok Chang <
>>>> hyunseok.ch...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'd like to know more about Apex support for dynamic topology.
>>>>>
>>>>> From my readings on Apex, I understand we can add additional parallel
>>>>> tasks for each operator and change data partitioning among them 
>>>>> dynamically
>>>>> at run time (so-called data partitioning and unification features).
>>>>>
>>>>> My question is can we change the "logical" DAG at run time?
>>>>>
>>>>> Let's say my logical DAG is a chain of three operators X, Y & Z (i.e.,
>>>>> X -> Y -> Z).  Now at run time I want to replace operator Y with operator
>>>>> P, such that the new logical DAG would look like X -> P -> Z.
>>>>>
>>>>> Is it something I can do with Apex?
>>>>>
>>>>> Thanks!
>>>>> -hs
>>>>>
>>>>>
>>>
>>
>


Re: can operators emit on a different from the operator itself thread?

2016-08-10 Thread Amol Kekre
Should we have this as a utility?

Thks
Amol


On Wed, Aug 10, 2016 at 1:57 PM, Munagala Ramanath 
wrote:

> For cases where use of a different thread is needed, it can write tuples
> to a queue from where the operator thread pulls them --
> JdbcPollInputOperator in Malhar has an example.
>
> Ram
>
> On Wed, Aug 10, 2016 at 1:50 PM, hsy...@gmail.com 
> wrote:
>
>> Hey Vlad,
>>
>> Thanks for bringing this up. Is there an easy way to detect unexpected
>> use of the emit method without hurting performance? Or can we at least
>> detect this in debug mode?
>>
>> Regards,
>> Siyuan
>>
>> On Wed, Aug 10, 2016 at 11:27 AM, Vlad Rozov 
>> wrote:
>>
>>> The short answer is no, creating worker thread to emit tuples is not
>>> supported by Apex and will lead to an undefined behavior. Operators in Apex
>>> have strong thread affinity and all interaction with the platform must
>>> happen on the operator thread.
>>>
>>> Vlad
>>>
>>
>>
>
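A minimal sketch of the queue hand-off Ram describes; the source fetch is a
placeholder, and all emit() calls stay on the operator thread:

import java.util.concurrent.ConcurrentLinkedQueue;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.common.util.BaseOperator;

public class ThreadHandoffInput extends BaseOperator implements InputOperator
{
  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();

  private final transient ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
  private transient volatile boolean running;
  private transient Thread worker;

  @Override
  public void setup(OperatorContext context)
  {
    running = true;
    worker = new Thread(new Runnable()
    {
      @Override
      public void run()
      {
        while (running) {
          queue.add(fetchFromSource()); // worker thread never touches the port
        }
      }
    });
    worker.start();
  }

  @Override
  public void emitTuples()
  {
    // Only the operator thread drains the queue and emits.
    for (String t; (t = queue.poll()) != null;) {
      output.emit(t);
    }
  }

  @Override
  public void teardown()
  {
    running = false;
  }

  private String fetchFromSource()
  {
    return "tuple"; // placeholder: blocking read from the external source
  }
}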


Re: A proposal for Malhar

2016-07-12 Thread Amol Kekre
My vote is to do 2 & 3.

Thks
Amol


On Tue, Jul 12, 2016 at 12:14 PM, Kottapalli, Venkatesh <
vkottapa...@directv.com> wrote:

> +1 for deprecating the packages listed below.
>
> -Original Message-
> From: hsy...@gmail.com [mailto:hsy...@gmail.com]
> Sent: Tuesday, July 12, 2016 12:01 PM
>
> +1
>
> On Tue, Jul 12, 2016 at 11:53 AM, David Yan <da...@datatorrent.com> wrote:
>
> > Hi all,
> >
> > I would like to renew the discussion of retiring operators in Malhar.
> >
> > As stated before, the reason why we would like to retire operators in
> > Malhar is because some of them were written a long time ago before
> > Apache incubation, and they do not pertain to real use cases, are not
> > up to par in code quality, have no potential for improvement, and
> > probably completely unused by anybody.
> >
> > We do not want contributors to use them as a model of their
> > contribution, or users to use them thinking they are of quality, and
> then hit a wall.
> > Both scenarios are not beneficial to the reputation of Apex.
> >
> > The initial 3 packages that we would like to target are *lib/algo*,
> > *lib/math*, and *lib/streamquery*.
> >
> > I'm adding this thread to the users list. Please speak up if you are
> > using any operator in these 3 packages. We would like to hear from you.
> >
> > These are the options I can think of for retiring those operators:
> >
> > 1) Completely remove them from the malhar repository.
> > 2) Move them from malhar-library into a separate artifact called
> > malhar-misc
> > 3) Mark them deprecated and add to their javadoc that they are no
> > longer supported
> >
> > Note that 2 and 3 are not mutually exclusive. Any thoughts?
> >
> > David
> >
> > On Tue, Jun 7, 2016 at 2:27 PM, Pramod Immaneni
> > <pra...@datatorrent.com>
> > wrote:
> >
> >> I wanted to close the loop on this discussion. In general everyone
> >> seemed favorable to this idea, with no serious objections. Folks
> >> had good suggestions, like documenting the capabilities of operators,
> >> coming up with well-defined criteria for the graduation of operators and
> >> what those criteria may be, and deciding what to do with existing
> >> operators that may not yet be mature or may be unused.
> >>
> >> I am going to summarize the key points that resulted from the
> >> discussion and would like to proceed with them.
> >>
> >>- Operators that do not yet provide the key platform capabilities to
> >>make an operator useful across different applications such as
> >> reusability,
> >>partitioning static or dynamic, idempotency, exactly once will still
> be
> >>accepted as long as they are functionally correct, have unit tests
> >> and will
> >>go into a separate module.
> >>- Contrib module was suggested as a place where new contributions go
> in
> >>that don't yet have all the platform capabilities and are not yet
> >> mature.
> >>If there are no other suggestions we will go with this one.
> >>- It was suggested the operators documentation list those platform
> >>capabilities it currently provides from the list above. I will
> >> document a
> >>structure for this in the contribution guidelines.
> >>- Folks wanted to know what would be the criteria to graduate an
> >>operator to the big leagues :). I will kick-off a separate thread
> >> for it as
> >>I think it requires its own discussion and hopefully we can come
> >> up with a
> >>set of guidelines for it.
> >>- David brought up state of some of the existing operators and their
> >>retirement and the layout of operators in Malhar in general and how
> it
> >>causes problems with development. I will ask him to lead the
> >> discussion on
> >>that.
> >>
> >> Thanks
> >>
> >> On Fri, May 27, 2016 at 7:47 PM, David Yan <da...@datatorrent.com>
> wrote:
> >>
> >> > The two ideas are not conflicting, but rather complementing.
> >> >
> >> > On the contrary, putting a new process for people trying to
> >> > contribute while NOT addressing the old unused subpar operators in
> >> > the repository
> >> is
> >> > what is conflicting.
> >> >
> >> > Keep in mind that when people try to contribute, they always look
> >> > at the existing operators already in the r