Re: [VOTE] SEP-1: Semantics of ProcessorId in Samza

2017-03-29 Thread Yan Fang
+1 . Thanks for the proposal, Navina. :)

Fang, Yan
yanfang...@gmail.com

On Thu, Mar 30, 2017 at 4:24 AM, Prateek Maheshwari <
pmaheshw...@linkedin.com.invalid> wrote:

> +1 (non binding) from me.
>
> - Prateek
>
> On Tue, Mar 28, 2017 at 2:17 PM, Boris S  wrote:
>
> > +1 Looks good to me.
> >
> > On Tue, Mar 28, 2017 at 2:00 PM, xinyu liu 
> wrote:
> >
> > > +1 on my side. Very happy to see this proposal. This is a blocker for
> > > integrating fluent API with StreamProcessor, and hopefully we can get
> it
> > > resolved soon :).
> > >
> > > Thanks,
> > > Xinyu
> > >
> > > On Tue, Mar 28, 2017 at 11:28 AM, Navina Ramesh (Apache) <
> > > nav...@apache.org>
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > This is a voting thread for SEP-1: Semantics of ProcessorId in Samza.
> > > > For reference, here is the wiki link:
> > > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-
> > > > 1%3A+Semantics+of+ProcessorId+in+Samza
> > > >
> > > > Link to discussion mail thread:
> > > > http://mail-archives.apache.org/mod_mbox/samza-dev/201703.
> > > > mbox/%3CCANazzuuHiO%3DvZQyFbTiYU-0Sfh3riK%3Dz4j_
> > > AdCicQ8rBO%3DXuYQ%40mail.
> > > > gmail.com%3E
> > > >
> > > > Please vote on this SEP asap. :)
> > > >
> > > > Thanks!
> > > > Navina
> > > >
> > >
> >
>


Re: [DISCUSS] Moving to github/pull-request for code review and check-in

2016-02-19 Thread Yan Fang
+1.

Though I am familiar with the current way, still think the pull requests
are simpler.

Cheers,

Fang, Yan
yanfang...@gmail.com

On Fri, Feb 19, 2016 at 11:10 AM, Milinda Pathirage 
wrote:

> +1. Calcite uses pull requests for contributions from non-committers and
> according to my experience with Calcite, pull requests are easier than the
> current approach we follow in Samza.
>
> Milinda
>
> On Thu, Feb 18, 2016 at 9:09 PM, Roger Hoover 
> wrote:
>
> > +1 - Thanks for bringing this up, Yi.  I've done it both ways and feel
> > pull requests are much easier.
> >
> > Sent from my iPhone
> >
> > > On Feb 18, 2016, at 4:25 PM, Navina Ramesh
> 
> > wrote:
> > >
> > > +1
> > >
> > > Haven't tried any contribution with pull requests. But sounds simpler
> > than
> > > attaching the patch to JIRA.
> > >
> > > Navina
> > >
> > >> On Thu, Feb 18, 2016 at 4:01 PM, Jacob Maes 
> > wrote:
> > >>
> > >> +1
> > >>
> > >> As a relatively new contributor to Samza, I've certainly felt the
> > current
> > >> process was overly-complicated.
> > >>
> > >>> On Thu, Feb 18, 2016 at 3:53 PM, Yi Pan  wrote:
> > >>>
> > >>> Hi, all,
> > >>>
> > >>> I want to start the discussion on our code review/commit process.
> > >>>
> > >>> I felt that our code review and check-in process is a little bit
> > >>> cumbersome:
> > >>> - developers need to create RBs and attach diff to JIRA
> > >>> - committers need to review RBs, dowload diff and apply, then push.
> > >>>
> > >>> It would be much lighter if we take the pull request only approach,
> as
> > >>> Kafka already converted to:
> > >>> - for the developers, the only thing needed is to open a pull
> request.
> > >>> - for committers, review and apply patch is from the same PR and
> merge
> > >> can
> > >>> be done directly on remote git repo.
> > >>>
> > >>> Of course, there might be some hookup scripts that we will need to
> link
> > >>> JIRA w/ pull request in github, which Kafka already does. Any
> comments
> > >> and
> > >>> feedbacks are welcome!
> > >>>
> > >>> Thanks!
> > >>>
> > >>> -Yi
> > >
> > >
> > >
> > > --
> > > Navina R.
> >
>
>
>
> --
> Milinda Pathirage
>
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
>
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org
>


Re:Re: Samza processing reference data

2015-10-28 Thread Yan Fang


* Is there a tentative date for 0.10.0 release?
I think it's coming out soon. @Yi Pan , he should know more about that.


* I checked the checkpoint topic for Samza job and it seems the checkpoint 
topic is created with1 partition by default. Given that each Samza task will 
need to read from checkpoint topic, it is similar to what I need to read (Each 
Samza task is reading from the same partition of a topic). I am wondering how 
is that achieved?
In current implementation, only the AM reads the checkpoint stream and 
distribute the information to all the nodes using the http server. Not all the 
nodes are consuming the checkpoint stream. Correct me if I am wrong.


Thanks,
Yan






At 2015-10-28 02:49:23, "Chen Song" <chen.song...@gmail.com> wrote:
>Thanks Yan.
>
>* Is there a tentative date for 0.10.0 release?
>* I checked the checkpoint topic for Samza job and it seems the checkpoint
>topic is created with1 partition by default. Given that each Samza task
>will need to read from checkpoint topic, it is similar to what I need to
>read (Each Samza task is reading from the same partition of a topic). I am
>wondering how is that achieved?
>
>Chen
>
>On Sat, Oct 24, 2015 at 5:52 AM, Yan Fang <yanfangw...@163.com> wrote:
>
>> Hi Chen Song,
>>
>>
>> Sorry for the late reply. What you describe is a typical bootstrap use
>> case. Check
>> http://samza.apache.org/learn/documentation/0.9/container/streams.html ,
>> the bootstrap configuration. By using this one, Samza will always read the
>> *topicR* from the beginning when it restarts. And then it treats the
>> *topicR* as a normal topic after reading existing msgs in the *topicD*.
>>
>>
>> == can we configure each individual Samza task to read data from all
>> partitions from a topic?
>> It works in the 0.10.0 by using the broadcast stream. In the 0.9.0, you
>> have to "create topicR with the same number of partitions as *topicD*, and
>> replicate data to all partitions".
>>
>>
>> Hope this still helps.
>>
>>
>> Thanks,
>> Yan
>>
>>
>> At 2015-10-22 04:44:41, "Chen Song" <chen.song...@gmail.com> wrote:
>> >In our samza app, we need to read data from MySQL (reference table) with a
>> >stream. So the requirements are
>> >
>> >* Read data into each Samza task before processing any message.
>> >* The Samza task should be able to listen to updates happening in MySQL.
>> >
>> >I did some research after scanning through some relevant conversations and
>> >JIRAs on the community but did not find a solution yet. Neither I find a
>> >recommended way to do this.
>> >
>> >If my data streams comes from a topic called *topicD*, options in my mind
>> >are:
>> >
>> >   - Use Kafka
>> >  1. Use one of CDC based solution to replicate data in MySQL to a
>> >  topic Kafka. https://github.com/wushujames/mysql-cdc-projects/wiki.
>> >  Say the topic is called *topicR*.
>> >  2. In my Samza app, read reference table from *topicR *and persisted
>> >  in a cache in each Samza task's local storage.
>> > - If the data in *topicR *is NOT partitioned in the same way as
>> > *topicD*, can we configure each individual Samza task to read
>> data
>> > from all partitions from a topic?
>> > - If the answer to the above question is no, do I need to
>> >create *topicR
>> > *with the same number of partitions as *topicD*, and replicate
>> > data to all partitions?
>> > - On start, how to make Samza task to block processing the first
>> > message from *topicD* before reading all data from *topicR*.
>> >  3. Any new updates/deletes to *topicR *will be consumed to update
>> the
>> >  local cache of each Samza task.
>> >  4. On failure or restarts, each Samza task will read from the
>> >  beginning from *topicR*.
>> >   - Not Use Kafka
>> >  - Each Samza task reads a Snapshot of database and builds its local
>> >  cache, and it then needs to read periodically to update its
>> >local cache. I
>> >  have read about a few blogs, and this doesn't sound a solid way
>> >in the long
>> >  term.
>> >
>> >Any thoughts?
>> >
>> >Chen
>> >
>> >   -
>> >
>> >--
>> >Chen Song
>>
>
>
>
>-- 
>Chen Song


Re:Re:Need help in log4j.xml externalization

2015-10-27 Thread Yan Fang
" When I keep my log4j.xml in //$PWD/deploy/alice/config/log4j.xml and do not 
set any "samza.container.name" then it is not generating any log files for me. "


== this is weird. The run-am/run-container script should automatically create 
the samba.container.name if it is not set...











At 2015-10-26 19:57:52, "Patni, Ankush"  wrote:
>Hi Yang,
>
>Thanks a lot for help. I have couple of doubts and want to clear them :
>
>I am starting  samza task from same machine where it is running.
>My problem is : I am not using StreamAppender as suggested by you in previous 
>reply. But still DailyRollingFileAppender uses the ${samza.container.name}.
>
>When I keep my log4j.xml in //$PWD/deploy/alice/config/log4j.xml  and do not 
>set any "samza.container.name"  then it is not generating any log files for me.
>
>But when I set something export 
>JAVA_OPTS="-Dsamza.container.name=samza-application-master-task1"  then it 
>generate the desired file in given location : =${samza.log.dir}
>
>
>
>Also when my log4j.xml is in my jar then everything run fine, log files are 
>getting generated within corresponding container. So is there any way I can 
>generate the log files in corresponding task folders using external log4j and 
>not giving in my jar? There are more than 7 task I am running from same jar 
>file.
>
>
>
>My log4j.xml looks like :
>
>
>
>
>http://jakarta.apache.org/log4j/;>
>/>
>
>class="org.apache.log4j.DailyRollingFileAppender">
>   value="${samza.log.dir}/${samza.container.name}.log" />
>  
>  
> 
>  
>   
>
>   
>  
>  
>  
>  
> 
>  
>   
>
>   
>  
>  
>  
>  
>   
>
>
>Regards,
>Ankush
>***
>
>This email message and any attachments are intended solely for the use of the 
>addressee. If you are not the intended recipient, you are prohibited from 
>reading, disclosing, reproducing, distributing, disseminating or otherwise 
>using this transmission. If you have received this message in error, please 
>promptly notify the sender by reply email and immediately delete this message 
>from your system. This message and any attachments may contain information 
>that is confidential, privileged or exempt from disclosure. Delivery of this 
>message to any person other than the intended recipient is not intended to 
>waive any right or privilege. Message transmission is not guaranteed to be 
>secure or free of software viruses.
>***


Re:Re: questions of partition and task of Samza

2015-10-26 Thread Yan Fang
Hi Selina,


Your understanding is correct. Yes, you "need to consumer the original input 
and send it back to Kafka and reset the* Key to departmentName *and then 
consume it again 
to count in Samza" if you want to count the number of students in the same 
departmentName. This is a typical aggregation use case. Because after 
aggregating the students in the same department, you can do more than just 
"count". :)


Cheers,
Yan


At 2015-10-25 06:12:50, "Selina Tech" <swucaree...@gmail.com> wrote:
>Hi, Yan:
>
>  Thanks a lot for your reply.
>
>  You mentioned "if you give the msgs the same partition key", which
>mean same partition key value or  same partition key attribute name?
>
>   I mentioned "primary key" as "key" at public
>KeyedMessage(java.lang.String topic, K key, V message) or you can ignore
>it. I explain it in another way below.
>
>   If I need aggregate data, but the data are not in same partition, do
>we need consumer the data, and put it back it to Kafka with with new key
>and then consumer it again and aggregate it in Samza.
>
>  For example,  messages about student GPA information was send to
>Kafka by* K key(String schoolName)*. The message looks like "name,
>schoolName,  departmentName,  grade, GPA", and assuming I have 3
>partitions, With my understanding, all student records in one school should
>go to same partition.
>
>  Right now I need to aggregate data for same department, no matter
>which school.  which mean all the same departmentName message will be in
>three different partition. If I just count it in one samza job, will the
>result correct?  Do I need to consumer the original input and send it back
>to Kafka and reset the* Key to  departmentName *and then consume it again
>to count in Samza?
>
> If I did not understand the partition and task of Samza, would you
>like to correct me?
>
>Sincerely,
>Selina
>
>On Sat, Oct 24, 2015 at 2:45 AM, Yan Fang <yanfangw...@163.com> wrote:
>
>>
>>
>> Hi Selina,
>>
>>
>> what do you mean by "primary key" here? Is it one of the partitions of
>> "input" or something like "if one msg meets condition x, we think msg has
>> the primary key"?
>>
>>
>> If you just want to count the msgs, you can count in one Samza job and
>> send the result to "output" topic. You can send to any partition of the
>> "output" if you give the msgs the same partition key.
>>
>>
>> Thanks,
>> Yan
>>
>>
>>
>>
>>
>>
>>
>> At 2015-10-22 08:30:15, "Selina Tech" <swucaree...@gmail.com> wrote:
>> >Hi, All:
>> >
>> >In the Samza document, it mentioned "Each task consumes data from
>> >one partition for each of the job’s input streams." Does it mean if the
>> >data processing one job is not in one partition, the result will be wrong.
>> >
>> >Assuming my Samza input data on Kafka topic -- "input" is
>> >partitioned by default -- round robin. And I have five partitions. If my
>> >Samza job is to count messages by primary key of the message at "input"
>> >topic, and then output it to kafka topic -- "output".
>> >
>> >   So I need steps as below
>> >  1. read data from Kafka topic "input"
>> >  2. reset the partition key to "primary key" in Samza
>> >  3. produce it back to Kafka topic named as "temp"
>> >  4. read "temp" topic at Samza
>> >  5. count it in Samza
>> >  6. write it to Kafka topic named as "output"
>> >
>> >  If I just read data from Kafka topic "input" and count it in Samza
>> >and write it to topic "output". The result will not be correct because
>> there
>> >might have multiple messages for same "primary key" in "output" topic.  Do
>> >I understand it correctly?
>> >
>> >Sincerely,
>> >Selina
>>


Re:Re:Need help in log4j.xml externalization

2015-10-26 Thread Yan Fang
Hi Ankush,


In current situation, if you run the Yarn client and the Yarn application in 
different machines, the logs will not be mixed. Otherwise, there is no other 
ways, because they share the same environment variable, if the environment 
variable is set, they both will use it. You may want to open a JIRA for this 
request.


But IMHO, to get the logs for individual task, it is better to package them in 
different jar/tars. That maybe easier to manager -- if one task is 
down/changed, it does not affect other tasks. (assuming what you mean "task" is 
a "job", not the "task" in the context of Samza).


Thanks,
Yan


At 2015-10-26 21:00:35, "Patni, Ankush"  wrote:
>
>Hi Yan,
>
>Small doubt :
>
>Is it possible to set the log4j.xml in somewhere so that it will not get mix 
>with yarn client and will be available for yarn application only. With this I 
>will be able to get the logs for my individual task in their respective 
>container.
>
>Ankush
>
>***
>
>This email message and any attachments are intended solely for the use of the 
>addressee. If you are not the intended recipient, you are prohibited from 
>reading, disclosing, reproducing, distributing, disseminating or otherwise 
>using this transmission. If you have received this message in error, please 
>promptly notify the sender by reply email and immediately delete this message 
>from your system. This message and any attachments may contain information 
>that is confidential, privileged or exempt from disclosure. Delivery of this 
>message to any person other than the intended recipient is not intended to 
>waive any right or privilege. Message transmission is not guaranteed to be 
>secure or free of software viruses.
>***


Re:Need help in log4j.xml externalization

2015-10-24 Thread Yan Fang


Hi Ankush,


1. why we need to give first of all 
-Dsamza.container.name=samza-application-master as it is already present in 
run-am.sh


I think there is a confusion here. When you set export 
JAVA_OPTS="-Dlog4j.configuration=file://$PWD/deploy/alice/config/log4j.xml ", 
you only set it in the machine which you submit the job, not the machines that 
you run the job. 


To be more clear:


1. there are two parts of one Samza job: Yarn client and Yarn Application. Yarn 
client by default uses the log4j-console.xml, which can not use any 
StreamAppender. See here 
https://github.com/apache/samza/blob/master/samza-shell/src/main/bash/run-job.sh
 . Yarn Application uses the log4j.xml file that you want to externalize. See 
here 
https://github.com/apache/samza/blob/master/samza-shell/src/main/bash/run-am.sh 
.


2. If you use different machine to submit the Samza job and run the Samza job, 
the solution is simple: export 
JAVA_OPTS="-Dlog4j.configuration=file://$PWD/deploy/alice/config/log4j.xml " in 
all the machines which will run the Samza job.


3. If you use one of the machines for running Samza to submit the Samza job, 
there maybe a bug. Because run-job.sh and run-am.sh scripts can not 
differenciate which log4j.xml to use. So when you submit the job, it will use 
the samza log4j.xml, which may have the StreamAppender. You can open a JIRA for 
this. It is an easy fix. If you do not use the StreamAppender, there should be 
no issue.


Hope this late reply still helps.


Thanks,
Yan







At 2015-10-21 21:38:34, "Patni, Ankush"  wrote:
>Hello Team,
>
>I am trying to externalize log4j from my task.
>
>So at present I run all the task from one tar.gz. And inside that I have 
>log4j.xml.
>
>But now I want to externalize the log4j.xml so that I can have more control 
>over logs. So before running my task I tried to set the JAVA_OPTS:
>
>
>export 
>JAVA_OPTS="-Dlog4j.configuration=file://$PWD/deploy/alice/config/log4j.xml "
>
>
>
>And I get the following error when I run my task :
>
>
>
>Like : deploy/alice/bin/run-job.sh 
>--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory 
>--config-path=file://$PWD/deploy/alice/config/EntryTask.properties
>
>
>
>log4j:ERROR Could not create an Appender. Reported error follows.
>
>org.apache.samza.SamzaException: Got null container name from system property: 
>samza.container.name. This is used as the key for the log appender, so can't 
>proceed.
>
>at 
> org.apache.samza.logging.log4j.StreamAppender.activateOptions(StreamAppender.java:89)
>
>at 
> org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
>
>at 
> org.apache.log4j.xml.DOMConfigurator.parseAppender(DOMConfigurator.java:295)
>
>at 
> org.apache.log4j.xml.DOMConfigurator.findAppenderByName(DOMConfigurator.java:176)
>
>
>
>Then I tried with following command :
>
>
>export 
>JAVA_OPTS="-Dlog4j.configuration=file://$PWD/deploy/alice/config/log4j.xml 
>-Dsamza.container.name=samza-application-master"
>
>
>log4j:ERROR Could not create an Appender. Reported error follows.
>java.lang.NullPointerException
>at java.io.StringReader.(StringReader.java:50)
>at 
> org.codehaus.jackson.JsonFactory.createJsonParser(JsonFactory.java:636)
>at 
> org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1863)
>at 
> org.apache.samza.logging.log4j.StreamAppender.getConfig(StreamAppender.java:180)
>at 
> org.apache.samza.logging.log4j.StreamAppender.activateOptions(StreamAppender.java:92)
>at 
> org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
>at 
> org.apache.log4j.xml.DOMConfigurator.parseAppender(DOMConfigurator.java:295)
>
>
>
>So my doubt is : why we need to give first of all 
>-Dsamza.container.name=samza-application-master as it is already present in 
>run-am.sh
>
>Second thing could anyone please help me in externilaztion of log4j.xml.
>
>  Regards,
>Ankush
>***
>
>This email message and any attachments are intended solely for the use of the 
>addressee. If you are not the intended recipient, you are prohibited from 
>reading, disclosing, reproducing, distributing, disseminating or otherwise 
>using this transmission. If you have received this message in error, please 
>promptly notify the sender by reply email and immediately delete this message 
>from your system. This message and any attachments may contain information 
>that is confidential, privileged or exempt from disclosure. Delivery of this 
>message to any person other than the intended recipient is not intended to 
>waive any right or privilege. Message transmission is not guaranteed to be 
>secure or free of software viruses.
>***


Re:questions of partition and task of Samza

2015-10-24 Thread Yan Fang


Hi Selina,


what do you mean by "primary key" here? Is it one of the partitions of "input" 
or something like "if one msg meets condition x, we think msg has the primary 
key"?


If you just want to count the msgs, you can count in one Samza job and send the 
result to "output" topic. You can send to any partition of the "output" if you 
give the msgs the same partition key.


Thanks,
Yan







At 2015-10-22 08:30:15, "Selina Tech"  wrote:
>Hi, All:
>
>In the Samza document, it mentioned "Each task consumes data from
>one partition for each of the job’s input streams." Does it mean if the
>data processing one job is not in one partition, the result will be wrong.
>
>Assuming my Samza input data on Kafka topic -- "input" is
>partitioned by default -- round robin. And I have five partitions. If my
>Samza job is to count messages by primary key of the message at "input"
>topic, and then output it to kafka topic -- "output".
>
>   So I need steps as below
>  1. read data from Kafka topic "input"
>  2. reset the partition key to "primary key" in Samza
>  3. produce it back to Kafka topic named as "temp"
>  4. read "temp" topic at Samza
>  5. count it in Samza
>  6. write it to Kafka topic named as "output"
>
>  If I just read data from Kafka topic "input" and count it in Samza
>and write it to topic "output". The result will not be correct because there
>might have multiple messages for same "primary key" in "output" topic.  Do
>I understand it correctly?
>
>Sincerely,
>Selina


Re: Asynchronous approach and samza

2015-09-20 Thread Yan Fang
Hi Michael,

Samza is designed for high-throughput and realtime processing. If you are
using HTTP request/external service, you may not retrieve the same
performance as not using it. However, technically speaking, there is
nothing blocking you to do this, (well, discouraged anyway :). Samza by
default does not provide this feature. So you maybe a little cautious when
implementing this.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Sun, Sep 20, 2015 at 4:28 PM, Michael Sklyar  wrote:

> Hi,
>
> What would be the best approach for doing "blocking" operations in Samza?
>
> For example, we have a kafka stream of urls for which we need to gather
> external data via HTTP (such as alexa rank, get the page title and
> headers..). Other scenarios include database access and decision making via
> a rule engine.
>
> Samza processes messages in a singe thread, HTTP requests might take
> hundreds of miliseconds. With the single threaded design the throughput
> would be very limited, which can be solved with an asynchronous approach.
> However Samza documentation explicitely states
> "*You are strongly discouraged from using threads in your job’s code*".
>
> It seems that Samza design suits very well "data transformation" scenarios,
> what is not clear is how well can it support external services?
>
> Thanks,
> Michael Sklyar
>


Re: Review Request 38049: SAMZA-769 Replace deprecated method call and fix warnings

2015-09-16 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/38049/#review99189
---



samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
 (line 157)
<https://reviews.apache.org/r/38049/#comment156100>

since this is from the http.close(), just try/catch the httpclient.close().

also, not use the printStackTrace(), use log.error(message, e)


- Yan Fang


On Sept. 2, 2015, 11:45 a.m., Aleksandar Bircakovic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/38049/
> ---
> 
> (Updated Sept. 2, 2015, 11:45 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Replaced deprecated method call and suppressed some warnings.
> 
> 
> Diffs
> -
> 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java bc926c5 
>   
> samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
>  7089796 
>   
> samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
>  b2d37a7 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
> c564964 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
>  4eaaec2 
>   
> samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java
>  a18d8e0 
> 
> Diff: https://reviews.apache.org/r/38049/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Aleksandar Bircakovic
> 
>



Re: Runtime Execution Model

2015-09-16 Thread Yan Fang
-- Hi Lukas,

I want to learn more from your production environment. How do you use
ProcessJobFactory
in Docker containers? Do you use one ProcessJobFactory process all the
tasks, or spawn out as many threads as the task number? How is the
fault-tolerance?


-- Hi Yi,

* Any progress in your side, in terms of the standalone job? (Chris' patch
is big, :)

*  Invert the JobCoordinator to the standalone Samza process s.t. the leader
process of the Samza job becomes the JobCoordinator
Currently, we run the JobCoordinator first, and then Yarn talks to
the JobCoordinator. Isn't it enough so far?

*  Make the partition assignment as pluggable model to distribute the tasks to
all Samza processes in a job-group in coordination.
   I think the reason for this is for the Kafka's new feature.The API
design needs to be compatible with Kafka.

*  Make Samza process multi-threaded while maintaining the per-task
single-threaded
programming model for the users
   Do we already have this, or need to add that? This I think can be
done in current ProcessJob. We can have the same number of threads as the
tasks.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Tue, Sep 15, 2015 at 10:54 AM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, all,
>
> Thanks for pitching in for the improvement plan. We have actually discussed
> this for a while now. In a complete view, I think that there are the
> following issues need to be addressed:
> 1) Currently, the steps involved to launch a Samza process are too complex
> and intertwined with YARN.
> 2) The Samza partition assignment is embedded within YARN AppMaster
> implementation, which makes it difficult to run the job outside YARN
> environment
>
> We have actually already started some work to address the above issues:
> 1) SAMZA-516: support standalone Samza jobs. Chris has started this work
> and has a proto-type patch available. This allows a ZK-based coordination
> to start standalone Samza processes w/o YARN
>
> There are also planned changes to allow de-coupling of Samza job
> coordination logic from YARN AppMaster:
> 1) SAMZA-680 Invert the JobCoordinator and AM logic. This would allow us to
> keep the Samza-specific JobCoordinator logic independent from
> cluster-management systems.
>
> There is one more thing I am thinking: we may want to make the partition
> assignment logic as a pluggable module, such that we can choose different
> coordination mechanism in partition assignment as needed (e.g. ZK-based,
> cluster-management based, or Kafka-based coordination).
>
>
> Ultimately, I think that we should try to refactor the current job
> launching model to the following:
> 1) Make standalone Samza process the standard Samza process model
> 2) Invert the JobCoordinator to the standalone Samza process s.t. the
> leader process of the Samza job becomes the JobCoordinator
> 3) Make the partition assignment as pluggable model to distribute the tasks
> to all Samza processes in a job-group in coordination
> 4) Make launching of Samza process agnostic of cluster-management systems.
> The cluster-management systems will simply provide the functionality of
> placing the standard Samza processes to actual available nodes
> 5) Make Samza process multi-threaded while maintaining the per-task
> single-threaded programming model for the users.
>
> Thoughts?
>
> -Yi
>
>
>
>
>
> On Tue, Sep 15, 2015 at 9:50 AM, Hannes Stockner <
> hannes.stock...@gmail.com>
> wrote:
>
> > +1
> >
> >
> > On Tue, Sep 15, 2015 at 5:43 PM, Bruno Bonacci <bruno.bona...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I support what Lukas saying. Samza packaging requirements are not
> > friendly,
> > > I use the ThreadJobFactory for the same reason.
> > >
> > > Bruno
> > >
> > > On Tue, Sep 15, 2015 at 5:39 PM, Lukas Steiblys <lu...@doubledutch.me>
> > > wrote:
> > >
> > > > Hi Yan,
> > > >
> > > > We use Samza in a production environment using ProcessJobFactory in
> > > Docker
> > > > containers because it greatly simplifies our deployment process and
> > makes
> > > > much better use of resources.
> > > >
> > > > Is there any plan to make the ThreadJobFactory or ProcessJobFactory
> > > > multithreaded? I will look into doing that myself, but I think it
> might
> > > be
> > > > useful to implement this for everyone. I am sure there are plenty of
> > > cases
> > > > where people do not want to use YARN, but want more parallelism in
> > their
> > > > tasks.
> > > >
> > > > Lukas
> > > >
> &

Re: Review Request 38296: SAMZA-341: Support metrics report via Ganglia

2015-09-14 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/38296/#review98895
---



build.gradle (line 219)
<https://reviews.apache.org/r/38296/#comment155670>

if we only use it as isEmpty, do not think worth doing this.



docs/learn/documentation/versioned/container/metrics.md (line 25)
<https://reviews.apache.org/r/38296/#comment155586>

as mentioned in SAMZA-340's review, can we move this part to the end of 
this doc? Because Kafka's way is a little more recommeded. (no bias to Ganglia. 
:)



docs/learn/documentation/versioned/jobs/configuration-table.html (line 1387)
<https://reviews.apache.org/r/38296/#comment155652>

is this the default port for Ganglia or Graphite?



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 41)
<https://reviews.apache.org/r/38296/#comment155655>

add java doc



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 44)
<https://reviews.apache.org/r/38296/#comment155656>

should be SamzaGangliaReporter



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 62)
<https://reviews.apache.org/r/38296/#comment155665>

1. if there is only one port name, should it be UNICAST?
Maybe it is better to use getModeForAddress to determin it.

2. why is the ttl "1"? do we make it configurable?



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 66)
<https://reviews.apache.org/r/38296/#comment155667>

maybe throw the Samza Exception to kill the job if the Ganglia is not 
available?



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 72)
<https://reviews.apache.org/r/38296/#comment155671>

wrap it with SamzaException



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 101)
<https://reviews.apache.org/r/38296/#comment155673>

change to error level



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 112)
<https://reviews.apache.org/r/38296/#comment155674>

same, error level



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 123)
<https://reviews.apache.org/r/38296/#comment155675>

error level



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 143)
<https://reviews.apache.org/r/38296/#comment155676>

error



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 154)
<https://reviews.apache.org/r/38296/#comment155677>

error



samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
 (line 165)
<https://reviews.apache.org/r/38296/#comment155678>

error



samza-ganglia/src/test/java/org/apache/samza/metrics/reporter/GangliaCounterTest.java
 (line 31)
<https://reviews.apache.org/r/38296/#comment155699>

    when the test are not run in the order shown in the class, the asserts will 
be incorrect.


- Yan Fang


On Sept. 11, 2015, 1:22 p.m., Aleksandar Pejakovic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/38296/
> ---
> 
> (Updated Sept. 11, 2015, 1:22 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Added new moduo for Ganglia support.
> 
> Implemented come is based on 
> [SAMZA-340](https://issues.apache.org/jira/browse/SAMZA-340)
> 
> 
> Diffs
> -
> 
>   build.gradle 3a7fabc 
>   checkstyle/import-control.xml bc07ae8 
>   docs/learn/documentation/versioned/container/metrics.md 11a62f9 
>   docs/learn/documentation/versioned/jobs/configuration-table.html c23d7d3 
>   gradle/dependency-versions.gradle 36d564b 
>   
> samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/GangliaCounter.java
>  PRE-CREATION 
>   
> samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/GangliaGauge.java
>  PRE-CREATION 
>   
> samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/GangliaReporterFactory.java
>  PRE-CREATION 
>   
> samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/GangliaSnapshot.java
>  PRE-CREATION 
>   
> samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/GangliaTimer.java
>  PRE-CREATION 
>   
> samza-ganglia/src/main/java/org/apache/samza/metrics/reporter/SamzaGangliaReporter.java
>  PR

Re: Runtime Execution Model

2015-09-14 Thread Yan Fang
Hi Bruno,

AFAIK, there is no existing JobFactory that brings as many threads as the
partition number. But I think nothing stops you to implement this: you can
get the partition information from the JobCoordinator, and then bring as
many threads as the partition/task number.

Since the two local factories (ThreadJobFactory and ProcessJobFactory) are
mainly for development, there is no additional document. But most of the
code here

is
self-explained.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Sat, Sep 12, 2015 at 1:47 PM, Bruno Bonacci 
wrote:

> Hi,
> I'm looking for additional documentation on the different RUNTIME
> EXECUTION MODELS of the different `job.factory.class`.
>
> I'm particularly interested on how each factory (ThreadJobFactory,
> ProcessJobFactory and YarnJobFactory) will create tasks consume and process
> messages out of Kafka and the thread model used.
>
> I did a few tests with the ThreadJob factory consuming out of a kafka
> topic with 5 partitions and I was expecting that it would use multiple
> threads to consume/process the different partitions, however it is
> using only one thread at runtime.
>
> Is there any way to tell Samza to use multiple processing threads (1 per
> partition)??
>
>
> Thanks
> Bruno
>


Re: not able to submit samza job to resource manager

2015-09-04 Thread Yan Fang
Hi Raja,

For the log, even though it does not present in the URL, you may still be
able to see the logs by using *yarn logs -applicationId * (or
go to the logs directory)

If you are using the 0.9.0 samza, I am afraid this version may not work
with the secured cluster. If you are using the master branch (compile it by
yourself), can you try to apply the patch
https://issues.apache.org/jira/browse/SAMZA-727, if it works, we can commit
this patch?

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Sep 4, 2015 at 11:23 AM, Raja.Aravapalli <raja.aravapa...@target.com
> wrote:

> Hi Yan,
>
> I couldn't see application present in the ResourceManager / JHS for the
> application id. And as you can see in my below email, tracking url is also
> showing as N/A.
>
> Only one thing that is working is: "yarn application -status "
>
> 
>  Application Report :
>  Application-Id : 
>  Application-Name : samza-testing_1
>  Application-Type : Samza
>  User : 
>  Queue : default
>  Start-Time : 1441387039186
>  Finish-Time : 1441387039210
>  Progress : 0%
>  State : FAILED
>  Final-State : FAILED
>  Tracking-URL : N/A
>  RPC Port : -1
>  AM Host : N/A
>  Aggregate Resource Allocation : 0 MB-seconds, 0 vcore-seconds
>  Diagnostics : User:  is not
> allowed to impersonate 
>  
>
>
> Our cluster is secured with Kerberos. Any specific setting I should set
> when working in secure cluster ??
>
>
>
> Regards,
> Raja Mahesh Aravapalli.
>
> -Original Message-
> From: Yan Fang [mailto:yanfang...@gmail.com]
> Sent: Friday, September 04, 2015 11:48 PM
> To: dev@samza.apache.org
> Subject: Re: not able to submit samza job to resource manager
>
> Hi Raja,
>
> Can you check the yarn's log? This gives us more information to see what
> is the problem.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Fri, Sep 4, 2015 at 10:23 AM, Raja.Aravapalli <
> raja.aravapa...@target.com
> > wrote:
>
> > Hi
> >
> > Please help me identify the problem:
> >
> > I am not able to submit the samza job to yarn RM. Found this when I
> > ran the "yarn application -status "
> >
> > 
> > Application Report :
> > Application-Id : 
> > Application-Name : samza-testing_1
> > Application-Type : Samza
> > User : 
> > Queue : default
> > Start-Time : 1441387039186
> > Finish-Time : 1441387039210
> > Progress : 0%
> > State : FAILED
> > Final-State : FAILED
> > Tracking-URL : N/A
> > RPC Port : -1
> > AM Host : N/A
> > Aggregate Resource Allocation : 0 MB-seconds, 0 vcore-seconds
> > Diagnostics : User:  is not
> > allowed to impersonate 
> > 
> >
> > Please help me identify, what's stopping me submitting in to cluster ?
> >
> > Thank you.
> >
> >
> > Regards,
> > Raja Mahesh Aravapalli.
> >
> >
>


Re: samza-hello-samza build cannot find samza 0.10.0-SNAPSHOT artifacts on maven

2015-08-28 Thread Yan Fang
run ./gradlew publishToMavenLocal ?

Fang, Yan
yanfang...@gmail.com

On Fri, Aug 28, 2015 at 10:06 AM, Yi Pan nickpa...@gmail.com wrote:

 Hi, all,

 I tried to build samza-hello-samza from latestt branch today and it failed
 due to the missing artifacts w/ 0.10.0-SNAPSHOT version on maven. The
 README.md file does not mention how to access the *-SNAPSHOT version of
 artifacts either. I am curious how the build for samza-hello-samza on
 latest branch work?

 @Yan, could you help to share your points here?

 Thanks!

 -Yi



Re: Why do I always receive waring email?

2015-08-26 Thread Yan Fang
Thanks. I filed a JIRA https://issues.apache.org/jira/browse/INFRA-10207 .
Waiting for any feedback.

Fang, Yan
yanfang...@gmail.com

On Wed, Aug 26, 2015 at 3:33 AM, Dan danharve...@gmail.com wrote:

 I also get this too now and again.

 I think it must be an apache e-mail issue, can we get someone from their
 infrastructure team to have a look?

  - Dan


 On 26 August 2015 at 06:02, Yan Fang yanfang...@gmail.com wrote:

  I actually do not know... I receive it frequently as well... But since I
  still can get emails, ignoring it
 
  Fang, Yan
  yanfang...@gmail.com
 
  On Tue, Aug 25, 2015 at 1:59 PM, Selina Tech swucaree...@gmail.com
  wrote:
 
   Dear all:
  
I received a few warning email as below. Does anyone know how
  should I
   avoid those waring email?
  
   Sincerely,
   Selina
  
   - -- - - - - - -
   Hi! This is the ezmlm program. I'm managing the
   dev@samza.apache.org mailing list.
  
   I'm working for my owner, who can be reached
   at dev-ow...@samza.apache.org.
  
  
   Messages to you from the dev mailing list seem to
   have been bouncing. I've attached a copy of the first bounce
   message I received.
  
   If this message bounces too, I will send you a probe. If the probe
  bounces,
   I will remove your address from the dev mailing list,
   without further notice . . . . . .
  
- - -- - - - - -
  
 



Re: [Discuss/Vote] upgrade to Yarn 2.6.0

2015-08-24 Thread Yan Fang
Hi Roger,

If you have plan to upgrade to 2.6.0, and no other companies are using
2.4.0, I think we can upgrade to 2.6.0 yarn in 0.10.0.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Aug 20, 2015 at 4:48 PM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Selina,

 Samza 0.9.1 on YARN 2.6 is the proved working solution.

 Best,

 -Yi

 On Thu, Aug 20, 2015 at 12:28 PM, Selina Tech swucaree...@gmail.com
 wrote:

  Hi, Yi:
   If I use Samza0.9.1 and Yarn2.6.0, Will the system be failed?
 
  Sincerely,
  Selina
 
  On Wed, Aug 19, 2015 at 1:58 PM, Yi Pan nickpa...@gmail.com wrote:
 
   Hi, Roger,
  
   In LinkedIn we have already moved to YARN 2.6 and is moving to YARN 2.7
   now. I am not aware of any major issues in upgrading. I will let our
 team
   member Jon Bringhurst to chime in since he did all the upgrade and may
  have
   more insights.
  
   @Jon, could you help to comment on this?
  
   Thanks!
  
   -Yi
  
   On Wed, Aug 19, 2015 at 9:12 AM, Roger Hoover roger.hoo...@gmail.com
   wrote:
  
We're using 2.4.0 in production.  Are there any major
 incompatibilities
   to
watch out for when upgrading to 2.6.0?
   
Thanks,
   
Roger
   
On Mon, Aug 17, 2015 at 4:41 PM, Yan Fang yanfang...@gmail.com
  wrote:
   
 Hi guys,

 we have been discussing upgrading to Yarn 2.6.0 (SAMZA-536
 https://issues.apache.org/jira/browse/SAMZA-536), because there
  are
some
 bug fixes after 2.4.0 and we can not enable the Yarn RM recovering
feature
 in Yarn 2.4.0 (SAMZA-750 
https://issues.apache.org/jira/browse/SAMZA-750
 )
 .

 So we just want to make sure if any production users are still
 using
   Yarn
 2.4.0 and do not plan to upgrade to 2.6.0+?

 If not further concern, I think we can go and upgrade to Yarn 2.6.0
  in
 Samza 0.10.0 release.

 Thanks,

 Fang, Yan
 yanfang...@gmail.com

   
  
 



Re: SAMZA build failing!!!

2015-08-24 Thread Yan Fang
Hi Raja,

Do you only run samza-core or the whole samza project? I downloaded the
samza from master branch and run ./gradlew clean build. There is no error.
Could you give a little more information how you get this error?

Thanks,

Fang, Yan
yanfang...@gmail.com

On Mon, Aug 24, 2015 at 9:54 AM, Raja.Aravapalli raja.aravapa...@target.com
 wrote:

 Hi,


 I was n't able to build SAMZA to execute the Samza jobs.


 Receiving below exception while executing samza-core_2.10.

 I checkedout the master branch from https://github.com/apache/samza.git
 and trying to build!!

 

 * What went wrong:
 Execution failed for task ':samza-core_2.10:test'.


 :samza-core_2.10:processTestResources
 :samza-core_2.10:testClasses
 :samza-core_2.10:checkstyleTest
 :samza-core_2.10:test

 testCanReadPropertiesConfigFiles FAILED
 java.lang.IllegalArgumentException: Illegal character in authority at
 index 7: file://samza1\samza-core/src/test/resources/test.properties
 at java.net.URI.create(URI.java:859)
 at
 org.apache.samza.config.factories.TestPropertiesConfigFactory.testCanReadPropertiesConfigFiles(TestPropertiesConfigFactory.scala:34)

 Caused by:
 java.net.URISyntaxException: Illegal character in authority at
 index 7: file://samza1\samza-core/src/test/resources/test.properties
 at java.net.URI$Parser.fail(URI.java:2829)
 at java.net.URI$Parser.parseAuthority(URI.java:3167)
 at java.net.URI$Parser.parseHierarchical(URI.java:3078)
 at java.net.URI$Parser.parse(URI.java:3034)
 at java.net.URI.init(URI.java:595)
 at java.net.URI.create(URI.java:857)
 ... 1 more


 


 Can someone please help me fix this. Thank you.


 Regards,
 Raja Mahesh Aravapalli.




Re: Review Request 37604: SAMZA-760 Samza Container should catch Throwables instead of just catching Exceptions

2015-08-24 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37604/#review96145
---


Overall, LGTM. Could you also add a unit test to verify this? Thank you.


samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 
581)
https://reviews.apache.org/r/37604/#comment151430

throwable, not exception in the log msg


- Yan Fang


On Aug. 19, 2015, 8:14 a.m., Aleksandar Bircakovic wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/37604/
 ---
 
 (Updated Aug. 19, 2015, 8:14 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 Added a catch for Throwables in Samza container. Catching Throwables can 
 cause problems in specific situations so I also added a partial function 
 'safely' that should take care of that specific situations.
 
 
 Diffs
 -
 
   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
 85b012b 
 
 Diff: https://reviews.apache.org/r/37604/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Aleksandar Bircakovic
 




Re: Review Request 37536: SAMZA-710 Update WebServlet and RestServlet to read coordinatorStream information

2015-08-20 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37536/#review95965
---


can you also tested this patch? It does not start the LatestConfigManager 
anywhere, this may cause problem. Thank you.


samza-core/src/main/java/org/apache/samza/container/ConfigManager.java (line 29)
https://reviews.apache.org/r/37536/#comment151161

How about renaming to LatestConfigManager? Then it is more specific and 
does not confuse with other config related classes.



samza-core/src/main/java/org/apache/samza/container/ConfigManager.java (line 33)
https://reviews.apache.org/r/37536/#comment151159

the source should be Job-coordinator



samza-core/src/main/java/org/apache/samza/container/ConfigManager.java (line 42)
https://reviews.apache.org/r/37536/#comment151160

copy/paste error . :)



samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
 (line 100)
https://reviews.apache.org/r/37536/#comment151157

if there is no other manager uses this method, maybe we can consider 
putting it into the LatestConfigManager?



samza-core/src/test/scala/org/apache/samza/config/TestConfigManager.scala (line 
34)
https://reviews.apache.org/r/37536/#comment151155

wrong name.



samza-core/src/test/scala/org/apache/samza/config/TestConfigManager.scala (line 
41)
https://reviews.apache.org/r/37536/#comment151156

can this be a little more concrete? The thing we want to test is that, 
configManager gets the current config, then we update the config, then we 
guarantee the configManager gets the latest config?


- Yan Fang


On Aug. 20, 2015, 9:19 a.m., Aleksandar Bircakovic wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/37536/
 ---
 
 (Updated Aug. 20, 2015, 9:19 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 WebServlet and RestServlet now read information from coordinator stream 
 consumer and get new config.
 
 
 Diffs
 -
 
   checkstyle/import-control.xml aaa235a 
   samza-core/src/main/java/org/apache/samza/container/ConfigManager.java 
 PRE-CREATION 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
  ca97ce8 
   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b59274 
   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
 a926ce6 
   samza-core/src/test/scala/org/apache/samza/config/TestConfigManager.scala 
 PRE-CREATION 
   
 samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
  09f4dc3 
   
 samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
  7fd5122 
 
 Diff: https://reviews.apache.org/r/37536/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Aleksandar Bircakovic
 




Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-08-20 Thread Yan Fang
---


Thanks,

Yan Fang



Re: Review Request 37642: SAMZA-695 Update the StreamAppender doc

2015-08-20 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37642/#review95964
---



docs/learn/documentation/versioned/jobs/logging.md (line 100)
https://reviews.apache.org/r/37642/#comment151153

can it be something like and change name of log stream with param 
'StreamName'. Because this is not the StreamAppender's name.



docs/learn/documentation/versioned/jobs/logging.md (line 104)
https://reviews.apache.org/r/37642/#comment151154

how about adding a comment  !-- optional -- here?


- Yan Fang


On Aug. 20, 2015, 9:25 a.m., Aleksandar Pejakovic wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/37642/
 ---
 
 (Updated Aug. 20, 2015, 9:25 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 Added requested param to logging.md
 
 
 Diffs
 -
 
   docs/learn/documentation/versioned/jobs/logging.md d1b372c 
 
 Diff: https://reviews.apache.org/r/37642/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Aleksandar Pejakovic
 




[DISCUSS] KIP-28 - Add a transform client for data processing

2015-08-19 Thread Yan Fang
Hi Guozhang,

Thank you for writing the KIP-28 up. (Hope this is the right thread for me to 
post some comments. :) 

I still have some confusing about the implementation of the Processor:

1. why do we maintain a separate consumer and producer for each worker thread?
— from my understanding, the new consumer api will be able to fetch certain 
topic-partition. Is one consumer enough for one Kafka.process (it is shared 
among work threads)? The same thing for the producer, is one producer enough 
for sending out messages to the brokers? Will this have better performance?

2. how is the “Stream Synchronization” achieved?
— you talked about “pause” and “notify” the consumer. Still not very clear. 
If worker thread has group_1 {topicA-0, topicB-0} and group_2 {topicA-1, 
topicB-1}, and topicB is much slower. How can we pause the consumer to sync 
topicA and topicB if there is only one consumer?

3. how does the partition timestamp monotonically increase?
— “When the lowest timestamp corresponding record gets processed by the 
thread, the partition time possibly gets advanced.” How does the “gets 
advanced” work? Do we get another “lowest message timestamp value”? But doing 
this, may not get an “advanced” timestamp.

4. thoughts about the local state management.
— from the description, I think there is one kv store per partition-group. 
That means if one work thread is assigned more than one partition groups, it 
will have more than one kv-store connections. How can we avoid mis-operation? 
Because one partition group can easily write to another partition group’s kv 
store (they are in the same thread). 

5. do we plan to implement the throttle ?
— since we are “forwarding” the messages. It is very possible that, 
upstream-processor is much faster than the downstream-processor, how do we plan 
to deal with this?

6. how does the parallelism work?
— we achieve this by simply adding more threads? Or we plan to have the 
mechanism which can deploy different threads to different machines? It is easy 
to image that we can deploy different processors to different machines, then 
how about the work threads? Then how is the fault-tolerance? Maybe this is 
out-of-scope of the KIP?

Two nits in the KIP-28 doc:

1. miss the “close” method interfaceProcessorK1,V1,K2,V2. We have the 
“override close()” in KafkaProcessor.

2. “punctuate” does not accept “parameter”, while StatefulProcessJob has a 
punctuate method that accepts parameter.

Thanks,
Yan

Re: KIP-28 kafka processor

2015-08-18 Thread Yan Fang
Thanks, Chris and Jay.

So do we add comments in this thread? Seems I can not leave comments in the
confluence. :)

Thanks,

Fang, Yan
yanfang...@gmail.com

On Mon, Aug 17, 2015 at 11:05 PM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Chris and Jay,

 Thanks for the reminder. I plan to follow up this week.

 Cheers!

 -Yi

 On Sun, Aug 16, 2015 at 12:27 PM, Jay Kreps jay.kr...@gmail.com wrote:

  +1 Any feedback would be appreciated!
 
  -Jay
 
  On Sat, Aug 15, 2015 at 3:55 PM, Chris Riccomini criccom...@apache.org
  wrote:
 
   Hey all,
  
   I wanted to call attention to KIP-28:
  
  
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client
  
   This is the result of the last conversation that we had about
   samza's future direction.
  
   It would be good to have the samza community involved in this.
  
   Cheers,
   Chris
  
 



Re: Use one producer for both coordinator stream and users system?

2015-08-18 Thread Yan Fang
Hi Tao,

First, one kafka producer has an i/o thread. (correct me if I am wrong).

Second, after Samza 0.10.0, we have a coordinator stream, which stores the
checkpoint, config and other locality information for auto-scaling, dynamic
configuration, etc purpose. (See Samza-348
https://issues.apache.org/jira/browse/SAMZA-348). So we have a producer
for this coordinator stream.

Therefore, each contains will have at least two producers, one is for the
coordinator stream, one is for the users system.

My question is, can we use only one producer for both coordinator stream
and the users system to have better performance? (from the doc, it may
retrieve better performance.)

Thanks,

Fang, Yan
yanfang...@gmail.com

On Mon, Aug 17, 2015 at 9:49 PM, Tao Feng fengta...@gmail.com wrote:

 Hi Yan,

 Naive question: what do we need producer thread of coordinator stream for?

 Thanks,
 -Tao

 On Mon, Aug 17, 2015 at 2:09 PM, Yan Fang yanfang...@gmail.com wrote:

  Hi guys,
 
  I have this question because Kafka's doc
  
 
 http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
  
  seems recommending having one producer shared by all threads (*The
  producer is thread safe and should generally be shared among all threads
  for best performance.*), while currently the coordinator stream is
 using a
  separate producer (usually, there are two producers(two producer threads)
  in each container: one is for the coordinator stream , one is for the
  real job)
 
  1. Will having one producer shared by all thread really improve the
  performance? (haven't done the perf test myself. Guess Kafka has some
  proof).
 
  2. if yes, should we go this way?
 
  Thanks,
 
  Fang, Yan
  yanfang...@gmail.com
 



Re: remote kafka producer -- kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries

2015-08-18 Thread Yan Fang
Thanks, Selina, for sharing this solution for the reference. :)

Cheers,

Fang, Yan
yanfang...@gmail.com

On Mon, Aug 17, 2015 at 7:17 PM, Job-Selina Wu swucaree...@gmail.com
wrote:

 Hi, All:

   Finally I fixed this bug.

 1. set advertised.host.name at config/server.properites as AWS
 *private IP* address
 (not public *DNS*)
 2. comment host.name at config/server.properites
 3. In remote java producer:
 props.put(metadata.broker.list,  borkerPrivateIp+ :9092);
 The value brokerPrivateIp is same as advertised.host.name
  at config/server.properites

  This bug blocked me a while...

 Sincerely,
 Selina

 On Thu, Aug 13, 2015 at 9:56 PM, Job-Selina Wu swucaree...@gmail.com
 wrote:

  Dear All:
 
 I got kafka.common.FailedToSendMessageException: Failed to send
  messages after 3 tries as below. When I have a remote java Kafka producer
  try to produce message to Kafka broker Server. Both Producer and Broker
 are
  at AWS cloud. BTW, I tried my code first at local machine and Virtual
  machine first, It did not work either.(advertised.host.name was set to
  the ip address of the kafka server*)*
 
  *-**This is my KafkaProducer at remote Producer for producer
  configuration**-*
  public class KafkaProducer {
 
  Properties props = new Properties();
 
  private final ProducerString, String producer;
  private final String kafkaServerIP = 52.19.2.74:9092;
 
  public KafkaProducer() {
 
 
  props.put(metadata.broker.list, kafkaServerIP);
  //props.put(bootstrap.servers, localhost:9092 );
  props.put(serializer.class, kafka.serializer.StringEncoder);
  props.put(advertised.host.name, localhost);
  props.put(request.required.acks, 0);
 
  ProducerConfig config = new ProducerConfig(props);
 
  producer = new ProducerString, String(config);
  }
 
  public ProducerString, String getProducer() {
 
  return this.producer;
  }
  }
 
 
  *The configs/server.properties at Kafka Server at AWS*-
 
  zookeeper.connect=localhost:2181
  zookeeper.connection.timeout.ms=6000
 
  delete.topic.enable=true
 
  broker.id=0
  port=9092
  host.name=localhost
  *advertised.host.name http://advertised.host.name*=
  ec2-51-18-21-235.us-west-1.compute.amazonaws.com
 
  # below is same as default
  #advertised.port=port accessible by clients
  #advertised.port=port accessible by clients
  num.network.threads=3
  num.io.threads=8
  socket.send.buffer.bytes=102400
  socket.receive.buffer.bytes=102400
  socket.request.max.bytes=104857600
  log.dirs=/tmp/kafka-logs
  num.partitions=1
  num.recovery.threads.per.data.dir=1
  #log.flush.interval.messages=1
  #log.flush.interval.ms=1000
  log.retention.hours=168
  #log.retention.bytes=1073741824
  log.segment.bytes=1073741824
  log.retention.check.interval.ms=30
  log.cleaner.enable=false
 
 
  - - --- - - - - --
 
  kafka.common.FailedToSendMessageException: Failed to send messages after
 3
  tries.
  kafka.common.FailedToSendMessageException: Failed to send messages after
 3
  tries.
  at
 
 kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
  at kafka.producer.Producer.send(Producer.scala:77)
  at kafka.javaapi.producer.Producer.send(Producer.scala:33)
  at com.cinarra.kafka.Main.main(Main.java:21)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at
 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at
 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
  at java.lang.Thread.run(Thread.java:745)
 
 
  reference:
 
 
 http://stackoverflow.com/questions/30217255/cant-connect-to-a-remote-kafka-producer-from-windows-through-java-code
 
  Your help is highly appreciated,
  Selina
 
 



Re: Review Request 37528: SAMZA-736 BrokerProxy will stuck in infinite loop if consumer.fetch throws OOME

2015-08-18 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37528/#review95817
---



samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
(lines 163 - 164)
https://reviews.apache.org/r/37528/#comment150958

will this stop the Container, or just log the error and finish this thread?

If we want to stop the Container, I think we need to throw a SamzaException 
here.



samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala 
(line 46)
https://reviews.apache.org/r/37528/#comment150960

why is the order changed?



samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala 
(line 366)
https://reviews.apache.org/r/37528/#comment150959

this test seems not testing anything. :)


- Yan Fang


On Aug. 18, 2015, 12:58 p.m., Aleksandar Pejakovic wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/37528/
 ---
 
 (Updated Aug. 18, 2015, 12:58 p.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 Added new catch blocks to prevent infinite loops
 
 
 Diffs
 -
 
   
 samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
  376b277 
   samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
 614f33f 
   
 samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
  e285dec 
 
 Diff: https://reviews.apache.org/r/37528/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Aleksandar Pejakovic
 




Re: Review Request 37536: SAMZA-710 Update WebServlet and RestServlet to read coordinatorStream information

2015-08-17 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37536/#review95672
---


I believe this patch works, but could you also add a unit test to verify the 
correctness? Also overusing the localityManager is a little concerned, could 
you think of another way? Thank you.


samza-core/src/main/java/org/apache/samza/job/model/JobModel.java (line 74)
https://reviews.apache.org/r/37536/#comment150767

this is a little overuse of the localityManager. It is not designed to get 
the latest config.



samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
 (line 106)
https://reviews.apache.org/r/37536/#comment150765

sanitize it as well.


- Yan Fang


On Aug. 17, 2015, 2:56 p.m., Aleksandar Bircakovic wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/37536/
 ---
 
 (Updated Aug. 17, 2015, 2:56 p.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 WebServlet and RestServlet now read information from coordinator stream 
 consumer and get new config.
 
 
 Diffs
 -
 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
  ca97ce8 
   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b59274 
   
 samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
  09f4dc3 
   
 samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
  7fd5122 
 
 Diff: https://reviews.apache.org/r/37536/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Aleksandar Bircakovic
 




[Discuss/Vote] upgrade to Yarn 2.6.0

2015-08-17 Thread Yan Fang
Hi guys,

we have been discussing upgrading to Yarn 2.6.0 (SAMZA-536
https://issues.apache.org/jira/browse/SAMZA-536), because there are some
bug fixes after 2.4.0 and we can not enable the Yarn RM recovering feature
in Yarn 2.4.0 (SAMZA-750 https://issues.apache.org/jira/browse/SAMZA-750)
.

So we just want to make sure if any production users are still using Yarn
2.4.0 and do not plan to upgrade to 2.6.0+?

If not further concern, I think we can go and upgrade to Yarn 2.6.0 in
Samza 0.10.0 release.

Thanks,

Fang, Yan
yanfang...@gmail.com


Use one producer for both coordinator stream and users system?

2015-08-17 Thread Yan Fang
Hi guys,

I have this question because Kafka's doc
http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
seems recommending having one producer shared by all threads (*The
producer is thread safe and should generally be shared among all threads
for best performance.*), while currently the coordinator stream is using a
separate producer (usually, there are two producers(two producer threads)
in each container: one is for the coordinator stream , one is for the
real job)

1. Will having one producer shared by all thread really improve the
performance? (haven't done the perf test myself. Guess Kafka has some
proof).

2. if yes, should we go this way?

Thanks,

Fang, Yan
yanfang...@gmail.com


Re: remote kafka producer -- kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries

2015-08-16 Thread Yan Fang
Hi Selina,

I guess you can post this question in the Kafka mailing list if this is
pure kafka, there are more experts in that community, though there are
some Kafka experts here as well. :)

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Aug 13, 2015 at 9:56 PM, Job-Selina Wu swucaree...@gmail.com
wrote:

 Dear All:

I got kafka.common.FailedToSendMessageException: Failed to send
 messages after 3 tries as below. When I have a remote java Kafka producer
 try to produce message to Kafka broker Server. Both Producer and Broker are
 at AWS cloud. BTW, I tried my code first at local machine and Virtual
 machine first, It did not work either.(advertised.host.name was set to
 the ip address of the kafka server*)*

 *-**This is my KafkaProducer at remote Producer for producer
 configuration**-*
 public class KafkaProducer {

 Properties props = new Properties();

 private final ProducerString, String producer;
 private final String kafkaServerIP = 52.19.2.74:9092;

 public KafkaProducer() {


 props.put(metadata.broker.list, kafkaServerIP);
 //props.put(bootstrap.servers, localhost:9092 );
 props.put(serializer.class, kafka.serializer.StringEncoder);
 props.put(advertised.host.name, localhost);
 props.put(request.required.acks, 0);

 ProducerConfig config = new ProducerConfig(props);

 producer = new ProducerString, String(config);
 }

 public ProducerString, String getProducer() {

 return this.producer;
 }
 }


 *The configs/server.properties at Kafka Server at AWS*-

 zookeeper.connect=localhost:2181
 zookeeper.connection.timeout.ms=6000

 delete.topic.enable=true

 broker.id=0
 port=9092
 host.name=localhost
 *advertised.host.name http://advertised.host.name*=
 ec2-51-18-21-235.us-west-1.compute.amazonaws.com

 # below is same as default
 #advertised.port=port accessible by clients
 #advertised.port=port accessible by clients
 num.network.threads=3
 num.io.threads=8
 socket.send.buffer.bytes=102400
 socket.receive.buffer.bytes=102400
 socket.request.max.bytes=104857600
 log.dirs=/tmp/kafka-logs
 num.partitions=1
 num.recovery.threads.per.data.dir=1
 #log.flush.interval.messages=1
 #log.flush.interval.ms=1000
 log.retention.hours=168
 #log.retention.bytes=1073741824
 log.segment.bytes=1073741824
 log.retention.check.interval.ms=30
 log.cleaner.enable=false


 - - --- - - - - --

 kafka.common.FailedToSendMessageException: Failed to send messages after 3
 tries.
 kafka.common.FailedToSendMessageException: Failed to send messages after 3
 tries.
 at
 kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
 at kafka.producer.Producer.send(Producer.scala:77)
 at kafka.javaapi.producer.Producer.send(Producer.scala:33)
 at com.cinarra.kafka.Main.main(Main.java:21)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
 at java.lang.Thread.run(Thread.java:745)


 reference:

 http://stackoverflow.com/questions/30217255/cant-connect-to-a-remote-kafka-producer-from-windows-through-java-code

 Your help is highly appreciated,
 Selina




Re: [SAMZA-423] Integrate Lucene into Samza

2015-08-16 Thread Yan Fang
Hi Robert,

Thank you for the contribution and very sorry for the late replay. :(

Left some comments in the JIRA in terms of the design doc. Hope that can
help you polish your design.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Aug 13, 2015 at 12:29 AM, Robert Zuljevic r.zulje...@levi9.com
wrote:

 Hello all,



 I’ve added a design document to task SAMZA-423. If anyone is interested
 please take a look : )



 I have some outstanding questions that I would like to resolve so I could
 continue working on this as soon as possible:



 1.   Should the API support bulk indexing/removing/matching? (note
 that in some cases this might end up being only cosmetical)?

 2.   Should the match API return the objects representing indexed
 elements or their wrappers (as suggested in the design document)?

 3.   Should this task renamed to something like “Create Document
 Store” and create two other tasks dealing with concrete implementations in
 Luwak and Lucene (which is the endgoal).



 As stated in the ticket any and all suggestions/comments/criticisms are
 welcome : )



 Met vriendelijke groet / Kind regards,

 Robert Žuljević

 Software Developer

 [image: Title: Levi9 IT Services]
 --

 Address: Trifkovicev trg 6, 21000 Novi Sad, Serbia

 Tel.: +31 20 6701 947 | +381 21 2155 500

 Mobile: +381 64 428 28 46

 Skype: robert.zuljevic

 Internet: www.levi9.com



 Chamber of commerce Levi9 Holding: 34221951

 Chamber of commerce Levi9 IT Services BV: 34224746
 --

 This e-mail may contain confidential or privileged information. If you are
 not (one of) the intended recipient(s), please notify the sender
 immediately by reply e-mail and delete this message and any attachments
 permanently without retaining a copy. Any review, disclosure, copying,
 distribution or taking any action in reliance on the contents of this
 e-mail by persons or entities other than the intended recipient(s) is
 strictly prohibited and may be unlawful.

 The services of Levi9 are exclusively subject to its general terms and
 conditions. These general terms and conditions can be found on
 www.levi9.com and a copy will be promptly submitted to you on your
 request and free of charge.





Re: Kill All Jobs

2015-08-13 Thread Yan Fang
Hi Jordi,

Thanks. This is useful. If possible, can you open a JIRA and upload the
patch there?

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Aug 6, 2015 at 8:04 AM, Shekar Tippur ctip...@gmail.com wrote:

 Thanks Jordi. This really helps.

 - Shekar

 On Thu, Aug 6, 2015 at 12:21 AM, Jordi Blasi Uribarri jbl...@nextel.es
 wrote:

  Hi,
 
  As a little present (and I know this is not the way to get the code in
 the
  project, but I am new to this sharing). I just made a simple script to
 kill
  all the jobs running in Samza. It is supposed to live with
 kill-yarn-job.sh
  in the bin folder. It shares me time, so maybe someone finds it helpful.
 
 
  [[ $JAVA_OPTS != *-Dlog4j.configuration* ]]  export
  JAVA_OPTS=$JAVA_OPTS -Dlog4j.configuration=file:$(dirname
  $0)/log4j-console.xml
 
  exec $(dirname $0)/run-class.sh
  org.apache.hadoop.yarn.client.cli.ApplicationCLI application -list | grep
  application_ | awk -F ' ' '{ print $1 }' | while read linea
  do
$(dirname $0)/kill-yarn-job.sh $linea
  done
 
  Hope it helps.
 
 Bye
 
  Jordi
  
  Jordi Blasi Uribarri
  Área I+D+i
 
  jbl...@nextel.es
  Oficina Bilbao
 
  [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]
 



Review Request 37428: SAMZA-723: hello-samza hangs when using StreamAppender

2015-08-12 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37428/
---

Review request for samza.


Bugs: SAMZA-723
https://issues.apache.org/jira/browse/SAMZA-723


Repository: samza


Description
---

fixed 2 parts of the problem:
1. Start streamAppender until the JobCoordinator is running
2. deadlock in Producer thread and the main thread

More explaination is in JIRA.


Diffs
-

  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
a926ce6 
  samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java 
209296d 
  samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 
8948453 
  
samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
 3e81240 

Diff: https://reviews.apache.org/r/37428/diff/


Testing
---


Thanks,

Yan Fang



Re: Mailing list join request

2015-08-11 Thread Yan Fang
shouldn't you send to dev-subscr...@samza.apache.org ? :)

Fang, Yan
yanfang...@gmail.com

On Wed, Aug 12, 2015 at 12:33 PM, Eli Reisman apache.mail...@gmail.com
wrote:

 subscribe



Re: Missing a change log offset for SystemStreamPartition

2015-08-10 Thread Yan Fang
$MappedValues.foreach(MapLike.scala:245)
 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at
 org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
 at
 org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
 at
 org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
 at
 org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
 at
 org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
 at
 org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
 at
 org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
 at
 org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
 at
 org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
 at
 org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)


 The job fails even when there is no message sent to the input topic.

 Samza is version 0.9.1 and kafka 0.8.2.

 Thanks,

   Jordi

 -Mensaje original-
 De: Jordi Blasi Uribarri [mailto:jbl...@nextel.es]
 Enviado el: lunes, 10 de agosto de 2015 10:26
 Para: dev@samza.apache.org
 Asunto: RE: Missing a change log offset for SystemStreamPartition

 Hi,

 I have migrated samza to the last versión and recreated the job with a new
 store name so the streams were created clean. I am getting the same error:

 java version 1.7.0_79
 OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1) OpenJDK
 64-Bit Server VM (build 24.79-b02, mixed mode) log4j:WARN No appenders
 could be found for logger (org.apache.samza.metrics.JmxServer).
 log4j:WARN Please initialize the log4j system properly.
 log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
 more info.
 Exception in thread main org.apache.samza.SamzaException: Missing a
 change log offset for SystemStreamPartition [kafka, commdb-changelog, 2].
 at
 org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
 at
 org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
 at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
 at scala.collection.AbstractMap.getOrElse(Map.scala:58)
 at
 org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
 at
 org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at
 scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
 at
 scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at
 org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
 at
 org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
 at
 org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
 at
 org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
 at
 org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
 at
 org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
 at
 org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
 at
 org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
 at
 org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
 at
 org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

 Is there any other info I can attach to help find the problem?

 Thanks,

   Jordi

 -Mensaje original-
 De: Yan Fang

Re: Missing a change log offset for SystemStreamPartition

2015-08-07 Thread Yan Fang
Hi Jordi,

Sorry for getting you back late. Was quite busy yesterday.

I think the reason of your error is that you mismatched Samza version and
Kafka version.

Seems that, you are using Samza 0.8.1 with Kafka 0.8.2, which is not
supported.

So my suggestion is to upgrade to *Samza 0.9.1*, then use *Kafka 0.8.2*.
This match is proved working.

Hope this helps you.

Thanks,


Fang, Yan
yanfang...@gmail.com

On Thu, Aug 6, 2015 at 1:16 AM, Jordi Blasi Uribarri jbl...@nextel.es
wrote:

 I changed the job name and the store name. I was defining two different
 stores and in case that was the problem, I also eliminated the second one.
 I am getting the same exception.

 Exception in thread main org.apache.samza.SamzaException: Missing a
 change log offset for SystemStreamPartition [kafka, testdb-changelog, 2].
 at
 org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
 at
 org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
 at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
 at scala.collection.AbstractMap.getOrElse(Map.scala:58)
 at
 org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
 at
 org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at
 scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
 at
 scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at
 org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
 at
 org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
 at
 org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
 at
 org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
 at
 org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
 at
 org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
 at
 org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
 at
 org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
 at
 org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
 at
 org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

 As I have the autocreate configured in Kafka I am not creating anything
 for the store. Is that ok?

 By the way, is there any problem on having two different stores?

 Thanks,

 Jordi

 -Mensaje original-
 De: Yan Fang [mailto:yanfang...@gmail.com]
 Enviado el: miércoles, 05 de agosto de 2015 20:23
 Para: dev@samza.apache.org
 Asunto: Re: Missing a change log offset for SystemStreamPartition

 Hi Jordi,

 I wonder, the reason of your first exception is that, you changed the task
 number (partition number of your input stream), but still were using the
 same changelog stream. It is trying to send to the partition 2, which does
 not exist?

 Can you reproduce this exception in a new job? (new store name, new job
 name)

 The second exception is caused by the wrong offset format, I believe.

 Let me know how the new job goes.

 Thanks,

 Fang, Yan
 yanfang...@gmail.com

 On Wed, Aug 5, 2015 at 12:51 AM, Jordi Blasi Uribarri jbl...@nextel.es
 wrote:

  Hi,
 
  I am trying to use the Keystore to manage some state information.
  Basically this is the code I am using. As long as I have tested, the
  rest is working correctly.
 
  private KeyValueStoreString, String storestp;
 
  public void init(Config config, TaskContext context) {
   this.storestp = (KeyValueStoreString, String)
  context.getStore(stepdb);
 }
 
 public void process(IncomingMessageEnvelope envelope,
  MessageCollector collector,
  TaskCoordinator coordinator)
  {
 …
  String str = storestp.get(code

Re: Review Request 37039: SAMZA-748 Coordinator URL always 127.0.0.1

2015-08-06 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37039/#review94370
---



samza-core/src/main/scala/org/apache/samza/util/Util.scala (line 308)
https://reviews.apache.org/r/37039/#comment148943

use javadoc format, {@link}, not [[]]



samza-core/src/main/scala/org/apache/samza/util/Util.scala (line 310)
https://reviews.apache.org/r/37039/#comment148944

same. not use [[]]


- Yan Fang


On Aug. 4, 2015, 12:42 p.m., József Márton Jung wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/37039/
 ---
 
 (Updated Aug. 4, 2015, 12:42 p.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 We are using InetAddress.getLocalHost().getHostAddress() for the 
 org.apache.samza.coordinator.server.HttpServer#getUrl. But getLocalHost() may 
 return a loopback address, 127.0.0.1, which is not reachable by other 
 machines.
 
 Added a method to Util.scala which resolves the first network address which 
 is not a loopback address.
 
 
 Diffs
 -
 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
  e3adc85 
   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
 27b2517 
   
 samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
  dfe3a45 
   samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala de45123 
   
 samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
  6b7f0ba 
   samza-core/src/main/scala/org/apache/samza/util/Util.scala 419452c 
   samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala ead6f94 
   
 samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java
  cbf552c 
 
 Diff: https://reviews.apache.org/r/37039/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 József Márton Jung
 




Re: Review Request 36903: SAMZA-744: shutdown stores before shutdown producers

2015-08-05 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36903/#review94193
---



samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala
 (lines 47 - 49)
https://reviews.apache.org/r/36903/#comment148730

we need to remove the author information. :) And maybe add some java doc 
instead.

My 2 cents:
1. If this is a real test, to be consistent, we may want to use 
TestStreamTask (begin with Test), or change all other TestSomething to 
SomethingTest (e.g. change TestStateful to StatefulTest)

2. If this is not a real test, I prefer something like StreamTaskUtil to be 
less ambiguous.



samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala
 (line 96)
https://reviews.apache.org/r/36903/#comment148740

is this tag used?



samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala
 (line 148)
https://reviews.apache.org/r/36903/#comment148741

same, is this used?



samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala
 (line 169)
https://reviews.apache.org/r/36903/#comment148742

There is no TestJob. (I know, it is copy/paste issue :)



samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala
 (line 176)
https://reviews.apache.org/r/36903/#comment148752

why TestStateStoreTask here? I think you mean TestTask.awaitTaskReistered



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (line 64)
https://reviews.apache.org/r/36903/#comment148745

From the description, it is not testing the Container Shutdown, actually it 
is testing the store restoring feature.



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (line 66)
https://reviews.apache.org/r/36903/#comment148734

Since we already are doing the abstraction, is it possible to put the 
common config into StreamTastTest object? Becaue I see a lot of the same 
configs in ShutdownContainerTest and TestStatefulTask.



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (lines 87 - 89)
https://reviews.apache.org/r/36903/#comment148755

in the 0.10.0, we do not have checkpoint factory, I believe



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (lines 142 - 146)
https://reviews.apache.org/r/36903/#comment148754

are those two methods used anywhere?



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (line 155)
https://reviews.apache.org/r/36903/#comment148758

how about adding override ?



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (line 165)
https://reviews.apache.org/r/36903/#comment148759

how about adding override?



samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
 (lines 88 - 91)
https://reviews.apache.org/r/36903/#comment148756

same



samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
 (line 95)
https://reviews.apache.org/r/36903/#comment148757

actually i do not understand why we need a companion object here. We just 
use the default task number, 1.

And awaitTaskRegistered and register methods are not used anywhere.



samza-test/src/test/scala/org/apache/samza/test/integration/TestTask.scala 
(lines 32 - 34)
https://reviews.apache.org/r/36903/#comment148731

Instead of the author information, I think putting some java doc explaining 
this class/object will be better.



samza-test/src/test/scala/org/apache/samza/test/integration/TestTask.scala 
(line 37)
https://reviews.apache.org/r/36903/#comment148749

rm ;


- Yan Fang


On Aug. 4, 2015, 9:30 p.m., Yi Pan (Data Infrastructure) wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36903/
 ---
 
 (Updated Aug. 4, 2015, 9:30 p.m.)
 
 
 Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and 
 Navina Ramesh.
 
 
 Bugs: SAMZA-744
 https://issues.apache.org/jira/browse/SAMZA-744
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-744: shutdown stores before shutdown producers
 
 
 Diffs
 -
 
   build.gradle 0852adc4e8e0c2816afd1ebf433f1af6b44852f7 
   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
 27b2517048ad5730762506426ee7578c66181db8 
   
 samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala
  PRE-CREATION 
   
 samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
  PRE-CREATION 
   
 samza-test/src/test/scala/org/apache/samza/test/integration

Re: Missing a change log offset for SystemStreamPartition

2015-08-05 Thread Yan Fang
Hi Jordi,

I wonder, the reason of your first exception is that, you changed the task
number (partition number of your input stream), but still were using the
same changelog stream. It is trying to send to the partition 2, which does
not exist?

Can you reproduce this exception in a new job? (new store name, new job
name)

The second exception is caused by the wrong offset format, I believe.

Let me know how the new job goes.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Wed, Aug 5, 2015 at 12:51 AM, Jordi Blasi Uribarri jbl...@nextel.es
wrote:

 Hi,

 I am trying to use the Keystore to manage some state information.
 Basically this is the code I am using. As long as I have tested, the rest
 is working correctly.

 private KeyValueStoreString, String storestp;

 public void init(Config config, TaskContext context) {
  this.storestp = (KeyValueStoreString, String)
 context.getStore(stepdb);
}

public void process(IncomingMessageEnvelope envelope,
 MessageCollector collector,
 TaskCoordinator coordinator)
 {
…
 String str = storestp.get(code)
 …
 }

 When I load it, it goes to running but, whe I send the messages through
 Kafka stream It goes to Failed state. I have found this Exception:
 Exception in thread main org.apache.samza.SamzaException: Missing a
 change log offset for SystemStreamPartition [kafka, stepdb-changelog, 2].
 at
 org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
 at
 org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
 at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
 at scala.collection.AbstractMap.getOrElse(Map.scala:58)
 at
 org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
 at
 org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at
 scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
 at
 scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at
 org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
 at
 org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
 at
 org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
 at
 org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
 at
 org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
 at
 org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
 at
 org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
 at
 org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
 at
 org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
 at
 org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

 I have seen that the stepdb-changelog stream exists in Kafka. As a try to
 regenerate the missing offset and tes it I have connected through the
 command line and send a message to the stream. It was received correctly.
 Now I am seeing the following Exception:

 Exception in thread main java.lang.NullPointerException
 at
 scala.collection.mutable.ArrayOps$ofByte$.length$extension(ArrayOps.scala:126)
 at
 scala.collection.mutable.ArrayOps$ofByte.length(ArrayOps.scala:126)
 at scala.collection.SeqLike$class.size(SeqLike.scala:106)
 at
 scala.collection.mutable.ArrayOps$ofByte.size(ArrayOps.scala:120)
 at
 org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:94)
 at
 org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:79)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at 

What happens after changelog reaches the Kafka retention

2015-08-04 Thread Yan Fang
Hi guys,

Have a question about the changelog topic. Currently we are restoring the
kv store by reading the whole changelog topic from the Kafka. So what will
happen after Kafka deletes some log segment after the retention time? Will
the changelog miss some values?

Thanks,

Fang, Yan
yanfang...@gmail.com


Re: log4j configuration

2015-08-04 Thread Yan Fang
Hi Jordi,

This is a little tricky. :)

1. If you want to specify the logs to specific locations, please use the
following two properties, such as

*task.opts*=-Dsamza.log.dir=/tmp/samza-logs
*yarn.am.opts*=-Dsamza.log.dir=/tmp/samza-master-logs

2. then why doesn't export SAMZA_LOG_DIR work?

Because this setting only takes effect in your host machine. When you
submit the job, AM and containers do not inherit the environment variables
from their host.

3. then why can you still see the gc logs?

Though this variable does not affect AM and containers, it is used by the
YarnClient, which is used to submit the job. So essentially the gc log is
for the YarnClient (it dies after submitting). The log of YarnClient is
taken care by the* /bin/log4j-console.xml* file, not lib/log4j.xml (this
one is for the AM and all containers).

Hope this makes you a little clear.

Thanks,






Fang, Yan
yanfang...@gmail.com

On Tue, Aug 4, 2015 at 12:44 AM, Jordi Blasi Uribarri jbl...@nextel.es
wrote:

 Hi,

 I guess this is just a howto question, but I am not able to find how it
 works. I am trying to trace the code of the job I want to execute in Samza.
 I have defined the environment variable as stated in the documentation:
 export SAMZA_LOG_DIR=/opt/logs

 I believe that this is working as I have seen that the garbage collector
 correctly generates the gc.o file and writes to it. The directories exists
 in all samza servers (two in my lab).

 I have added the following code to the job:

 Logger logger = Logger.getLogger(GetConfig.class);
 logger.info(Trace text message);

 Although the code is being executed (I can see the messages going through)
 I see no trace written. I don’t have experience with log4j and maybe it is
 there where I have to look but, I am missing something?

 Thanks,

 Jordi
 
 Jordi Blasi Uribarri
 Área I+D+i

 jbl...@nextel.es
 Oficina Bilbao

 [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]



Review Request 37102: SAMZA-753: BrokerProxy stop should shutdown kafka consumer first

2015-08-04 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37102/
---

Review request for samza.


Bugs: SAMZA-753
https://issues.apache.org/jira/browse/SAMZA-753


Repository: samza


Description
---

shutdown the kafka consumer before interrupting the BrokerProxy


Diffs
-

  samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
614f33f 

Diff: https://reviews.apache.org/r/37102/diff/


Testing
---


Thanks,

Yan Fang



Re: What happens after changelog reaches the Kafka retention

2015-08-04 Thread Yan Fang
Aha, ok, that makes sense. Thanks, Yi.

Fang, Yan
yanfang...@gmail.com

On Tue, Aug 4, 2015 at 3:22 PM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Yan,

 The changelog topic should be configured as log-compacted topic, which
 means that it will not be deleted due to time-retention.

 -Yi

 On Tue, Aug 4, 2015 at 3:15 PM, Yan Fang yanfang...@gmail.com wrote:

  Hi guys,
 
  Have a question about the changelog topic. Currently we are restoring the
  kv store by reading the whole changelog topic from the Kafka. So what
 will
  happen after Kafka deletes some log segment after the retention time?
 Will
  the changelog miss some values?
 
  Thanks,
 
  Fang, Yan
  yanfang...@gmail.com
 



Re: samza environment variable on containers

2015-08-03 Thread Yan Fang
Maybe @Eli Reisman can give you some insights, since he is writing the HDFS
producer. Because the exception looks like related to the hdfs consumer.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Sun, Aug 2, 2015 at 12:41 PM, Chen Song chen.song...@gmail.com wrote:

 Thanks Yan.

 I ran into issues when testing jobs on kerberized cluster. The job reads
 from HDFS and it worked well before. After testing on kerberized cluster,
 the Samza container threw exception as below. I am not sure how kerberos
 has anything to do with this.

 java.lang.UnsatisfiedLinkError:
 org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
 at
 org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
 at

 org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
 at

 org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:192)
 at
 org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:176)
 at
 org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1916)
 at
 org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1811)
 at
 org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1760)
 at

 com.appnexus.data.samza.hdfs.HdfsProtobufSequenceFileReaderWriter.messages(HdfsProtobufSequenceFileReaderWriter.scala:20)
 at

 com.appnexus.data.samza.systems.HdfsSystemConsumer$$anonfun$poll$1$$anonfun$1.apply(HdfsSystemConsumer.scala:84)
 at

 com.appnexus.data.samza.systems.HdfsSystemConsumer$$anonfun$poll$1$$anonfun$1.apply(HdfsSystemConsumer.scala:76)

 After googling a lot, I stumbled upon this thread,

 http://stackoverflow.com/questions/22150417/hadoop-mapreduce-java-lang-unsatisfiedlinkerror-org-apache-hadoop-util-nativec
 .

 If anyone has any thoughts on this error, please advise.

 Chen

 On Thu, Jul 30, 2015 at 5:15 PM, Yan Fang yanfang...@gmail.com wrote:

  Hi Chen Song,
 
  I do not think there is a way in Samza with which you can specify the ENV
  for Samza container.
 
  And currently Samza does not read the LD_LIBRARY_PATH either.
 
  Samza only puts the files in lib/*.[jw]ar into the CLASSPATH.
 
  Though -Djava.library.path might work,  it will cause hadoop errors. :(
 
  Thanks,
 
  Fang, Yan
  yanfang...@gmail.com
 
  On Thu, Jul 30, 2015 at 7:05 AM, Chen Song chen.song...@gmail.com
 wrote:
 
   Maybe a dumb question.
  
   Is there a way to set an ENV for samza containers?
  
   We want to set LD_LIBRARY_PATH to include hadoop native libs.
  
   --
   Chen Song
  
 



 --
 Chen Song



Re: testThreadInterruptInOperationSleep on clean installation

2015-08-03 Thread Yan Fang
Hi Jordi,

Those two exceptions seems like caused by the race condition. Since I can
not reproduce it, can you try 1) kill all the GradleDaemon and
GradleWrapperMain processes when you rerun the build ? 2) can you try to
run those two tests in the eclipse (or some other ways) without gradle ? I
doubt both are related to the gradle.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Sun, Aug 2, 2015 at 11:45 PM, Jordi Blasi Uribarri jbl...@nextel.es
wrote:

 Hi,

 I am trying to do a clean installation of Samza on a newly installed
 Debian 7.8 box. Following the stpes I collected in a previous 0.8.2 Samza
 installation I have performed the following steps:

 apt-get install openjdk-7-jdk openjdk-7-jre git maven curl
 vi /root/.bashrc
 export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
 export CLASSPATH=$CLASSPATH:/usr/share/java

 cd /opt
 git clone http://git-wip-us.apache.org/repos/asf/samza.git
 cd samza
 ./gradlew clean build


 Every time I run it I get an error on the test the script runs:
 testThreadInterruptInOperationSleep
 va.lang.AssertionError: expected:1 but was:0
at org.junit.Assert.fail(Assert.java:91)
at org.junit.Assert.failNotEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:126)
at org.junit.Assert.assertEquals(Assert.java:470)
at org.junit.Assert.assertEquals(Assert.java:454)
at
 org.apache.samza.util.TestExponentialSleepStrategy.testThreadInterruptInOperationSleep(TestExponentialSleepStrategy.scala:158)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
 org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at
 org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at
 org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at
 org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at
 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:76)
at
 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
at
 org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
at
 org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
at
 org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
at
 org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
 org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at
 org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at
 org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at
 org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at
 org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
 org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at
 org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at
 org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:355)
at
 

Re: samza environment variable on containers

2015-07-30 Thread Yan Fang
Hi Chen Song,

I do not think there is a way in Samza with which you can specify the ENV
for Samza container.

And currently Samza does not read the LD_LIBRARY_PATH either.

Samza only puts the files in lib/*.[jw]ar into the CLASSPATH.

Though -Djava.library.path might work,  it will cause hadoop errors. :(

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Jul 30, 2015 at 7:05 AM, Chen Song chen.song...@gmail.com wrote:

 Maybe a dumb question.

 Is there a way to set an ENV for samza containers?

 We want to set LD_LIBRARY_PATH to include hadoop native libs.

 --
 Chen Song



Re: Review Request 35445: SAMZA-693: Very basic HDFS Producer service for Samza

2015-07-30 Thread Yan Fang


 On July 30, 2015, 6:59 p.m., Yan Fang wrote:
  samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/JobNameDateTimeBucketer.scala,
   line 40
  https://reviews.apache.org/r/35445/diff/4/?file=1023371#file1023371line40
 
  My overall concern here is that, if there are more than one tasks are 
  running, is it possible that all the tasks are writing to one file at the 
  same time?
 
 Eli Reisman wrote:
 I don't think so, each registered source should be using it's own 
 HdfsWriter in write() calls even on the same Producer and the filenames per 
 writer are unique-ified in the writer impl. There are other ways to 
 accomplish that uniqueness though.

I see. We are using the UUID.randomUUID to make sure the writers writes to 
different files. This is fine unless we win the lottery. :)


 On July 30, 2015, 6:59 p.m., Yan Fang wrote:
  samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/TextSequenceFileHdfsWriter.scala,
   line 37
  https://reviews.apache.org/r/35445/diff/4/?file=1023373#file1023373line37
 
  I would prefer the param idea because 1) Samza is already using this 
  fashion 2) less code especially when there are more SequenceFileHdfsWriter 
  come out (LongWritable, etc)
  
  like the casting of the outgoing message to something not-Writable 
  like Array[Byte] or String might require a third param and it might start 
  to get awkward
  
  -- We can always cast the outgoing msg to Array[Byte] using the serde 
  defined for this msg. So as long as the Wriable accepts Array[Byte], this 
  should be fine.
  
  Also there are some Writable types that would not allow us to 
  determine message size for batching purposes the way 
  
  -- I think we can either give it a default size (this can be 
  configurable) when there is not getLength method or use a subclass. Either 
  way will be fine.
 
 Eli Reisman wrote:
 I definitely agree on the less code point, and I think we can move 
 functions like the compression selection to the base class.
 
 But, I don't think we can't just cast to Array[Byte] for all the Writable 
 types to accept the message, even from the serde. Only Text and BytesWritable 
 will accept Array[Byte] messages, so we will be limited to just those two 
 types forever if we are only using that cast on the outgoing message before 
 wrapping it in the Writable. If that works (i.e. messages will never be 
 FloatWritable, LongWritable etc.) then generics will work there.
 
 But the getLength issue still presents a problem. We already have a 
 configuration to set a batch size default or user-defined one, but getLength 
 is called per-message-write, and it's how we track how big the current file 
 is. We won't know when to split or when we hit that configured size without 
 tracking it. Each Writable will need slightly different logic to pick up or 
 estimate message size, they don't all supply a getLength call for byte size.
 
 So again that seems to force us to only work with BytesWritable and Text 
 value types? If I'm completely missing something here please let me know and 
 we can make the desire changes. Thanks for the input!

I see. Considering those facts, IMO, moving the common code, such as 
getCompression, getBukets, to their parent class is sufficient.

BTW, def getBucketer(systemName: String, config: HdfsConfig) = {
new JobNameDateTimeBucketer(systemName, config)
  }
  
Should it be Bucketer.getInstance(systemName, config) ?

If we move it to the parent class, then just bucketer, I think.


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35445/#review93614
---


On July 28, 2015, 5:25 a.m., Eli Reisman wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/35445/
 ---
 
 (Updated July 28, 2015, 5:25 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-693: Very basic HDFS Producer service for Samza
 
 
 Diffs
 -
 
   build.gradle 0852adc 
   docs/learn/documentation/versioned/hdfs/producer.md PRE-CREATION 
   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala 
 PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala 
 PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
  PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
  PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducerMetrics.scala
  PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/BinarySequenceFileHdfsWriter.scala
  PRE-CREATION

Re: Coordinator URL always 127.0.0.1

2015-07-30 Thread Yan Fang
Created https://issues.apache.org/jira/browse/SAMZA-748

Fang, Yan
yanfang...@gmail.com

On Thu, Jul 30, 2015 at 7:17 PM, Yi Pan nickpa...@gmail.com wrote:

 +1 on the fix in 0.10.0. It should be an easy one.

 On Thu, Jul 30, 2015 at 7:08 PM, Yan Fang yanfang...@gmail.com wrote:

  Hi Thommy,
 
  {quote}
  Because I don't see how this is ever going to work in scenarios where the
  AM is on a different node than the containers.
  {quote}
 
  -- I do not quite understand this part. AM essentially is running in a
  container as well. And the http server is brought up in the same
 container.
 
  {quote}
  even if we can't get a better address for the AM from YARN, we could at
  least filter the addresses we get back from the JVM to exclude loopbacks.
  {quote}
 
  -- You are right. InetAddress.getLocalHost() gives back loopback address
  sometimes. We should filter this out. Just googling one possible solution
  http://www.coderanch.com/t/491883/java/java/IP .
 
  + @Yi, @Navina,
 
  Also, I think this fix should go to the 0.10.0 release.
 
  What do you guys think?
 
  Thanks,
 
  Fang, Yan
  yanfang...@gmail.com
 
  On Thu, Jul 30, 2015 at 6:39 PM, Yan Fang yanfang...@gmail.com wrote:
 
   Just one point to add:
  
   {quote}
   AM gets notified of container status from the RM.
   {quote}
  
   I think this is not 100% correct. AM can communicate with NM through
   NMClientAsync
   
 
 https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/yarn/client/api/async/NMClientAsync.html
 
  to
   get container status, though Samza does not implement the
  CallbackHandler.
  
   Thanks,
  
   Fang, Yan
   yanfang...@gmail.com
  
   On Thu, Jul 30, 2015 at 6:06 PM, Navina Ramesh 
   nram...@linkedin.com.invalid wrote:
  
   The NM (and hence, by extension the container) heartbeats to the RM,
 not
   the AM. AM gets notified of container status from the RM.
   The AM starts / stops /releases a container process by communicating
 to
   the
   NM.
  
   Navina
  
  
   On Thu, Jul 30, 2015 at 5:55 PM, Thomas Becker tobec...@tivo.com
  wrote:
  
Ok, I thought there was some communication from the container to the
  AM,
it sounds like you're saying it's in the other direction only?
 Don't
containers heartbeat to the AM?  Regardless, even if we can't get a
   better
address for the AM from YARN, we could at least filter the addresses
  we
   get
back from the JVM to exclude loopbacks.
   
-Tommy

From: Navina Ramesh [nram...@linkedin.com.INVALID]
Sent: Thursday, July 30, 2015 8:40 PM
To: dev@samza.apache.org
Subject: Re: Coordinator URL always 127.0.0.1
   
Hi Tommy,
Yi is right. Container start is coordinated by the AppMaster using
 an
NMClient. Container host name and port is provided by the RM during
allocation.
In Yarn (at least, afaik), when the node joins a cluster, the NM
   registers
itself with the RM. So, the NM might still be using
getLocalhost.getAddress().
   
I don't know of any other way to programmatically fetch the
 machine's
hostname (apart from some hacky shell commands).
   
Cheers,
Navina
   
On Thu, Jul 30, 2015 at 5:23 PM, Yi Pan nickpa...@gmail.com
 wrote:
   
 Hi, Tommy,

 Yeah, I agree that the current implementation is not bullet-proof
 to
   any
 different networking configuration on the host. As for the AM -
container
 communication, if I am not mistaken, it is through the NMClient
 and
   the
 node HTTP address is wrapped within the Container object returned
  from
RM.
 I am not very familiar with that part of source code. Navina may
 be
   able
to
 help more here.

 -Yi

 On Thu, Jul 30, 2015 at 4:27 PM, Thomas Becker tobec...@tivo.com
 
wrote:

  Hi Yi,
  Thanks a lot for your reply.  I don't doubt we can get it to
 work
  by
  mucking with the networking configuration, but to me this feels
   like a
  workaround, not a solution.
InetAddress.getLocalHost().getHostAddress()
 is
  not a reliable way of obtaining an IP that other machines can
   connect
to.
  Just today I tested on several Linux distros and it did not work
  on
   any
 of
  them.  Can we do something more robust here?  How does the
  container
  communicate status to the AM?
 
  -Tommy
 
  
  From: Yi Pan [nickpa...@gmail.com]
  Sent: Thursday, July 30, 2015 6:48 PM
  To: dev@samza.apache.org
  Subject: Re: Coordinator URL always 127.0.0.1
 
  Hi, Tommy,
 
  I think that it might be a commonly asked question regarding to
multiple
  IPs on a single host. A common trick w/o changing code is
 (copied
   from
 SO:
 
 

   
  
 
 http://stackoverflow.com/questions/2381316/java-inetaddress-getlocalhost-returns-127-0-0-1-how-to-get-real-ip
  )
 
  {code

Re: Review Request 35445: SAMZA-693: Very basic HDFS Producer service for Samza

2015-07-30 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35445/#review93614
---



docs/learn/documentation/versioned/hdfs/producer.md (line 33)
https://reviews.apache.org/r/35445/#comment148011

com.etsy - org.apache.



docs/learn/documentation/versioned/hdfs/producer.md (line 65)
https://reviews.apache.org/r/35445/#comment148012

you also need to add a link in the document/index.html to make it 
reachable.



samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/BinarySequenceFileHdfsWriter.scala
 (line 38)
https://reviews.apache.org/r/35445/#comment148015

can we use the camel case with an initial lower case character  for the 
varaible? Just to obey the same coding guide 
http://samza.apache.org/contribute/coding-guide.html

Thank you.



samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/Bucketer.scala 
(line 38)
https://reviews.apache.org/r/35445/#comment148016

not necessary



samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/JobNameDateTimeBucketer.scala
 (line 40)
https://reviews.apache.org/r/35445/#comment148018

My overall concern here is that, if there are more than one tasks are 
running, is it possible that all the tasks are writing to one file at the same 
time?



samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/JobNameDateTimeBucketer.scala
 (line 55)
https://reviews.apache.org/r/35445/#comment148017

this will very possibly never be executed, because line 53 already makes 
currentDateTime == dateTime.



samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/TextSequenceFileHdfsWriter.scala
 (line 37)
https://reviews.apache.org/r/35445/#comment148024

I would prefer the param idea because 1) Samza is already using this 
fashion 2) less code especially when there are more SequenceFileHdfsWriter come 
out (LongWritable, etc)

like the casting of the outgoing message to something not-Writable like 
Array[Byte] or String might require a third param and it might start to get 
awkward

-- We can always cast the outgoing msg to Array[Byte] using the serde 
defined for this msg. So as long as the Wriable accepts Array[Byte], this 
should be fine.

Also there are some Writable types that would not allow us to determine 
message size for batching purposes the way 

-- I think we can either give it a default size (this can be configurable) 
when there is not getLength method or use a subclass. Either way will be fine.



samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
 (line 106)
https://reviews.apache.org/r/35445/#comment148019

do we need to bring up the MiniCluster in every test? Is it be better if we 
just bring them up in the beforeClass and then shutdown it in afterClass?


- Yan Fang


On July 28, 2015, 5:25 a.m., Eli Reisman wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/35445/
 ---
 
 (Updated July 28, 2015, 5:25 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-693: Very basic HDFS Producer service for Samza
 
 
 Diffs
 -
 
   build.gradle 0852adc 
   docs/learn/documentation/versioned/hdfs/producer.md PRE-CREATION 
   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala 
 PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala 
 PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
  PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
  PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducerMetrics.scala
  PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/BinarySequenceFileHdfsWriter.scala
  PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/Bucketer.scala 
 PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/HdfsWriter.scala
  PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/JobNameDateTimeBucketer.scala
  PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/SequenceFileHdfsWriter.scala
  PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/TextSequenceFileHdfsWriter.scala
  PRE-CREATION 
   samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties 
 PRE-CREATION 
   samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties 
 PRE-CREATION 
   samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties 
 PRE-CREATION 
   samza-hdfs/src/test/resources/samza-hdfs-test

Re: Review Request 35445: SAMZA-693: Very basic HDFS Producer service for Samza

2015-07-30 Thread Yan Fang


 On July 29, 2015, 7:41 p.m., Navina Ramesh wrote:
  docs/learn/documentation/versioned/hdfs/producer.md, line 24
  https://reviews.apache.org/r/35445/diff/4/?file=1023362#file1023362line24
 
  Can you please update the list of available configuration for this 
  system at 
  http://samza.apache.org/learn/documentation/latest/jobs/configuration-table.html
   as well? 
  
  Like we did with elastic search, adding an example job to hello-samza 
  will be very useful for adoption. If not in this JIRA, please consider 
  opening a follow-up JIRA to add this example.
 
 Eli Reisman wrote:
 Yeah I can do that. I think adding an example to hello-samza is a great 
 idea, but I agree I would make that a separate ticket and then I'd claim that 
 ticket.
 
 Eli Reisman wrote:
 I'd also be interested in taking some follow up tickets for more output 
 formats than just seq files once this is out, and possibly the reader 
 implemention too although we'd need some up front discussion on the JIRA 
 ticket about what that looks like in a long-lived Samza job that needs to 
 read forever

That will be great ! :)


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35445/#review93501
---


On July 28, 2015, 5:25 a.m., Eli Reisman wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/35445/
 ---
 
 (Updated July 28, 2015, 5:25 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-693: Very basic HDFS Producer service for Samza
 
 
 Diffs
 -
 
   build.gradle 0852adc 
   docs/learn/documentation/versioned/hdfs/producer.md PRE-CREATION 
   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala 
 PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala 
 PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
  PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
  PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducerMetrics.scala
  PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/BinarySequenceFileHdfsWriter.scala
  PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/Bucketer.scala 
 PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/HdfsWriter.scala
  PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/JobNameDateTimeBucketer.scala
  PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/SequenceFileHdfsWriter.scala
  PRE-CREATION 
   
 samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/TextSequenceFileHdfsWriter.scala
  PRE-CREATION 
   samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties 
 PRE-CREATION 
   samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties 
 PRE-CREATION 
   samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties 
 PRE-CREATION 
   samza-hdfs/src/test/resources/samza-hdfs-test-job.properties PRE-CREATION 
   
 samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
  PRE-CREATION 
   settings.gradle 19bff97 
 
 Diff: https://reviews.apache.org/r/35445/diff/
 
 
 Testing
 ---
 
 Updated: See JIRA SAMZA-693 for details, this latest update (693-4) addresses 
 post-review issues and adds more pluggable design, several default writer 
 implementations, and more (and more thorough) unit tests.
 
 Passes 'gradle clean test'.
 
 
 Thanks,
 
 Eli Reisman
 




Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-07-29 Thread Yan Fang


 On July 29, 2015, 2:45 p.m., Robert Zuljevic wrote:
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
   lines 121-125
  https://reviews.apache.org/r/36163/diff/2/?file=1003380#file1003380line121
 
  Did you mean something like this?
  
  for ((storeName, systemStream) - changeLogSystemStreams) {
val systemAdmin = Util.getObj[SystemFactory](config
  .getSystemFactory(systemStream.getSystem)
  .getOrElse(throw new SamzaException(A stream uses system %s, 
  which is missing from the configuration. format systemStream.getSystem))
  ).getAdmin(systemStream.getSystem, config)
  
systemAdmin.createChangelogStream(systemStream.getStream, 
  changeLogPartitions)
  }
  
  This is the only way I could thought of for simplifing this. I don't 
  think what you posted would work, because you're using String's map 
  function, but it did steer me in the right direction. Do you agree?

yes, this is correct. :) Thanks.


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93454
---


On July 9, 2015, 2:39 p.m., Robert Zuljevic wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36163/
 ---
 
 (Updated July 9, 2015, 2:39 p.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 Removed trailing whitespaces
 
 
 Diffs
 -
 
   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
 7a588ebc99b5f07d533e48e10061a3075a63665a 
   
 samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
  249b8ae3a904716ea51a2b27c7701ac30d13b854 
   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1 
   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
 aeba61a95371faaba23c97d896321b8d95467f87 
   
 samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
  097f41062f3432ae9dc9a9737b48ed7b2f709f20 
   
 samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
 8d54c4639fc226b34e64915935c1d90e5917af2e 
   
 samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
  d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
   
 samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
  35086f54f526d5d88ad3bc312b71fce40260e7c6 
   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
 b063366f0f60e401765a000fa265c59dee4a461e 
   
 samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
  1e936b42a5b9a4bfb43766c17847b2947ebdb21d 
 
 Diff: https://reviews.apache.org/r/36163/diff/
 
 
 Testing
 ---
 
 I wasn't really sure what kind of test (unit test / integration test) I 
 should make here, so any pointers would be greatly appreaciated! I tested the 
 change with the unit/integration tests already available.
 
 
 Thanks,
 
 Robert Zuljevic
 




Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-07-29 Thread Yan Fang


 On July 25, 2015, 1:33 a.m., Yan Fang wrote:
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala,
   lines 121-125
  https://reviews.apache.org/r/36163/diff/2/?file=1003380#file1003380line121
 
  this can be simplified a little:
  
  for ((storeName, systemStream) - changeLogSystemStreams) {
val systemAdmin = config
  .getSystemFactory(systemStream.getName)
  .getOrElse(throw new SamzaException(A stream uses system %s, 
  which is missing from the configuration. format 
  systemName)).map(Util.getObj[SystemFactory](_)).getOrElse(systemStream.getSystem,
throw new SamzaException(Unable to get systemAdmin for store 
   + storeName +  and systemStream + systemStream))


  Then  do not need line 104-109, line 117-119.
 
 Robert Zuljevic wrote:
 Did you mean something like this?
 
 for ((storeName, systemStream) - changeLogSystemStreams) {
   val systemAdmin = Util.getObj[SystemFactory](config
 .getSystemFactory(systemStream.getSystem)
 .getOrElse(throw new SamzaException(A stream uses system %s, 
 which is missing from the configuration. format systemStream.getSystem))
 ).getAdmin(systemStream.getSystem, config)
 
   systemAdmin.createChangelogStream(systemStream.getStream, 
 changeLogPartitions)
 }
 
 
 This is the only way I could thought of for simplifing this. I don't 
 think what you posted would work, because you're using String's map function, 
 but it did steer me in the right direction. Do you agree?

yes, you are right. This is what I was thinking. Not tested the code though. :)


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93019
---


On July 9, 2015, 2:39 p.m., Robert Zuljevic wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36163/
 ---
 
 (Updated July 9, 2015, 2:39 p.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 Removed trailing whitespaces
 
 
 Diffs
 -
 
   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
 7a588ebc99b5f07d533e48e10061a3075a63665a 
   
 samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
  249b8ae3a904716ea51a2b27c7701ac30d13b854 
   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1 
   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
 aeba61a95371faaba23c97d896321b8d95467f87 
   
 samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
  097f41062f3432ae9dc9a9737b48ed7b2f709f20 
   
 samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
 8d54c4639fc226b34e64915935c1d90e5917af2e 
   
 samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
  d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
   
 samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
  35086f54f526d5d88ad3bc312b71fce40260e7c6 
   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
 b063366f0f60e401765a000fa265c59dee4a461e 
   
 samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
  1e936b42a5b9a4bfb43766c17847b2947ebdb21d 
 
 Diff: https://reviews.apache.org/r/36163/diff/
 
 
 Testing
 ---
 
 I wasn't really sure what kind of test (unit test / integration test) I 
 should make here, so any pointers would be greatly appreaciated! I tested the 
 change with the unit/integration tests already available.
 
 
 Thanks,
 
 Robert Zuljevic
 




Re: [DISCUSS] Release 0.10.0

2015-07-29 Thread Yan Fang
Actually, I also want to include a few patch-available features, especially:

1. broadcast stream (SAMZA-676)
- waiting for review

2. graphite support (SAMZA-340)
3. meter and histogram (SAMZA-683)
4. utility (SAMZA-401)
- 2,3,4 belong to Luis, if he does not have time to update, since they
only need some small changes, we can edit it and get +1 from another
committer.

5. hdfs producer (SAMZA-693)
- I am reviewing.

6. upgrade yarn to 2.7.1 (SAMZA-563)
   - though I am reviewing, this ticket is negotiable if we want to put
into the 0.10.0 release. If we do not, I think, when users enable the
worker-persisting and container-persisting features, Samza will not be able
to handle it. (Some classes are only available after yarn 2.5.0 while Samza
currently only support yarn 2.4.0)

7. others: scrooge, class loader isolation, etc.
- those are waiting for reviewing too.

My opinion is that, if we can clean up all the patch-available tickets, it
will be great. Most of them have been already reviewed more than once. So I
think it should not be very time-consuming to have them in the 0.10.0
release.

What do you think?

Of course, another must-have is the bug-fix of the Stream Appender. :)

Thanks,

Fang, Yan
yanfang...@gmail.com

On Tue, Jul 28, 2015 at 10:27 PM, Roger Hoover roger.hoo...@gmail.com
wrote:

 Thanks, Yi.

 I propose that we also include SAMZA-741 for Elasticsearch versioning
 support with the new ES producer.  I think it's very close to being merged.

 Roger


 On Tue, Jul 28, 2015 at 10:08 PM, Yi Pan nickpa...@gmail.com wrote:

  Hi, all,
 
  I want to start the discussion on the release schedule for 0.10.0. There
  are a few important features that we plan to release in 0.10.0 and I want
  to start this thread s.t. we can agree on what to include in 0.10.0
  release.
 
  There are the following main features added in 0.10.0:
  - RocksDB TTL support
  - Add CoordinatorStream and disable CheckpointManager
  - Elasticsearch Producer
  - Host affinity
  And other 0.10.0 tickets:
 
 
 https://issues.apache.org/jira/issues/?jql=project%20%3D%20SAMZA%20AND%20status%20in%20(%22In%20Progress%22%2C%20Resolved%2C%20Closed)%20AND%20fixVersion%20%3D%200.10.0
 
  I propose to cut a 0.10.0 release after we get the following issues
  resolved:
  - SAMZA-615: Migrate checkpoint from checkpoint topic to Coordinator
 stream
  - SAMZA-617: YARN host affinity in Samza
 
  Thoughts?
 
  Thanks!
 
  -Yi
 



Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-07-29 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/
---

(Updated July 29, 2015, 10:49 p.m.)


Review request for samza.


Changes
---

remove whiltespaces
update to latest master


Bugs: SAMZA-676
https://issues.apache.org/jira/browse/SAMZA-676


Repository: samza


Description
---

1. added offsetComparator method in SystemAdmin Interface

2. added task.global.inputs config

3. rewrote Grouper classes using Java; allows to assign global streams during 
grouping

4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer to 
preserve messages order

5. added taskNames to the offsets in OffsetManager

6. allowed to assign one SSP to multiple taskInstances

7. skipped already-processed messages in RunLoop

8. unit tests for all changes


Diffs (updated)
-

  checkstyle/import-control.xml 6654319 
  docs/learn/documentation/versioned/container/samza-container.md 9f46414 
  docs/learn/documentation/versioned/jobs/configuration-table.html ea73b40 
  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb 
  
samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
 249b8ae 
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 
PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
 PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
20e5d26 
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
27b2517 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
c5a5ea5 
  
samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 
9dc7051 
  
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
 44e95fc 
  
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
 3c0acad 
  
samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
 097f410 
  samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java 
PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
 PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
 PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
8d54c46 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
64a5844 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
84fdeaa 
  samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
7caad28 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
 a14169b 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
 74daf72 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
 deb3895 
  
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 
4097ac7 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
 1fd5dd3 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 
35086f5 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 de00320 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 1629035 
  
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
 2a84328 
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
b063366 
  
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
 1e936b4 

Diff: https://reviews.apache.org/r/34974/diff/


Testing
---


Thanks,

Yan Fang



Re: no new topic created on Kafka

2015-07-28 Thread Yan Fang
 task.class=samza.http.demo.task.HttpDemoParserStreamTask ...

you are not using the StateStream class...

Fang, Yan
yanfang...@gmail.com

On Tue, Jul 28, 2015 at 11:48 AM, Job-Selina Wu swucaree...@gmail.com
wrote:

 Hi, Yan

 I like to correct my previous comment, when I comment out
 systems.kafka.streams.http-demo.samza.offset.default=oldest
 systems.kafka.streams.http-demo.samza.reset.offset=true

 *the logger is not show at *at samza-container-0.log, but it make sense.


 Sincerely,
 Seina

 On Tue, Jul 28, 2015 at 11:30 AM, Job-Selina Wu swucaree...@gmail.com
 wrote:

  Hi, Yan:
 
Thanks a lot for your reply.
   I tried to comment out
 systems.kafka.http-demo.samza.offset.default=oldest
  and then I tried to comment out
  systems.kafka.streams.http-demo.samza.offset.default=oldest
  systems.kafka.streams.http-demo.samza.reset.offset=true
 
   The result is same as before.  1. the checkoutpoint topic was created,
 2.
  the log created by Logger can be found at /samza-container-0.log. 3. no
  exception is at samza-container-0.log.
 
 I guess something conflict between HttpDemoParserStreamTask and
  HttpDemoStatsStreamTask? Is any resource registered by
  HttpDemoParserStreamTask and then HttpDemoStatsStreamTask can not
 recreate
  a topic?
 
  Sincerely,
  Selina
 
  On Tue, Jul 28, 2015 at 9:37 AM, Yan Fang yanfang...@gmail.com wrote:
 
  Can you comment out
 systems.kafka.http-demo.samza.offset.default=oldest
  to see how it works? This seems not a correct property.
 
  Thanks,
 
  Fang, Yan
  yanfang...@gmail.com
 
  On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu swucaree...@gmail.com
  wrote:
 
   Hi, Dear All:
  
  I have two Tasks at Samza. HttpDemoParserStreamTask and
   HttpDemoStatsStreamTask. They are almost same, except the output topic
  name
   is different and the task name are different at properties file. I am
   wondering how should I debug on it?
  
  More details are list below.
  
  All your help is highly appreciated.
  
   Sincerely,
   Selina
  
   Currently HttpDemoParserStreamTask run well.
   However HttpDemoStatsStreamTask can generate the log correctly
  withouot
   Exception at
  
  
 
 deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_02/
   samza-container-0.log
  
   The last record as below is right, however there is no topic 
   demo-stats-temp was created.
   --
  
   2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO]
   key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={timestamp:2015-07-27
  
  
 
 14:30:02:987,date:06-21-2015,id:CAESEAbQ1pC2TBvb-4SLDjMqsZ8,ip:22.231.113.69,browser:Chrome,postalCode:95131,url:
  
 
 http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi
  
 
 ,language:ENG,mobileBrand:Samsung,carrierName:Tmobile,deviceName:Samsung
   Galaxy S6,operationSystem:Android
  
  
 
 5.0.2,screenSize:5.1-inch,resolution:1440p,campaignId:65681290456292569,count:5607}
  
  
   ---The demo-stats.properties
   files-
  
   # Job
   job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
   job.name=demo-stats-tmp
 
  
  
  
  
 
 task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# Normally, this would be 3, but we have only one broker.
task.checkpoint.replication.factor=1
  
# YARN
  
  
 
 yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
  
# Task
task.class=samza.http.demo.task.HttpDemoParserStreamTask
task.inputs=kafka.http-demo
  
# Serializers
  
  
 
 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
  
# Kafka System
  
  
 
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
  
systems.kafka.samza.key.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092
  
#stream from begining
#systems.kafka.consumer.auto.offset.reset=smallest
   #http-demo from the oldest
systems.kafka.http-demo.samza.offset.default=oldest
   # all stream from the oldest
systems.kafka.streams.http-demo.samza.offset.default=oldest
systems.kafka.streams.http-demo.samza.reset.offset=true
  
  
  
   HttpDemoStatsStreamTask
   class
  
   public class HttpDemoStatsStreamTask implements StreamTask  {
  
   //output topic
   private static final SystemStream OUTPUT_STREAM = new
   SystemStream(kafka, demo-stats-temp);
   Logger logger =
  LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);
  
   @SuppressWarnings(unchecked)
   @Override
   public void process(IncomingMessageEnvelope envelope,
   MessageCollector collector, TaskCoordinator coordinator) throws
   Exception

Re: kafka producer failed

2015-07-26 Thread Yan Fang
You are giving the Kafka code and the Samza log, which does not make sense
actually...

Fang, Yan
yanfang...@gmail.com

On Sat, Jul 25, 2015 at 10:31 PM, Job-Selina Wu swucaree...@gmail.com
wrote:

 Hi, Yi, Navina and Benjamin:

 Thanks a lot to spending your time to help me this issue.

 The configuration is below. Do you think it could be the configuration
 problem?
 I tried props.put(request.required.acks, 0); and
  props.put(request.required.acks, 1); both did not work.

 
 Properties props = new Properties();

 private final ProducerString, String producer;

 public KafkaProducer() {
 //BOOTSTRAP.SERVERS
 props.put(metadata.broker.list, localhost:9092);
 props.put(bootstrap.servers, localhost:9092 );
 props.put(serializer.class, kafka.serializer.StringEncoder);
 props.put(partitioner.class, com.kafka.SimplePartitioner);
 props.put(request.required.acks, 0);

 ProducerConfig config = new ProducerConfig(props);

 producer = new ProducerString, String(config);
 }

 --

  Exceptions at log are list below.

 Your help is highly appreciated.

 Sincerely,
 Selina Wu


 Exceptions at log

 deploy/yarn/logs/userlogs/application_1437886672214_0001/container_1437886672214_0001_01_01/samza-application-master.log

 2015-07-25 22:03:52 Shell [DEBUG] Failed to detect a valid hadoop home
 directory
 *java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set*.
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:265)
at org.apache.hadoop.util.Shell.clinit(Shell.java:290)
at org.apache.hadoop.util.StringUtils.clinit(StringUtils.java:76)
at
 org.apache.hadoop.yarn.conf.YarnConfiguration.clinit(YarnConfiguration.java:517)
at
 org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:77)
at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
 2015-07-25 22:03:52 Shell [DEBUG] setsid is not available on this
 machine. So not using it.
 2015-07-25 22:03:52 Shell [DEBUG] setsid exited with exit code 0
 2015-07-25 22:03:52 ClientHelper [INFO] trying to connect to RM
 127.0.0.1:8032
 2015-07-25 22:03:52 AbstractService [DEBUG] Service:
 org.apache.hadoop.yarn.client.api.impl.YarnClientImpl entered state
 INITED
 2015-07-25 22:03:52 RMProxy [INFO] Connecting to ResourceManager at
 /127.0.0.1:8032
 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field
 org.apache.hadoop.metrics2.lib.MutableRate
 org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess
 with annotation
 @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=,
 always=false, type=DEFAULT, value=[Rate of successful kerberos logins
 and latency (milliseconds)], valueName=Time)
 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field
 org.apache.hadoop.metrics2.lib.MutableRate
 org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure
 with annotation
 @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=,
 always=false, type=DEFAULT, value=[Rate of failed kerberos logins and
 latency (milliseconds)], valueName=Time)
 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field
 org.apache.hadoop.metrics2.lib.MutableRate
 org.apache.hadoop.security.UserGroupInformation$UgiMetrics.getGroups
 with annotation
 @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=,
 always=false, type=DEFAULT, value=[GetGroups], valueName=Time)
 2015-07-25 22:03:52 MetricsSystemImpl [DEBUG] UgiMetrics, User and
 group related metrics
 2015-07-25 22:03:52 KerberosName [DEBUG] Kerberos krb5 configuration
 not found, setting default realm to empty
 2015-07-25 22:03:52 Groups [DEBUG]  Creating new Groups object
 2015-07-25 22:03:52 NativeCodeLoader [DEBUG] Trying to load the
 custom-built native-hadoop library...
 2015-07-25 22:03:52 NativeCodeLoader [DEBUG] Failed to load
 native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in
 java.library.path
 2015-07-25 22:03:52 NativeCodeLoader [DEBUG]

 java.library.path=/home//Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
 2015-07-25 22:03:52 NativeCodeLoader [WARN] Unable to load
 native-hadoop library for your platform... using builtin-java classes
 where applicable


 2015-07-25 22:03:53 KafkaCheckpointManager [WARN] While trying to
 validate topic __samza_checkpoint_ver_1_for_demo-parser7_1:
 *kafka.common.LeaderNotAvailableException. Retrying.*
 2015-07-25 22:03:53 KafkaCheckpointManager [DEBUG] Exception detail:
 kafka.common.LeaderNotAvailableException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at 

Re: kafka producer failed

2015-07-26 Thread Yan Fang
You may check the Kafka.log to see what's inside 

Yan Fang

 On Jul 26, 2015, at 2:01 AM, Job-Selina Wu swucaree...@gmail.com wrote:
 
 The exception is below:
 
 kafka.common.FailedToSendMessageException: Failed to send messages after 3
 tries.
 kafka.common.FailedToSendMessageException: Failed to send messages after 3
 tries.
 at
 kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
 at kafka.producer.Producer.send(Producer.scala:77)
 at kafka.javaapi.producer.Producer.send(Producer.scala:33)
 at http.server.HttpDemoHandler.doDemo(HttpDemoHandler.java:71)
 at http.server.HttpDemoHandler.handle(HttpDemoHandler.java:32)
 at
 org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
 at org.eclipse.jetty.server.Server.handle(Server.java:498)
 at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:265)
 at
 org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:243)
 at
 org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
 at
 org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:610)
 at
 org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:539)
 at java.lang.Thread.run(Thread.java:745)
 
 On Sun, Jul 26, 2015 at 12:42 AM, Job-Selina Wu swucaree...@gmail.com
 wrote:
 
 Hi, Yan:
 
  My Http Server send message to Kafka.
 
 The server.log at deploy/kafka/logs/server.log shown :
 
 [2015-07-26 00:33:51,910] INFO Closing socket connection to /127.0.0.1. 
 (kafka.network.Processor)
 [2015-07-26 00:33:51,984] INFO Closing socket connection to /127.0.0.1. 
 (kafka.network.Processor)
 [2015-07-26 00:33:52,011] INFO Closing socket connection to /127.0.0.1. 
 (kafka.network.Processor)
 
 .
 
 
 Your help is highly appreciated.
 
 Sincerely,
 
 Selina
 
 
 On Sun, Jul 26, 2015 at 12:01 AM, Yan Fang yanfang...@gmail.com wrote:
 
 You are giving the Kafka code and the Samza log, which does not make sense
 actually...
 
 Fang, Yan
 yanfang...@gmail.com
 
 On Sat, Jul 25, 2015 at 10:31 PM, Job-Selina Wu swucaree...@gmail.com
 wrote:
 
 Hi, Yi, Navina and Benjamin:
 
Thanks a lot to spending your time to help me this issue.
 
The configuration is below. Do you think it could be the
 configuration
 problem?
 I tried props.put(request.required.acks, 0); and
 props.put(request.required.acks, 1); both did not work.
 
 
 Properties props = new Properties();
 
private final ProducerString, String producer;
 
public KafkaProducer() {
//BOOTSTRAP.SERVERS
props.put(metadata.broker.list, localhost:9092);
props.put(bootstrap.servers, localhost:9092 );
props.put(serializer.class, kafka.serializer.StringEncoder);
props.put(partitioner.class, com.kafka.SimplePartitioner);
props.put(request.required.acks, 0);
 
ProducerConfig config = new ProducerConfig(props);
 
producer = new ProducerString, String(config);
}
 
 --
 
 Exceptions at log are list below.
 
 Your help is highly appreciated.
 
 Sincerely,
 Selina Wu
 
 
 Exceptions at log
 deploy/yarn/logs/userlogs/application_1437886672214_0001/container_1437886672214_0001_01_01/samza-application-master.log
 
 2015-07-25 22:03:52 Shell [DEBUG] Failed to detect a valid hadoop home
 directory
 *java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set*.
   at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:265)
   at org.apache.hadoop.util.Shell.clinit(Shell.java:290)
   at org.apache.hadoop.util.StringUtils.clinit(StringUtils.java:76)
   at
 org.apache.hadoop.yarn.conf.YarnConfiguration.clinit(YarnConfiguration.java:517)
   at
 org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:77)
   at
 org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
 2015-07-25 22:03:52 Shell [DEBUG] setsid is not available on this
 machine. So not using it.
 2015-07-25 22:03:52 Shell [DEBUG] setsid exited with exit code 0
 2015-07-25 22:03:52 ClientHelper [INFO] trying to connect to RM
 127.0.0.1:8032
 2015-07-25 22:03:52 AbstractService [DEBUG] Service:
 org.apache.hadoop.yarn.client.api.impl.YarnClientImpl entered state
 INITED
 2015-07-25 22:03:52 RMProxy [INFO] Connecting to ResourceManager at
 /127.0.0.1:8032
 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field
 org.apache.hadoop.metrics2.lib.MutableRate
 org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess
 with annotation
 @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=,
 always=false, type=DEFAULT, value=[Rate of successful kerberos logins
 and latency (milliseconds)], valueName=Time)
 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field
 org.apache.hadoop.metrics2.lib.MutableRate
 org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure
 with annotation
 @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=,
 always=false, type=DEFAULT, value=[Rate of failed kerberos

Re: Can I get an example of using the ElasticSearch producer?

2015-07-24 Thread Yan Fang
Hi guys,

Thank you for being interested in this new producer. The producer is only
in the master branch, so if you are using the 0.9.1 version, you wont get
this support.

If by any chance you are using the latest version,

1) here are some configuration
http://samza.apache.org/learn/documentation/latest/jobs/configuration-table.html
, see the Using Elasticsearch for output streams part.

2) check this patch https://issues.apache.org/jira/browse/SAMZA-740 as
well. We haven't merged to hello-samza, but it will give you some idea how
to implement. :)

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 24, 2015 at 1:20 PM, Job-Selina Wu swucaree...@gmail.com
wrote:

 Dear All:

 I like to have an example of using the ElasticSearch producer also.


 Thanks
 Selina
 swucaree...@gmail.com




 On Fri, Jul 24, 2015 at 1:03 PM, Woessner, Leo leo.woess...@pearson.com
 wrote:

  Can I get an example of using the ElasticSearch producer?
 
  leo.woess...@pearson.com
 



Re: Samza: can not produce new data to kafka

2015-07-24 Thread Yan Fang
{quote}
 I did not set auto.create.topics.enable anywhere
{quote}

Fine. Then its default to true. No worries.

{quote}
My job is listed as below. However I am wondering how can I know if my
method public void* process*(IncomingMessageEnvelope envelope,
MessageCollector collector, TaskCoordinator coordinator) was run or not.
{quote}

If you have log enabled (from the code, you did), you can check the
contain's log to see if it has the output. Assuming you are using the local
yarn like what hello-samza provides, you should be able to check the logs
in deploy/yarn/userlogs/application_Id.

If you use print.out method, you can see the result in the
deploy/yarn/userlogs/application_Id 's sysout file (if the StreamTask)
works.

If it does not work, you can check the logs in
deploy/yarn/userlogs/application_Id as well to see the exceptions if there
is any.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 24, 2015 at 1:45 PM, Job-Selina Wu swucaree...@gmail.com
wrote:

 Hi, Yan and Shadi:

 I made a mistake.  Actually, there is no log at /tmp/kafka-logs
 created by   logger.info(key=+key+: message=+message); .  The log I
 provided actually is log for input topic http-demo at
 /tmp/kafka-logs/http-demo-0

 My job is listed as below. However I am wondering how can I know if
 my method public void* process*(IncomingMessageEnvelope envelope,
 MessageCollector collector, TaskCoordinator coordinator) was run or not.

 I manually create topic demo-duplicate by command line, otherwise
 it will be created by samza code.

 I checked I did not set auto.create.topics.enable anywhere. Attached
 is my properties file for Kafka


Your help is highly appreciated

 Sincerely,
 Selina

 [image: Inline image 1]




 On Fri, Jul 24, 2015 at 11:56 AM, Yan Fang yanfang...@gmail.com wrote:

 The code and the property seem good to me. collector.send(new
 OutgoingMessageEnvelope(OUTPUT_STREAM, message));should work. So I am
 curious if you accidentally disabled auto.create.topics.enable  ...Can you
 also try to send msgs from cmd line to demo-duplicate to see if it gets
 anything.

 Let me know if it works.

 Thanks,

 Fang, Yan
 yanfang...@gmail.com

 On Fri, Jul 24, 2015 at 11:48 AM, Job-Selina Wu swucaree...@gmail.com
 wrote:

  Hi, Shadi:
 
Thans a lot for your reply.
  1. There is no error log at Kafka and Samza
 
  2.  this line   logger.info(key=+key+: message=+message);  write
  log correctly as below:
 
  [image: Inline image 1]
 
  This are my last two message with right count
 
  3. I tried both way below, none of them create topic, but I will try it
  again.
 
  collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
 
  //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
 
  4. I wrote a topic call http-demo to Kafka as my input, and the
 content
  can be show with command line below, so the Kafka should be OK.
  deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
  --from-beginning --topic http-demo
 
  Your help is highly appreciated.
 
  Sincerely,
  Selina
 
 
 
 
  On Fri, Jul 24, 2015 at 9:29 AM, Shadi Noghabi 
  snogh...@linkedin.com.invalid wrote:
 
  Selina,
 
  You should probably check a few things
  1. Your log files to see if you have any errors. Also, does you job
 fail
  or
  continues running?
  2. Does this line   logger.info(key=+key+: message=+message); 
  write
  any logs?
  3. This might not be the only reason, but you are sending messages of
  type MapString,
  String. However, in your config file, you defined 
  systems.kafka.samza.msg.serde=string which expects the message to be a
  String.
 
 
  Shadi
 
 
  On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu swucaree...@gmail.com
  wrote:
 
   Hi,  All
  
I am trying to write my first StreamTask class. I have a topic
 at
   Kafka called http-demo. I like to read the topic and write it to
  another
   topic called demo-duplicate
  
   Howeven there is not topic written to Kafka.
  
   My properties file and StreamTask are below.  Can anyone told me
  what
   is the bug?
   BTW, if I set checkpoint or Metrics at properties file. the
 topic of
   checkpoint and metrics could be written to Kafka.  And the content of
input topic -- http-demo could be show correctly.
  
   Your help is highly appreciated.
  
   Sincerely,
   Selina
  
  
   - - -- - - - - -
   # Job
   job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
   job.name=demo-parser
 
  
   # YARN
  
  
 
 yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
  
   # Task
   task.class=samza.http.demo.task.HttpDemoParserStreamTask
   task.inputs=kafka.http-demo
  
   # Serializers
  
  
 
 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
  
   # Kafka System
  
  
 
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
   systems.kafka.samza.msg.serde=string

Re: Samza: can not produce new data to kafka

2015-07-24 Thread Yan Fang
The code and the property seem good to me. collector.send(new
OutgoingMessageEnvelope(OUTPUT_STREAM, message));should work. So I am
curious if you accidentally disabled auto.create.topics.enable  ...Can you
also try to send msgs from cmd line to demo-duplicate to see if it gets
anything.

Let me know if it works.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 24, 2015 at 11:48 AM, Job-Selina Wu swucaree...@gmail.com
wrote:

 Hi, Shadi:

   Thans a lot for your reply.
 1. There is no error log at Kafka and Samza

 2.  this line   logger.info(key=+key+: message=+message);  write
 log correctly as below:

 [image: Inline image 1]

 This are my last two message with right count

 3. I tried both way below, none of them create topic, but I will try it
 again.

 collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));

 //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));

 4. I wrote a topic call http-demo to Kafka as my input, and the content
 can be show with command line below, so the Kafka should be OK.
 deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
 --from-beginning --topic http-demo

 Your help is highly appreciated.

 Sincerely,
 Selina




 On Fri, Jul 24, 2015 at 9:29 AM, Shadi Noghabi 
 snogh...@linkedin.com.invalid wrote:

 Selina,

 You should probably check a few things
 1. Your log files to see if you have any errors. Also, does you job fail
 or
 continues running?
 2. Does this line   logger.info(key=+key+: message=+message); 
 write
 any logs?
 3. This might not be the only reason, but you are sending messages of
 type MapString,
 String. However, in your config file, you defined 
 systems.kafka.samza.msg.serde=string which expects the message to be a
 String.


 Shadi


 On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu swucaree...@gmail.com
 wrote:

  Hi,  All
 
   I am trying to write my first StreamTask class. I have a topic at
  Kafka called http-demo. I like to read the topic and write it to
 another
  topic called demo-duplicate
 
  Howeven there is not topic written to Kafka.
 
  My properties file and StreamTask are below.  Can anyone told me
 what
  is the bug?
  BTW, if I set checkpoint or Metrics at properties file. the topic of
  checkpoint and metrics could be written to Kafka.  And the content of
   input topic -- http-demo could be show correctly.
 
  Your help is highly appreciated.
 
  Sincerely,
  Selina
 
 
  - - -- - - - - -
  # Job
  job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
  job.name=demo-parser

 
  # YARN
 
 
 yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
 
  # Task
  task.class=samza.http.demo.task.HttpDemoParserStreamTask
  task.inputs=kafka.http-demo
 
  # Serializers
 
 
 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
 
  # Kafka System
 
 
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
  systems.kafka.samza.msg.serde=string
  systems.kafka.samza.key.serde=string
  systems.kafka.consumer.zookeeper.connect=localhost:2181/
  systems.kafka.consumer.auto.offset.reset=largest
  systems.kafka.producer.bootstrap.servers=localhost:9092
  - - -- - - - - -
 
  My StreamTask class is simple also
 
  -
 
  /**
   *
   * Read data from http-demo topic and write it back to demo-duplicate
   */
  public class HttpDemoParserStreamTask implements StreamTask {
 
  private static final SystemStream OUTPUT_STREAM = new
  SystemStream(kafka, demo-duplicate);
  Logger logger =
  LoggerFactory.getLogger(HttpDemoParserStreamTask.class);
 
  @SuppressWarnings(unchecked)
  @Override
  public void process(IncomingMessageEnvelope envelope,
 MessageCollector
  collector, TaskCoordinator coordinator) throws Exception {
 
  String key = (String) envelope.getKey();
  String message = envelope.getMessage().toString();
  logger.info(key=+key+: message=+message);
 
  MapString, String outgoingMap = (MapString, String)
  (envelope.getMessage());
  collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
  outgoingMap));
  //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
  message));
  }
 
  }
 
  ---
 





Re: Security on YARN

2015-07-24 Thread Yan Fang
Hi Chen Song,

If you can work on this issue, it will be great.

1. the related ticket is https://issues.apache.org/jira/browse/SAMZA-727

2. most of the change will happen in Yarn AM and Yarn client parts. The
code sits in the samza-yarn package
https://github.com/apache/samza/tree/master/samza-yarn/src/main/scala/org/apache/samza/job/yarn
.

3. when you implement this, make sure it does not affect the non-secure
Yarn implementation. Because non-secure cluster implementation has been
proved working, while the secure cluster may have the issue as Yi Pan
mentioned, For a long-running
Samza job, it does not work. We will need a way to refresh the Kerberos ticket
periodically, which is not supported yet.  But I am happy to see at least
we have some support for secure cluster. We can figure the issue out later.

If you want to have some help in understanding the existing code, let me
know.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 24, 2015 at 7:00 PM, Chen Song chen.song...@gmail.com wrote:

 Can someone give some context on this? I can volunteer myself and try
 working on this.

 Chen

 On Thu, Jul 2, 2015 at 4:29 AM, Qi Fu q...@talend.com wrote:

  Hi Yi  Yan,
 
  Many thanks for your information. I have created a jira for this:
  https://issues.apache.org/jira/browse/SAMZA-727
  I'm willing to test it if someone can work on this.
 
 
  -Qi
 
  
  From: Yi Pan nickpa...@gmail.com
  Sent: Thursday, July 2, 2015 1:38 AM
  To: dev@samza.apache.org
  Subject: Re: Security on YARN
 
  Hi, Yan,
 
  Your memory serves as well as mine. :) I remember that Chris and I
  discussed this Kerberos ticket expiration issue when we were brain
 storming
  on how to access HDFS data in Samza. At high-level, what happens is that
  the Kerberos ticket to access a secured Hadoop cluster is issued to Samza
  containers at the job start time, and will expire later. For a
 long-running
  Samza job, it does not work. We will need a way to refresh the Kerberos
  ticket periodically, which is not supported yet. Chris probably can chime
  in with more details.
 
  -Yi
 
  On Wed, Jul 1, 2015 at 4:08 PM, Yan Fang yanfang...@gmail.com wrote:
 
   Hi Qi,
  
   I think this is caused by the fact that Samza currently does not
 support
   Yarn with Kerberos. Feel free to open a ticket for this feature.
  
   But if my memory serves, there was an issue mentioned about the
 Kerberos.
   Seems when the Kerberos ticket expires, Samza will have some issues?
 Can
   not find the resource. Anyone remember this?
  
   Cheers,
  
   Fang, Yan
   yanfang...@gmail.com
  
   On Wed, Jul 1, 2015 at 3:41 AM, Qi Fu q...@talend.com wrote:
  
Hi all,
   
   
I'm testing Samza on YARN and I have encountered a problem on the
   security
setting of YARN (Kerberos). Here is the detail:
   
1. My cluster is secured by Kerberos, and I deploy my samza job from
  one
of the cluster.
   
   
2. My config file is in ~/.samza/conf/(yarn-site.xml, core-site.xml,
hdfs-site.xml)
   
   
3. The job is deployed successfully, and I can get the info such as:
   
ClientHelper [INFO] set package url to scheme: hdfs port: -1
  file:
/user/test/samzatest.tar.gz for application_1435680272316_0003
   
ClientHelper [INFO] set package size to 212924524 for
application_1435680272316_0003
   
   
   
I think the security setting is correct as it can get the file
 size
from HDFS.
   
   
4. But I get the error from YARN job manager as following:
   
   
Application application_1435680272316_0003 failed 2 times due to
 AM
Container for appattempt_1435680272316_0003_02 exited with
  exitCode:
-1000
   
For more detailed output, check application tracking page:
http://cdh-namenode:8088/proxy/application_1435680272316_0003/Then,
   click
on links to logs of each attempt.
   
Diagnostics: Failed on local exception: java.io.IOException:
org.apache.hadoop.security.AccessControlException: Client cannot
authenticate via:[TOKEN, KERBEROS]; Host Details : local host is:
talend-cdh-datanode8/62.210.141.237; destination host is:
talend-cdh-namenode:8020;
   
java.io.IOException: Failed on local exception: java.io.IOException:
org.apache.hadoop.security.AccessControlException: Client cannot
authenticate via:[TOKEN, KERBEROS]; Host Details : local host is:
cdh-datanode8/62.210.141.237; destination host is:
   cdh-namenode:8020;
   
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:772)
   
..
   
   
   
Anyone knows how to solve this?
   
   
Qi FU
   
  
 



 --
 Chen Song



Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-07-24 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93019
---



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 80)
https://reviews.apache.org/r/36163/#comment147281

it should be config, not coordinatorSystemConfig because we need to update 
the config from the stream.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 101)
https://reviews.apache.org/r/36163/#comment147282

its private because its only used by this class.

Also move this to the end of the class because it is good to put all the 
private methods together.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(lines 121 - 125)
https://reviews.apache.org/r/36163/#comment147283

this can be simplified a little:

for ((storeName, systemStream) - changeLogSystemStreams) {
  val systemAdmin = config
.getSystemFactory(systemStream.getName)
.getOrElse(throw new SamzaException(A stream uses system %s, which 
is missing from the configuration. format 
systemName)).map(Util.getObj[SystemFactory](_)).getOrElse(systemStream.getSystem,
  throw new SamzaException(Unable to get systemAdmin for store  + 
storeName +  and systemStream + systemStream))
  
  
Then  do not need line 104-109, line 117-119.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 126)
https://reviews.apache.org/r/36163/#comment147284

add logs for the case where the topic is already existied. Log the metadata 
information. (like the original createStream code does)


- Yan Fang


On July 9, 2015, 2:39 p.m., Robert Zuljevic wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36163/
 ---
 
 (Updated July 9, 2015, 2:39 p.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 Removed trailing whitespaces
 
 
 Diffs
 -
 
   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
 7a588ebc99b5f07d533e48e10061a3075a63665a 
   
 samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
  249b8ae3a904716ea51a2b27c7701ac30d13b854 
   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1 
   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
 aeba61a95371faaba23c97d896321b8d95467f87 
   
 samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
  097f41062f3432ae9dc9a9737b48ed7b2f709f20 
   
 samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
 8d54c4639fc226b34e64915935c1d90e5917af2e 
   
 samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
  d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
   
 samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
  35086f54f526d5d88ad3bc312b71fce40260e7c6 
   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
 b063366f0f60e401765a000fa265c59dee4a461e 
   
 samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
  1e936b42a5b9a4bfb43766c17847b2947ebdb21d 
 
 Diff: https://reviews.apache.org/r/36163/diff/
 
 
 Testing
 ---
 
 I wasn't really sure what kind of test (unit test / integration test) I 
 should make here, so any pointers would be greatly appreaciated! I tested the 
 change with the unit/integration tests already available.
 
 
 Thanks,
 
 Robert Zuljevic
 




Re: Review Request 36545: SAMZA-682 Refactor Coordinator stream messages

2015-07-23 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36545/#review92825
---



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
(line 43)
https://reviews.apache.org/r/36545/#comment147063

To be consistent, lets go with TaskName, not the String.



samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
(line 57)
https://reviews.apache.org/r/36545/#comment147064

Any reason that you do not want to use the TaskName class? TaskName seems 
fine here.



samza-core/src/main/java/org/apache/samza/container/LocalityManager.java (line 
50)
https://reviews.apache.org/r/36545/#comment147065

sourceSuffix is more descriptive.



samza-core/src/main/java/org/apache/samza/manager/CoordinatorStreamManager.java 
(line 20)
https://reviews.apache.org/r/36545/#comment147066

I think it makes sense that this class stays in its original package: 
samza-core/src/main/java/org/apache/samza/coordinator/stream . Because its only 
about the coordinatorStream, not the overall manager of the samza.



samza-core/src/main/java/org/apache/samza/manager/CoordinatorStreamManager.java 
(line 29)
https://reviews.apache.org/r/36545/#comment147071

a little more in the doc. This class is not really manages the 
coordinator stream, it is an abstract class that other stream managers want to 
extend.

Also, renaming it to AbstractCoordinatorStreamManager maybe helpful too.



samza-core/src/main/java/org/apache/samza/manager/CoordinatorStreamManager.java 
(line 65)
https://reviews.apache.org/r/36545/#comment147072

typo, sends



samza-core/src/main/java/org/apache/samza/manager/CoordinatorStreamManager.java 
(line 96)
https://reviews.apache.org/r/36545/#comment147073

no +



samza-core/src/main/java/org/apache/samza/manager/CoordinatorStreamManager.java 
(line 112)
https://reviews.apache.org/r/36545/#comment147074

I think, taskName maybe more general. In case we have more information in 
the TaskName, or other rules of registering. Just personal idea.



samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
 (line 52)
https://reviews.apache.org/r/36545/#comment147067

going with the taskName is fine.



samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
(line 151)
https://reviews.apache.org/r/36545/#comment147068

if we use TaskName in the regitser method, do not need to change this one.



samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 
245)
https://reviews.apache.org/r/36545/#comment147069

same



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
(line 185)
https://reviews.apache.org/r/36545/#comment147070

same


- Yan Fang


On July 16, 2015, 1:33 p.m., József Márton Jung wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36545/
 ---
 
 (Updated July 16, 2015, 1:33 p.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 The following has been refactored: 
 1. Static inner classes from CoordinatorStreamMessage has been extracted
 2. Common functionality from CheckpointManager, ChangelogMappingManager and 
 LocalityManager has benn moved to a base class
 
 
 Diffs
 -
 
   checkstyle/import-control.xml eef3370 
   samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
 7445996 
   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java 
 55c258f 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
  6bd1bd3 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
  b1078bd 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
  92f8907 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
  PRE-CREATION 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/Delete.java
  PRE-CREATION 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
  PRE-CREATION 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
  PRE-CREATION 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetConfig.java
  PRE-CREATION 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
  PRE-CREATION 
   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java ad6387d 
   
 samza-core/src/main/java/org/apache/samza/manager/CoordinatorStreamManager.java
  PRE-CREATION 
   
 samza-core/src

Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-22 Thread Yan Fang


 On July 21, 2015, 5:42 p.m., Yan Fang wrote:
  samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala, 
  lines 36-37
  https://reviews.apache.org/r/36473/diff/3/?file=1017436#file1017436line36
 
  though it works, prefer to use the def here, not only because it has 
  leff overhead, but also keep all the methods consistent for better 
  readability. What do you think?
 
 Roger Hoover wrote:
 Sounds good.  I only baulked on it the first time because I'm not that 
 skilled with Scala type decarations yet. :)  I can make this work
 
 Roger Hoover wrote:
 I take it back.  It seems it [can't be 
 done](http://www.scala-lang.org/old/node/5082)

aha, sorry for the misleading. I think, what I mean here is to change the val 
to def: val newCounter = metricGroup.newCounter(_) == def newCounter(name: 
String).


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review92429
---


On July 22, 2015, 4:07 a.m., Roger Hoover wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36473/
 ---
 
 (Updated July 22, 2015, 4:07 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-733 Add metrics to Elasticsearch System Producer
 
 
 Diffs
 -
 
   samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java 
 PRE-CREATION 
   samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java 
 PRE-CREATION 
   samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 
 8eac8ef 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
  a277b69 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
  7eb14a2 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
  PRE-CREATION 
   
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
  PRE-CREATION 
   
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
  e63d62c 
 
 Diff: https://reviews.apache.org/r/36473/diff/
 
 
 Testing
 ---
 
 Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
 stream and that the metrics correctly count how many Elasticsearch documents 
 were created and indexed.
 
 
 Thanks,
 
 Roger Hoover
 




Re: question on commit on changelog

2015-07-22 Thread Yan Fang
Hi Chen Song,

There are two different concepts: *checkpoint* and *changelog*. Checkpoint
is for the offset of the messages, while the changelog is for the kv-store.
The code snippet you show is for the checkpoint , not for the changelog.

{quote}
1. When implementing our Samza task, does each call of process method
triggers a call to TaskInstance.commit?
{quote}

TaskInstance.commit triggers the *checkpoint* . It is triggered every
task.commit.ms , (default is 6ms). The code is here
https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala#L149-166
. Basically, the RunLoop class calls the commit method, but only trigger
the commit behavior every configured time.

If you are talking about the *changelog*, it's not controlled by the commit
method. Instead, every put/delete calls the send
https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java#L51
of the system Producer. (code is here
https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala#L62-L66).
In terms of how often the send really *send *to the broker (e.g. kafka),
it depends on your producer's configuration. For example, in Kafka, you can
have the producer send a batch (setting async), or send one msg a time
(setting sync). What it means is that, it leaves the System to decide how
to deal with the send method.


{quote}
2. Is there a way to buffer these commit activities in memory and flush
periodically? Our job is joining 1mm messages per second using a KV store
and we have a lot of concern for the changelog size, as in the worst case,
the change log will grow as fast as the input log.
{quote}

If you are talking about the checkpoint, you can change the task.commit.ms .

If you are thinking of the changelog (kv-store), you can change the
producer's config to batch a few changes and send to the broker.

I think the guys in the community with more operational experience are able
to tell you what is the best practice.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Wed, Jul 22, 2015 at 9:00 AM, Chen Song chen.song...@gmail.com wrote:

 We are trying to understand the order of commits when processing each
 message in a Samza job.

 T1: input offset commit
 T2: changelog commit
 T3: output commit

 By looking at the code snippet in

 https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171
 ,
 my understanding is that for each input message, Samza always send update
 message on changelog, send the output message and then commit the input
 offset. It makes sense to me at the high level in terms of at least once
 processing.

 Specifically, we have two dumb questions:

 1. When implementing our Samza task, does each call of process method
 triggers a call to TaskInstance.commit?
 2. Is there a way to buffer these commit activities in memory and flush
 periodically? Our job is joining 1mm messages per second using a KV store
 and we have a lot of concern for the changelog size, as in the worst case,
 the change log will grow as fast as the input log.

 Chen

 --
 Chen Song



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-21 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review92429
---



samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java (line 23)
https://reviews.apache.org/r/36473/#comment146595

of the class the extends - of the class that extends



samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java (line 29)
https://reviews.apache.org/r/36473/#comment146607

can we also have a constructor with the default prefix ?



samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala (line 29)
https://reviews.apache.org/r/36473/#comment146598

the extends - that extends



samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala (lines 
36 - 37)
https://reviews.apache.org/r/36473/#comment146606

though it works, prefer to use the def here, not only because it has leff 
overhead, but also keep all the methods consistent for better readability. What 
do you think?


- Yan Fang


On July 21, 2015, 5:41 a.m., Roger Hoover wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36473/
 ---
 
 (Updated July 21, 2015, 5:41 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-733 Add metrics to Elasticsearch System Producer
 
 
 Diffs
 -
 
   samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java 
 PRE-CREATION 
   samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java 
 PRE-CREATION 
   samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala 
 8eac8ef 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
  a277b69 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
  7eb14a2 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
  PRE-CREATION 
   
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
  PRE-CREATION 
   
 samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
  e63d62c 
 
 Diff: https://reviews.apache.org/r/36473/diff/
 
 
 Testing
 ---
 
 Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
 stream and that the metrics correctly count how many Elasticsearch documents 
 were created and indexed.
 
 
 Thanks,
 
 Roger Hoover
 




Re: Review Request 33419: SAMZA-625: Provide tool to consume changelog and materialize a state store

2015-07-15 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33419/
---

(Updated July 16, 2015, 12:56 a.m.)


Review request for samza.


Changes
---

fix nits.
get rid of writing the latest offset to the file


Bugs: SAMZA-625
https://issues.apache.org/jira/browse/SAMZA-625


Repository: samza


Description
---

Implemented in Java.

* modified build.gradle to have the gradle compile scala first. Because some 
jave code has dependencies to Scala code
* change the state store name by removing the space ( in TaskManager )
* add scala java conversion method in Util because some classes only accept 
scala map
* add java version of some configs 
* remove duplicated config in samza-log4j
* add StorageRevoery class, which does most of the recoverying job. The logic 
mimics what happens in SamzaContainer.
* add StateStorageTool, for the commandline usage
* unit tests
* docs


Diffs (updated)
-

  checkstyle/import-control.xml 3374f0c 
  docs/learn/documentation/versioned/container/state-management.md 79067bb 
  samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
aeba61a 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala 2feb65b 
  samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java 
PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java 
PRE-CREATION 
  samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java 
d5e24f2 
  samza-shell/src/main/bash/state-storage-tool.sh PRE-CREATION 

Diff: https://reviews.apache.org/r/33419/diff/


Testing
---

tested with multiple partitions and multiple stores recovery.


Thanks,

Yan Fang



Re: Another checkpoint tool question

2015-07-14 Thread Yan Fang
Hi Jae,

Do you want to reset the offset to the latest one when you *start *the job
or when the just is *running*?

If it's the former one, you can use

systems.system-name.samza.offset.default=upcoming
systems.system-name.streams.stream-name.samza.reset.offset=true

What is the reason that you do not want to use
systems.system-name.streams.stream-name.samza.reset.offset
?

Thanks,

Fang, Yan
yanfang...@gmail.com

On Tue, Jul 14, 2015 at 12:49 AM, Bae, Jae Hyeon metac...@gmail.com wrote:

 Hi

 I want to reset the offset for the job to the latest one, which means I
 want to ignore them without using
 systems.system-name.streams.stream-name.samza.reset.offset option.

 If I use checkpoint tool and reset the offset as -1 or Long.MAX_VALUE, in
 my theory, kafka consumer will throw an exception and it will reset them as
 the latest one, am I right? Otherwise, please let me know how to reset
 offsets to the latest one.

 Thank you
 Best, Jae



Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-14 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review91670
---



samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
 (line 152)
https://reviews.apache.org/r/36473/#comment145224

remove the space



samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
 (line 24)
https://reviews.apache.org/r/36473/#comment145225

can this class extends MetricsHelper? This can simplifies a little.


- Yan Fang


On July 14, 2015, 6:12 a.m., Roger Hoover wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36473/
 ---
 
 (Updated July 14, 2015, 6:12 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-733 Add metrics to Elasticsearch System Producer
 
 
 Diffs
 -
 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
  a277b69 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
  7eb14a2 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
  PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/36473/diff/
 
 
 Testing
 ---
 
 Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
 stream and that the metrics correctly count how many Elasticsearch documents 
 were created and indexed.
 
 
 Thanks,
 
 Roger Hoover
 




Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-14 Thread Yan Fang


 On July 14, 2015, 9:44 p.m., Yan Fang wrote:
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java,
   line 24
  https://reviews.apache.org/r/36473/diff/1/?file=1010788#file1010788line24
 
  can this class extends MetricsHelper? This can simplifies a little.
 
 Roger Hoover wrote:
 I don't see how it simplifies things because I have to implement all the 
 methods in the Scala trait.  I'm having trouble getting the newGauge 
 signatures to match.
 
 ```
 public class ElasticsearchSystemProducerMetrics implements MetricsHelper {
 public final Counter bulkSendSuccess;
 public final Counter inserts;
 public final Counter updates;
 private final MetricsRegistry registry;
 private final String group;
 private final String systemName;
 
 public interface JFunctionR {
 R apply();
 }
 
 public ElasticsearchSystemProducerMetrics(String systemName, 
 MetricsRegistry registry) {
 group = this.getClass().getName();
 this.registry = registry;
 this.systemName = systemName;
 
 bulkSendSuccess = newCounter(bulk-send-success);
 inserts = newCounter(docs-inserted);
 updates = newCounter(docs-updated);
 }
 
 @Override
 public Counter newCounter(String name) {
 return MetricsHelper$class.newCounter(this, name);
 }
 
 @Override
 public T GaugeT newGauge(String name, T value) {
 return MetricsHelper$class.newGauge(this, name, value);
 }
 
 @Override
 public T GaugeT newGauge(String name, JFunctionT value) {
 return null;
 }
 
 @Override
 public Timer newTimer(String name) {
 return MetricsHelper$class.newTimer(this, name);
 }
 
 @Override
 public String getPrefix() {
 return systemName + -;
 }
 
 @Override
 public MetricsRegistry registry() {
 return registry;
 }
 
 @Override
 public String group() {
 return group;
 }
 }
 ```
 
 Roger Hoover wrote:
 We really only need counters for this class but have to figure out how to 
 implement the Scala newGauge methods which are tricky.  Would appreciate help 
 if you know how to do it.

Oh, I see. Sorry for the confusion. I did not realize there is a java-scala 
issue sitting here. Ok. I am fine with going the original approach, which seems 
clear enough


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review91670
---


On July 14, 2015, 6:12 a.m., Roger Hoover wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36473/
 ---
 
 (Updated July 14, 2015, 6:12 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-733 Add metrics to Elasticsearch System Producer
 
 
 Diffs
 -
 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
  a277b69 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
  7eb14a2 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
  PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/36473/diff/
 
 
 Testing
 ---
 
 Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
 stream and that the metrics correctly count how many Elasticsearch documents 
 were created and indexed.
 
 
 Thanks,
 
 Roger Hoover
 




Re: Review Request 36473: SAMZA-733 Add metrics to Elasticsearch System Producer

2015-07-14 Thread Yan Fang


 On 七月 14, 2015, 9:44 p.m., Yan Fang wrote:
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java,
   line 24
  https://reviews.apache.org/r/36473/diff/1/?file=1010788#file1010788line24
 
  can this class extends MetricsHelper? This can simplifies a little.
 
 Roger Hoover wrote:
 I don't see how it simplifies things because I have to implement all the 
 methods in the Scala trait.  I'm having trouble getting the newGauge 
 signatures to match.
 
 ```
 public class ElasticsearchSystemProducerMetrics implements MetricsHelper {
 public final Counter bulkSendSuccess;
 public final Counter inserts;
 public final Counter updates;
 private final MetricsRegistry registry;
 private final String group;
 private final String systemName;
 
 public interface JFunctionR {
 R apply();
 }
 
 public ElasticsearchSystemProducerMetrics(String systemName, 
 MetricsRegistry registry) {
 group = this.getClass().getName();
 this.registry = registry;
 this.systemName = systemName;
 
 bulkSendSuccess = newCounter(bulk-send-success);
 inserts = newCounter(docs-inserted);
 updates = newCounter(docs-updated);
 }
 
 @Override
 public Counter newCounter(String name) {
 return MetricsHelper$class.newCounter(this, name);
 }
 
 @Override
 public T GaugeT newGauge(String name, T value) {
 return MetricsHelper$class.newGauge(this, name, value);
 }
 
 @Override
 public T GaugeT newGauge(String name, JFunctionT value) {
 return null;
 }
 
 @Override
 public Timer newTimer(String name) {
 return MetricsHelper$class.newTimer(this, name);
 }
 
 @Override
 public String getPrefix() {
 return systemName + -;
 }
 
 @Override
 public MetricsRegistry registry() {
 return registry;
 }
 
 @Override
 public String group() {
 return group;
 }
 }
 ```
 
 Roger Hoover wrote:
 We really only need counters for this class but have to figure out how to 
 implement the Scala newGauge methods which are tricky.  Would appreciate help 
 if you know how to do it.
 
 Yan Fang wrote:
 Oh, I see. Sorry for the confusion. I did not realize there is a 
 java-scala issue sitting here. Ok. I am fine with going the original 
 approach, which seems clear enough

Another way is to add a Java version of MetricsHelper. Then 
ElasticsearchSystemProducerMetrics and other future java-version metrics can 
extend it. This helps future implementation. Otherwise, I think the first patch 
is fine. Let you make the decision. Thanks. :)


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36473/#review91670
---


On 七月 14, 2015, 6:12 a.m., Roger Hoover wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36473/
 ---
 
 (Updated 七月 14, 2015, 6:12 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-733 Add metrics to Elasticsearch System Producer
 
 
 Diffs
 -
 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
  a277b69 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
  7eb14a2 
   
 samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
  PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/36473/diff/
 
 
 Testing
 ---
 
 Tested that metrics for Elasticsearch producer appear in JMX and the metrics 
 stream and that the metrics correctly count how many Elasticsearch documents 
 were created and indexed.
 
 
 Thanks,
 
 Roger Hoover
 




Re: Another checkpoint tool question

2015-07-14 Thread Yan Fang
Actually I have not tried this. But I do not think it will work. Because
the next offset is the offset you set + 1. There is no mechanism inside
Samza that treats -1 as the latest (though in Kafka it does). If it works,
let me know. :) I may miss something.

Another way of doing this is to set the latest offset using the checkpoint
tool. You can get the latest offset of the topic from Kafka using
bin/kafka-run-class.sh kafka.tools.GetOffsetShell
https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-ConsumerOffsetChecker

Thanks,

Fang, Yan
yanfang...@gmail.com

On Tue, Jul 14, 2015 at 10:33 AM, Bae, Jae Hyeon metac...@gmail.com wrote:

 Hi Yan

 Thanks for your response. I want to reset the offset just once now. If I
 use systems.system-name.streams.stream-name.samza.reset.offset in the
 configuration file, before restarting the job, I need to remove those
 properties again. That's why I don't want to use that property.

 Is it OK setting invalid offset such as -1 or Long.MAX_VALUE?

 On Tue, Jul 14, 2015 at 10:16 AM, Yan Fang yanfang...@gmail.com wrote:

  Hi Jae,
 
  Do you want to reset the offset to the latest one when you *start *the
 job
  or when the just is *running*?
 
  If it's the former one, you can use
 
  systems.system-name.samza.offset.default=upcoming
  systems.system-name.streams.stream-name.samza.reset.offset=true
 
  What is the reason that you do not want to use
  systems.system-name.streams.stream-name.samza.reset.offset
  ?
 
  Thanks,
 
  Fang, Yan
  yanfang...@gmail.com
 
  On Tue, Jul 14, 2015 at 12:49 AM, Bae, Jae Hyeon metac...@gmail.com
  wrote:
 
   Hi
  
   I want to reset the offset for the job to the latest one, which means I
   want to ignore them without using
   systems.system-name.streams.stream-name.samza.reset.offset option.
  
   If I use checkpoint tool and reset the offset as -1 or Long.MAX_VALUE,
 in
   my theory, kafka consumer will throw an exception and it will reset
 them
  as
   the latest one, am I right? Otherwise, please let me know how to reset
   offsets to the latest one.
  
   Thank you
   Best, Jae
  
 



Re: Thoughts and obesrvations on Samza

2015-07-13 Thread Yan Fang
 discussion in the Kafka list (but makes
 sense
  to
   figure out first if it is what Samza wants).
  
   Irrespective of how it's implemented, though, to me the important
 things
   are the following:
   1. Unify the website, config, naming, docs, metrics, etc--basically fix
   the product experience so the stream and the processing feel like a
   single user experience and brand. This seems minor but I think is a
  really
   big deal.
   2. Make standalone mode a first class citizen and have a real
 technical
   plan to be able to support cluster managers other than YARN.
   3. Make the config and out-of-the-box experience more usable
  
   I think that prototype gives a practical example of how 1-3 could be
 done
   and we should pursue it. This is a pretty radical change, so I wouldn't
  be
   shocked if people didn't want to take a step like that.
  
   Maybe it would make sense to see if people are on board with that
 general
   idea, and then try to get some advice on sub-projects in parallel and
  nail
   down those details?
  
   -Jay
  
   On Sun, Jul 12, 2015 at 5:54 PM, Chris Riccomini 
 criccom...@apache.org
   wrote:
  
   Hey all,
  
   I want to start by saying that I'm absolutely thrilled to be a part of
   this
   community. The amount of level-headed, thoughtful, educated discussion
   that's gone on over the past ~10 days is overwhelming. Wonderful.
  
   It seems like discussion is waning a bit, and we've reached some
   conclusions. There are several key emails in this threat, which I want
  to
   call out:
  
   1. Jakob's summary of the three potential ways forward.
  
  
  
 
 http://mail-archives.apache.org/mod_mbox/samza-dev/201507.mbox/%3CCADiKvVu-hxdBfyQ4qm3LDC55cUQbPdmbe4zGzTOOatYF1Pz43A%40mail.gmail.com%3E
   2. Julian's call out that we should be focusing on community over
 code.
  
  
  
 
 http://mail-archives.apache.org/mod_mbox/samza-dev/201507.mbox/%3CCAPSgeESZ_7bVFbwN%2Bzqi5MH%3D4CWu9MZUSanKg0-1woMqt55Fvg%40mail.gmail.com%3E
   3. Martin's summary about the benefits of merging communities.
  
  
  
 
 http://mail-archives.apache.org/mod_mbox/samza-dev/201507.mbox/%3CBFB866B6-D9D8-4578-93C0-FFAEB1DF00FC%40kleppmann.com%3E
   4. Jakob's comments about the distinction between community and code
   paths.
  
  
  
 
 http://mail-archives.apache.org/mod_mbox/samza-dev/201507.mbox/%3CCADiKvVtWPjHLLDsmxvz9KggVA5DfBi-nUvfqB6QdA-du%2B_a9Ng%40mail.gmail.com%3E
  
   I agree with the comments on all of these emails. I think Martin's
  summary
   of his position aligns very closely with my own. To that end, I think
 we
   should get concrete about what the proposal is, and call a vote on it.
   Given that Jay, Martin, and I seem to be aligning fairly closely, I
  think
   we should start with:
  
   1. [community] Make Samza a subproject of Kafka.
   2. [community] Make all Samza PMC/committers committers of the
  subproject.
   3. [community] Migrate Samza's website/documentation into Kafka's.
   4. [code] Have the Samza community and the Kafka community start a
   from-scratch reboot together in the new Kafka subproject. We can
   borrow/copy   paste significant chunks of code from Samza's code
 base.
   5. [code] The subproject would intentionally eliminate support for
 both
   other streaming systems and all deployment systems.
   6. [code] Attempt to provide a bridge from our SystemConsumer to
 KIP-26
   (copy cat)
   7. [code] Attempt to provide a bridge from the new subproject's
  processor
   interface to our legacy StreamTask interface.
   8. [code/community] Sunset Samza as a TLP when we have a working Kafka
   subproject that has a fault-tolerant container with state management.
  
   It's likely that (6) and (7) won't be fully drop-in. Still, the closer
  we
   can get, the better it's going to be for our existing community.
  
   One thing that I didn't touch on with (2) is whether any Samza PMC
  members
   should be rolled into Kafka PMC membership as well (though, Jay and
  Jakob
   are already PMC members on both). I think that Samza's community
  deserves
   a
   voice on the PMC, so I'd propose that we roll at least a few PMC
 members
   into the Kafka PMC, but I don't have a strong framework for which
 people
   to
   pick.
  
   Before (8), I think that Samza's TLP can continue to commit bug fixes
  and
   patches as it sees fit, provided that we openly communicate that we
  won't
   necessarily migrate new features to the new subproject, and that the
 TLP
   will be shut down after the migration to the Kafka subproject occurs.
  
   Jakob, I could use your guidance here about about how to achieve this
  from
   an Apache process perspective (sorry).
  
   * Should I just call a vote on this proposal?
   * Should it happen on dev or private?
   * Do committers have binding votes, or just PMC?
  
   Having trouble finding much detail on the Apache wikis. :(
  
   Cheers,
   Chris
  
   On Fri, Jul 10, 2015 at 2:38 PM, Yan Fang yanfang...@gmail.com
 wrote

Re: Review Request 36274: SAMZA-401: getCpuTime to truly calculate duty cycle of the event loop

2015-07-13 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36274/#review91527
---



samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala (lines 75 - 
76)
https://reviews.apache.org/r/36274/#comment144953

for more accurate, I think the activeNs should go before totalNs.


- Yan Fang


On July 7, 2015, 7:08 p.m., Luis De Pombo wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36274/
 ---
 
 (Updated July 7, 2015, 7:08 p.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-401: getCpuTime to truly calculate duty cycle of the event loop
 
 
 Diffs
 -
 
   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
 c292ae47cd89ef0f25dc682c02dd288e2ba6dcc5 
   samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala 
 1643070dd710efb9ade9eb5812dabd6fa60ce023 
   samza-core/src/main/scala/org/apache/samza/util/Util.scala 
 2feb65b729b45fbc3b83a75c4072527e3c4e60be 
   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
 64a5844bdb343a3c509cba059b9f3b9a19dc9eff 
 
 Diff: https://reviews.apache.org/r/36274/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Luis De Pombo
 




Re: Review Request 36089: SAMZA-670 Allow easier access to JMX port

2015-07-13 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36089/#review91533
---



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 
634)
https://reviews.apache.org/r/36089/#comment144956

I think a better way, which requires much fewer changes, is to call 
something like jmxServer.getJmxUrl, jmxServer.jmxTunelingUrl.

jmxServer can be a variable of SamzaContainer Object.

Then we do not need to change ContainerModel, JobModel, SamzaContext. 
Because there is no reason that we want to contain jmx information into those 
three objects.


- Yan Fang


On July 1, 2015, 2:07 p.m., József Márton Jung wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36089/
 ---
 
 (Updated July 1, 2015, 2:07 p.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 JMX address of application master and the containers is available through AM 
 UI
 
 
 Diffs
 -
 
   checkstyle/import-control.xml 3374f0c 
   
 samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 
 fd7333b 
   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java 
 e661e12 
   
 samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
  6c1e488 
   samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java 
 98a34bc 
   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 95a2ce5 
   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
 cbacd18 
   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
 8ee034a 
   samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala f343faf 
   
 samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
 9fb1aa9 
   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
 7caad28 
   
 samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
  1ce7d25 
   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml cf0d2fc 
   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 
 20aa373 
   
 samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 
 1445605 
 
 Diff: https://reviews.apache.org/r/36089/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 József Márton Jung
 




Re: Thoughts and obesrvations on Samza

2015-07-10 Thread Yan Fang
Thanks, Jay. This argument persuaded me actually. :)

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 10, 2015 at 2:33 PM, Jay Kreps j...@confluent.io wrote:

 Hey Yan,

 Yeah philosophically I think the argument is that you should capture the
 stream in Kafka independent of the transformation. This is obviously a
 Kafka-centric view point.

 Advantages of this:
 - In practice I think this is what e.g. Storm people often end up doing
 anyway. You usually need to throttle any access to a live serving database.
 - Can have multiple subscribers and they get the same thing without
 additional load on the source system.
 - Applications can tap into the stream if need be by subscribing.
 - You can debug your transformation by tailing the Kafka topic with the
 console consumer
 - Can tee off the same data stream for batch analysis or Lambda arch style
 re-processing

 The disadvantage is that it will use Kafka resources. But the idea is
 eventually you will have multiple subscribers to any data source (at least
 for monitoring) so you will end up there soon enough anyway.

 Down the road the technical benefit is that I think it gives us a good path
 towards end-to-end exactly once semantics from source to destination.
 Basically the connectors need to support idempotence when talking to Kafka
 and we need the transactional write feature in Kafka to make the
 transformation atomic. This is actually pretty doable if you separate
 connector=kafka problem from the generic transformations which are always
 kafka=kafka. However I think it is quite impossible to do in a all_things
 = all_things environment. Today you can say well the semantics of the
 Samza APIs depend on the connectors you use but it is actually worse then
 that because the semantics actually depend on the pairing of connectors--so
 not only can you probably not get a usable exactly once guarantee
 end-to-end it can actually be quite hard to reverse engineer what property
 (if any) your end-to-end flow has if you have heterogenous systems.

 -Jay

 On Fri, Jul 10, 2015 at 2:00 PM, Yan Fang yanfang...@gmail.com wrote:

  {quote}
  maintained in a separate repository and retaining the existing
  committership but sharing as much else as possible (website, etc)
  {quote}
 
  Overall, I agree on this idea. Now the question is more about how to do
  it.
 
  On the other hand, one thing I want to point out is that, if we decide to
  go this way, how do we want to support
  otherSystem-transformation-otherSystem use case?
 
  Basically, there are four user groups here:
 
  1. Kafka-transformation-Kafka
  2. Kafka-transformation-otherSystem
  3. otherSystem-transformation-Kafka
  4. otherSystem-transformation-otherSystem
 
  For group 1, they can easily use the new Samza library to achieve. For
  group 2 and 3, they can use copyCat - transformation - Kafka or Kafka-
  transformation - copyCat.
 
  The problem is for group 4. Do we want to abandon this or still support
 it?
  Of course, this use case can be achieved by using copyCat -
 transformation
  - Kafka - transformation - copyCat, the thing is how we persuade them
 to
  do this long chain. If yes, it will also be a win for Kafka too. Or if
  there is no one in this community actually doing this so far, maybe ok to
  not support the group 4 directly.
 
  Thanks,
 
  Fang, Yan
  yanfang...@gmail.com
 
  On Fri, Jul 10, 2015 at 12:58 PM, Jay Kreps j...@confluent.io wrote:
 
   Yeah I agree with this summary. I think there are kind of two questions
   here:
   1. Technically does alignment/reliance on Kafka make sense
   2. Branding wise (naming, website, concepts, etc) does alignment with
  Kafka
   make sense
  
   Personally I do think both of these things would be really valuable,
 and
   would dramatically alter the trajectory of the project.
  
   My preference would be to see if people can mostly agree on a direction
   rather than splintering things off. From my point of view the ideal
  outcome
   of all the options discussed would be to make Samza a closely aligned
   subproject, maintained in a separate repository and retaining the
  existing
   committership but sharing as much else as possible (website, etc). No
  idea
   about how these things work, Jacob, you probably know more.
  
   No discussion amongst the Kafka folks has happened on this, but likely
 we
   should figure out what the Samza community actually wants first.
  
   I admit that this is a fairly radical departure from how things are.
  
   If that doesn't fly, I think, yeah we could leave Samza as it is and do
  the
   more radical reboot inside Kafka. From my point of view that does leave
   things in a somewhat confusing state since now there are two stream
   processing systems more or less coupled to Kafka in large part made by
  the
   same people. But, arguably that might be a cleaner way to make the
  cut-over
   and perhaps less risky for Samza community since if it works people can
   switch and if it doesn't nothing

Re: Question on newBlockingQueue in BlockingEnvelopeMap

2015-07-10 Thread Yan Fang
Hi Jae,

I think the messages are not lost, instead, they all go to one partition,
in your shared queue implementation.

If you check the code in BlockingEnvelopeMap line 123
https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java#L123
,
it puts all the messages in the queue in one partition.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 10, 2015 at 12:36 PM, Bae, Jae Hyeon metac...@gmail.com wrote:

 Hi Samza devs and users

 I wrote customized Samza S3 consumer which downloads files from S3 and put
 messages in BlockedEnvelopeMap. It was straightforward because there's a
 nice example, filereader. I tried to a little optimize with
 newBlockingQueue() method because I guess that single queue shared could be
 fine because Samza container is single threaded. I added the following
 code:


 public S3Consumer(String systemName, Config config, MetricsRegistry
 registry) {
 queueSize = config.getInt(systems. + systemName + .queue.size,
 1);
 bucket = config.get(systems. + systemName + .bucket);
 prefix = config.get(systems. + systemName + .prefix);

 queue = new LinkedBlockingQueue(queueSize);

 recordCounter = registry.newCounter(this.getClass().getName(),
 processed_records);
 }

 @Override
 protected BlockingQueueIncomingMessageEnvelope newBlockingQueue() {
 return queue; // single queue
 }

 Unfortunately, I observed significant message loss with this
 implementation. I suspected its queue might have dropped messages, so I
 changed newBlockingQueue() implementation same as filereader.

 @Override
 protected BlockingQueueIncomingMessageEnvelope newBlockingQueue() {
 return new LinkedBlockingQueue(queueSize);
 }

 Then, message loss didn't happen again.

 Do you have any idea why it went wrong?

 Thank you
 Best, Jae



Re: Thoughts and obesrvations on Samza

2015-07-10 Thread Yan Fang
 into and out of Kafka --  was to avoid  having
 to
worry
   about
the
  lifecycle management of external clients. If there
 is
  a
 generic
Kafka
  ingress/egress layer that I can plug a new connector
  into
and
   have
a
  lot of
  the heavy lifting re scale and reliability done for
 me
   then
 it
gives
 me
  all
  the pushing new consumers/producers would. If not
  then it
complicates
 my
  operational deployments.
 
  Which is similar to my other question with the
  proposal
   --
if
  we
 build a
  fully available/stand-alone Samza plus the requisite
   shims
to
 integrate
  with Slider etc I suspect the former may be a lot
 more
   work
  than
   we
  think.
  We may make it much easier for a newcomer to get
   something
   running
but
  having them step up and get a reliable production
deployment
  may
still
  dominate mailing list  traffic, if for different
  reasons
than
today.
 
  Don't get me wrong -- I'm comfortable with making
 the
   Samza
dependency
  on
  Kafka much more explicit and I absolutely see the
   benefits
 in
   the
  reduction of duplication and clashing
  terminologies/abstractions
that
  Chris/Jay describe. Samza as a library would likely
  be a
very
   nice
 tool
  to
  add to the Kafka ecosystem. I just have the concerns
   above
re
  the
  operational side.
 
  Garry
 
  -Original Message-
  From: Gianmarco De Francisci Morales [mailto:
g...@apache.org
 ]
  Sent: 02 July 2015 12:56
  To: dev@samza.apache.org
  Subject: Re: Thoughts and obesrvations on Samza
 
  Very interesting thoughts.
  From outside, I have always perceived Samza as a
   computing
  layer
over
  Kafka.
 
  The question, maybe a bit provocative, is should
  Samza
   be
a
 sub-project
  of Kafka then?
  Or does it make sense to keep it as a separate
 project
with a
separate
  governance?
 
  Cheers,
 
  --
  Gianmarco
 
  On 2 July 2015 at 08:59, Yan Fang 
  yanfang...@gmail.com
  wrote:
 
  Overall, I agree to couple with Kafka more tightly.
Because
   Samza
de
  facto is based on Kafka, and it should leverage
 what
   Kafka
  has.
   At
 the
  same time, Kafka does not need to reinvent what
 Samza
 already
has. I
  also like the idea of separating the ingestion and
   transformation.
 
  But it is a little difficult for me to image how
 the
   Samza
  will
look
  like.
  And I feel Chris and Jay have a little difference
 in
   terms
 of
   how
  Samza should look like.
 
  *** Will it look like what Jay's code shows (A
  client of
  Kakfa)
   ?
And
  user's application code calls this client?
 
  1. If we make Samza be a library of Kafka (like
 what
  the
 code
shows),
  how do we implement auto-balance and
 fault-tolerance?
   Are
 they
taken
  care by the Kafka broker or other mechanism, such
 as
Samza
worker
  (just make up the name) ?
 
  2. What about other features, such as auto-scaling,
   shared
   state,
  monitoring?
 
 
  *** If we have Samza standalone, (is this what
 Chris
  suggests?)
 
  1. we still need to ingest data from Kakfa and
  produce
   to
 it.
Then it
  becomes the same as what Samza looks like now,
  except it
 does
   not
 rely
  on Yarn anymore.
 
  2. if it is standalone, how can it leverage Kafka's
metrics,
   logs,
  etc? Use Kafka code as the dependency?
 
 
  Thanks,
 
  Fang, Yan
  yanfang...@gmail.com
 
  On Wed, Jul 1, 2015 at 5:46 PM, Guozhang Wang 
   wangg...@gmail.com

  wrote:
 
  Read through the code example and it looks good to
  me.
   A
 few
  thoughts regarding deployment:
 
  Today Samza deploys as executable runnable like:
 
  deploy/samza/bin/run-job.sh --config-factory=...
  --config-path=file://...
 
  And this proposal advocate for deploying Samza
 more
  as
  embedded
  libraries in user application code (ignoring the
 terminology

Re: Review Request 35933: SAMZA-449 Expose RocksDB statistic

2015-07-03 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35933/#review90373
---



samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
 (line 133)
https://reviews.apache.org/r/35933/#comment143410

we can remove the KeyValueStoreMetrics here, right? change to new 
RocksDbStatisticMetrics(storeName, options, metrics)



samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
 (line 245)
https://reviews.apache.org/r/35933/#comment143411

the [[]] is not very common. And I do not see anywere in Samza we are 
using. It's better to remove them, or use {@link} if you want to link to the 
class.



samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbStatisticMetrics.scala
 (line 213)
https://reviews.apache.org/r/35933/#comment143409

is any code is using those methods (line 213-219)? If not, it's safe to 
remove them. Also, users can directly call the variable name to get the value, 
if they want.


- Yan Fang


On July 3, 2015, 11:05 p.m., Gustavo Anatoly F. V. Solís wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/35933/
 ---
 
 (Updated July 3, 2015, 11:05 p.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 RocksDb expose statistic
 
 
 Diffs
 -
 
   
 samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
  a423f7bd6c43461e051b5fd1f880dd01db785991 
   
 samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbStatisticMetrics.scala
  PRE-CREATION 
   
 samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
  a428a16bc1e9ab4980a6f17db4fd810057d31136 
 
 Diff: https://reviews.apache.org/r/35933/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Gustavo Anatoly F. V. Solís
 




Re: Samza and sliding window

2015-07-01 Thread Yan Fang
Do you have

serializers.registry.json.class
=org.apache.samza.serializers.JsonSerdeFactory

in your config file?


Fang, Yan
yanfang...@gmail.com

On Wed, Jul 1, 2015 at 2:59 PM, Shekar Tippur ctip...@gmail.com wrote:

 Yi/Milinda,

 I am trying to initialize a kv store. I have the following properties
 defined:

 stores.store-name.key.serde=json

 stores.store-name.msg.serde=json

 stores.store-name.changelog=argos.windowchangelog
 How do I define a key serde as I am getting this exception:

 Exception in thread main org.apache.samza.SamzaException: Must define a
 key serde when using key value storage.

 at

 org.apache.samza.storage.kv.BaseKeyValueStorageEngineFactory$class.getStorageEngine(BaseKeyValueStorageEngineFactory.scala:86)

 at

 org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory.getStorageEngine(RocksDbKeyValueStorageEngineFactory.scala:28)

 at

 org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:455)

 at

 org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:439)

 at

 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

 at

 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

 at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)

 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

 at scala.collection.AbstractTraversable.map(Traversable.scala:105)

 at

 org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:439)

 at

 org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:416)

 at

 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

 at

 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

 at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)

 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

 at

 scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)

 at scala.collection.SetLike$class.map(SetLike.scala:93)

 at scala.collection.AbstractSet.map(Set.scala:47)

 at
 org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:416)

 at

 org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:63)

 at org.apache.samza.job.JobRunner.run(JobRunner.scala:62)

 at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)

 at org.apache.samza.job.JobRunner.main(JobRunner.scala)

 On Mon, Jun 29, 2015 at 12:41 PM, Shekar Tippur ctip...@gmail.com wrote:

  Yi,
 
  My use case is more of the latter. Your explanation makes sense now. I
 was
  also looking into Milinda's wiki. She has a section for Kafka
  partition SimplePartitioner, which is simple enough as well.
 
  Thanks for all the inputs. Let me see what I come up with while
  implementing it.
 
  - Shekar
 
  On Mon, Jun 29, 2015 at 10:42 AM, Yi Pan nickpa...@gmail.com wrote:
 
  Hi, Shekar,
 
  First, I would like to clarify what you meant by sliding window: is it
  defined as windows with size N and advance step size of 1 (which means
  that
  windows overlap and each input message would contribute to multiple
 counts
  in different windows)? Or windows with size N and advance step size of N
  (i.e. each incoming message only contribute to one counter in a single
  window)?
 
  If your use case falls into the first category, you will need something
  more sophisticated as discussed in SAMZA-552. If your use case is the
  second one, there could be a simpler version of SAMZA-552 that you can
 go
  with:
 
  1) Initiate a KV-store that uses the application name as the key
  2) For each incoming message, look for the windows that the message by
 the
  application name
  3) Update the counter and update the value in the KV-store based on the
  application name
  4) Every 5 min when window() method is triggered, set all counters to
 zero
  (this can be done in a lazy way as well, by keeping the last reset
  timestamp in the record in the KV-store, keyed by application name.
 Then,
  resetting counter to zero can be done when next time the application
  counter is updated again)
 
  Hope that makes sense.
 
  -Yi
 
  On Mon, Jun 29, 2015 at 10:06 AM, Shekar Tippur ctip...@gmail.com
  wrote:
 
   Benjamin,
  
   Thanks for the explanation. We dont have any specific partition scheme
  as
   yet. We just have 2 topics - raw and processed and we use default
   partitioning scheme.
   Can you share any code snippet so I can understand it better?
  
   - Shekar
  
 
 
 



Re: Samza and sliding window

2015-07-01 Thread Yan Fang
So do you use the store-name as the kv storage name in your StreamTask
code?

Fang, Yan
yanfang...@gmail.com

On Wed, Jul 1, 2015 at 3:41 PM, Shekar Tippur ctip...@gmail.com wrote:

 Yan,

 yes. I do have it.

 - Shekar

 On Wed, Jul 1, 2015 at 3:09 PM, Yan Fang yanfang...@gmail.com wrote:

  Do you have
 
  serializers.registry.json.class
  =org.apache.samza.serializers.JsonSerdeFactory
 
  in your config file?
 
 
  Fang, Yan
  yanfang...@gmail.com
 
  On Wed, Jul 1, 2015 at 2:59 PM, Shekar Tippur ctip...@gmail.com wrote:
 
   Yi/Milinda,
  
   I am trying to initialize a kv store. I have the following properties
   defined:
  
   stores.store-name.key.serde=json
  
   stores.store-name.msg.serde=json
  
   stores.store-name.changelog=argos.windowchangelog
   How do I define a key serde as I am getting this exception:
  
   Exception in thread main org.apache.samza.SamzaException: Must
 define a
   key serde when using key value storage.
  
   at
  
  
 
 org.apache.samza.storage.kv.BaseKeyValueStorageEngineFactory$class.getStorageEngine(BaseKeyValueStorageEngineFactory.scala:86)
  
   at
  
  
 
 org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory.getStorageEngine(RocksDbKeyValueStorageEngineFactory.scala:28)
  
   at
  
  
 
 org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:455)
  
   at
  
  
 
 org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:439)
  
   at
  
  
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  
   at
  
  
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  
   at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
  
   at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  
   at
  
  
 
 org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:439)
  
   at
  
  
 
 org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:416)
  
   at
  
  
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  
   at
  
  
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  
   at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
  
   at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  
   at
  
  
 
 scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
  
   at scala.collection.SetLike$class.map(SetLike.scala:93)
  
   at scala.collection.AbstractSet.map(Set.scala:47)
  
   at
  
 
 org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:416)
  
   at
  
  
 
 org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:63)
  
   at org.apache.samza.job.JobRunner.run(JobRunner.scala:62)
  
   at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
  
   at org.apache.samza.job.JobRunner.main(JobRunner.scala)
  
   On Mon, Jun 29, 2015 at 12:41 PM, Shekar Tippur ctip...@gmail.com
  wrote:
  
Yi,
   
My use case is more of the latter. Your explanation makes sense now.
 I
   was
also looking into Milinda's wiki. She has a section for Kafka
partition SimplePartitioner, which is simple enough as well.
   
Thanks for all the inputs. Let me see what I come up with while
implementing it.
   
- Shekar
   
On Mon, Jun 29, 2015 at 10:42 AM, Yi Pan nickpa...@gmail.com
 wrote:
   
Hi, Shekar,
   
First, I would like to clarify what you meant by sliding window: is
 it
defined as windows with size N and advance step size of 1 (which
 means
that
windows overlap and each input message would contribute to multiple
   counts
in different windows)? Or windows with size N and advance step size
  of N
(i.e. each incoming message only contribute to one counter in a
 single
window)?
   
If your use case falls into the first category, you will need
  something
more sophisticated as discussed in SAMZA-552. If your use case is
 the
second one, there could be a simpler version of SAMZA-552 that you
 can
   go
with:
   
1) Initiate a KV-store that uses the application name as the key
2) For each incoming message, look for the windows that the message
 by
   the
application name
3) Update the counter and update the value in the KV-store based on
  the
application name
4) Every 5 min when window() method is triggered, set all counters
 to
   zero
(this can be done in a lazy way as well, by keeping the last reset
timestamp in the record in the KV-store, keyed by application name.
   Then,
resetting counter to zero can be done when next time the application
counter is updated again)
   
Hope that makes sense.
   
-Yi
   
On Mon, Jun 29, 2015 at 10:06 AM, Shekar Tippur ctip...@gmail.com
wrote:
   
 Benjamin

Re: Review Request 35933: SAMZA-449 Expose RocksDB statistic

2015-06-30 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35933/#review89902
---



samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
 (line 150)
https://reviews.apache.org/r/35933/#comment142781

it should accept the metrics (from line 143) as the third parameter.



samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
 (line 243)
https://reviews.apache.org/r/35933/#comment142780

add some doc here, saying something like, calling this method will expose 
RocksDB statistic to a metric, etc.



samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbStatistic.scala
 (line 25)
https://reviews.apache.org/r/35933/#comment142776

It's better to use the name RocksDbStatisticMetrics to indicate it is a 
metrics.



samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbStatistic.scala
 (line 40)
https://reviews.apache.org/r/35933/#comment142777

can we use the lower case for the all names of metrics? It's better to keep 
the name convension consistent in the Samza.


- Yan Fang


On June 26, 2015, 5:56 p.m., Gustavo Anatoly F. V. Solís wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/35933/
 ---
 
 (Updated June 26, 2015, 5:56 p.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 RocksDB statistic
 
 
 Diffs
 -
 
   
 samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
  a423f7bd6c43461e051b5fd1f880dd01db785991 
   
 samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbStatistic.scala
  PRE-CREATION 
   
 samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
  a428a16bc1e9ab4980a6f17db4fd810057d31136 
 
 Diff: https://reviews.apache.org/r/35933/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Gustavo Anatoly F. V. Solís
 




Re: [VOTE] Apache Samza 0.9.1 RC1

2015-06-30 Thread Yan Fang
+1

Verified MD5, Signature.

Tested locally.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Sun, Jun 28, 2015 at 12:31 PM, Yi Pan nickpa...@gmail.com wrote:

 Hey all,

 This is a call for a vote on a release of Apache Samza 0.9.1. This is a
 bug-fix release against 0.9.0.

 The release candidate can be downloaded from here:

 http://people.apache.org/~nickpan47/samza-0.9.1-rc1/

 The release candidate is signed with pgp key 911402D8, which is
 included in the repository's KEYS file:


 https://git-wip-us.apache.org/repos/asf?p=samza.git;a=blob_plain;f=KEYS;hb=6f5bafb6cd93934781161eb6b1868d11ea347c95

 and can also be found on keyservers:

 http://pgp.mit.edu/pks/lookup?op=getsearch=0x911402D8

 The git tag is release-0.9.1-rc1 and signed with the same pgp key:


 https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=e78b9e7f34650538b4bb68b338eb472b98a5709e

 Test binaries have been published to Maven's staging repository, and are
 available here:
 https://repository.apache.org/content/repositories/orgapachesamza-1007/

 Note release 0.9.1 is still supporting JDK6 and the binaries were built
 with JDK6 without incident.

 6 critical bugs were resolved for this release:


 https://issues.apache.org/jira/browse/SAMZA-715?jql=project%20%3D%20SAMZA%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20%28Resolved%2C%20Closed%29

 The vote will be open for 72 hours ( end in 12:00pm Wed, 07/01/2015 ).
 Please download the release candidate, check the hashes/signature, build it
 and test it, and then please vote:

 [ ] +1 approve
 [ ] +0 no opinion
 [ ] -1 disapprove (and reason why)



Re: Review Request 35918: SAMZA-709 Monitoring page for REST API and the dashboard

2015-06-26 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35918/#review89590
---



docs/learn/documentation/versioned/jobs/web-ui-rest-api.md (line 33)
https://reviews.apache.org/r/35918/#comment142228

after SAMZA-418, the dashboard is a little different with new information, 
will you be able to update the dashboard screenshot ?



docs/learn/documentation/versioned/jobs/web-ui-rest-api.md (lines 39 - 44)
https://reviews.apache.org/r/35918/#comment142229

This seems not working correclty. It loses format. I think we can direclty 
use table html tags like what samza-container.md does.


- Yan Fang


On June 26, 2015, 11:50 a.m., Aleksandar Bircakovic wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/35918/
 ---
 
 (Updated June 26, 2015, 11:50 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 Added new monitoring page for REST API and the dashboard and removed 
 dashboard from ApplicationMaster. Also added table that shortly explains REST 
 service.
 
 
 Diffs
 -
 
   docs/learn/documentation/versioned/index.html e1b9f2d 
   docs/learn/documentation/versioned/jobs/reprocessing.md 28d9925 
   docs/learn/documentation/versioned/jobs/web-ui-rest-api.md PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/35918/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Aleksandar Bircakovic
 




Re: [VOTE] Apache Samza 0.9.1 RC0

2015-06-25 Thread Yan Fang
no objection from me. :)

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Jun 25, 2015 at 4:18 PM, Yi Pan nickpa...@gmail.com wrote:

 Hi, all,

 I have been preparing for the new 0.9.1 RC1 and it is close to be done. I
 am going to cancel this VOTE, if no objections.

 Thanks!

 On Mon, Jun 22, 2015 at 5:41 PM, Yan Fang yanfang...@gmail.com wrote:

  Hi Yi,
 
  This only publishes the artifacts to the staging repository for testing.
  After completing the vote, you can release the artifacts to the public
  repository by clicking the release button. :)
 
  Thanks,
 
  Fang, Yan
  yanfang...@gmail.com
 
  On Mon, Jun 22, 2015 at 5:30 PM, Yi Pan nickpa...@gmail.com wrote:
 
   Hi, Yan,
  
   Thanks for point out that! Actually I saw that last time and had the
   following question: should we publish the artifacts after the VOTE is
   completed or together w/ the VOTE?
   It seems like that we want to publish the binary artifacts together w/
  the
   VOTE, right?
  
   -Yi
  
  
   On Mon, Jun 22, 2015 at 5:25 PM, Yan Fang yanfang...@gmail.com
 wrote:
  
Hi Yi Pan,
   
 Is there any document regarding to how to publish the maven staging
   link?

   
  -- Yes. Check the last part of the
https://github.com/apache/samza/blob/master/RELEASE.md . Not sure if
  you
have seen this. I should have pointed it out earlier. *_*
   
Thanks,
   
Fang, Yan
yanfang...@gmail.com
   
On Mon, Jun 22, 2015 at 3:47 PM, Yi Pan nickpa...@gmail.com wrote:
   
 Hi, guys,

 I am working on the list of things posted by Yan:

 1. I have the difficulty in building the 0.9.1 branch. I think this
  is
 mainly related to SAMZA-721
 https://issues.apache.org/jira/browse/SAMZA-721.

 This seems to be an invalid case in 0.9.1. We only need the
 joint-compilation option in master.

 2. Also, https://issues.apache.org/jira/browse/SAMZA-712 seems
   bothering
 people as well.

 Committed to master and backported to 0.9.1.

 3. https://issues.apache.org/jira/browse/SAMZA-720 is a critical
 bug
   we
 need to fix. Have already attached a patch.

 Plan to backport to 0.9.1.

 4. There is no maven staging link.

 Is there any document regarding to how to publish the maven staging
   link?

 On Mon, Jun 22, 2015 at 3:02 PM, Naveen Somasundaram 
 nsomasunda...@linkedin.com.invalid wrote:

  Hey Yan,
 SAMZA-721 might be because you checkout master and
 switched
  to 0.9.1 branch, and you still have some files from master which
  git
   is
 not
  tracking. Can you try a git clean before you build 0.9.1 ?  AFAIK
  you
 don't
  need joint compilation for core in 0.9.1.
 
  On Mon, Jun 22, 2015 at 1:25 PM, Roger Hoover 
   roger.hoo...@gmail.com
  wrote:
 
   Yan,
  
   I tested to patch locally and it looks good.  Creating a
 patched
 release
   for myself to test in our environment.  Thanks, again.
  
   Sent from my iPhone
  
On Jun 22, 2015, at 10:59 AM, Yi Pan nickpa...@gmail.com
   wrote:
   
Hi, Yan,
   
Thanks a lot for the quick fix on the mentioned bugs. It
 seems
   the
 fix
   for
SAMZA-720 is pretty localized and I am OK to push it into
  0.9.1.
   I
 will
   be
working on back porting those changes to 0.9.1 later today
 and
   fix
 all
   the
release related issues.
   
Thanks!
   
-Yi
   
On Mon, Jun 22, 2015 at 10:30 AM, Roger Hoover 
 roger.hoo...@gmail.com
  
wrote:
   
Yan,
   
You rock.  Thank you so much for the quick fix.  I'm working
  on
  building
and testing the patch.
   
Cheers,
   
Roger
   
On Mon, Jun 22, 2015 at 1:09 AM, Yan Fang 
   yanfang...@gmail.com
   wrote:
   
Hi guys,
   
1. I have the difficulty in building the 0.9.1 branch. I
  think
this
  is
mainly related to SAMZA-721
https://issues.apache.org/jira/browse/SAMZA-721.
   
2. Also, https://issues.apache.org/jira/browse/SAMZA-712
  seems
   bothering
people as well.
   
3. https://issues.apache.org/jira/browse/SAMZA-720 is a
   critical
 bug
   we
need to fix. Have already attached a patch.
   
4. There is no maven staging link.
   
Thanks,
   
Fang, Yan
yanfang...@gmail.com
   
On Sun, Jun 21, 2015 at 1:53 PM, Roger Hoover 
  roger.hoo...@gmail.com
wrote:
   
Hi all,
   
Do you think we could get this bootstrapping bug fixed
  before
 0.9.1
release?  It seems like a critical bug.
   
https://issues.apache.org/jira/browse/SAMZA-720
   
Thanks,
   
Roger
   
On Sat, Jun

Re: Review Request 35492: SAMZA-701 : Hello Samza - Port docker setup from hadoop-common

2015-06-25 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35492/#review89451
---



README.md (line 9)
https://reviews.apache.org/r/35492/#comment142082

prefer to set use the name start-docker-env.sh to make it more speicific.



dev-support/docker/Dockerfile (line 29)
https://reviews.apache.org/r/35492/#comment142083

single line apt-get is not recommended. 

From https://docs.docker.com/articles/dockerfile_best-practices/ : Don’t 
do RUN apt-get update on a single line. This will cause caching issues if the 
referenced archive gets updated, which will make your subsequent apt-get 
install fail without comment.


- Yan Fang


On June 16, 2015, 7:36 a.m., Darrell Taylor wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/35492/
 ---
 
 (Updated June 16, 2015, 7:36 a.m.)
 
 
 Review request for samza.
 
 
 Repository: samza-hello-samza
 
 
 Description
 ---
 
 Take the really useful docker setup from avro and hadoop-common and make it 
 work for hello samza
 
 
 Diffs
 -
 
   README.md 4463454 
   conf/yarn-site.xml 9028590 
   dev-support/docker/Dockerfile PRE-CREATION 
   dev-support/docker/hadoop_env_checks.sh PRE-CREATION 
   start-env.sh PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/35492/diff/
 
 
 Testing
 ---
 
 * Run ./start-env.sh from the top level directory
 * Followed the instructions from Start a Grid on thsi page : 
 http://samza.apache.org/startup/hello-samza/0.8/
 
 
 Thanks,
 
 Darrell Taylor
 




Re: 3 processed message per incoming message

2015-06-23 Thread Yan Fang
Hi Shekar,

My guess is that, in order to get this property to take effect, you may
need to restart the yarn. Otherwise, send this question to the Yarn mailing
list, you may get more valuable answers.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Mon, Jun 22, 2015 at 2:43 PM, Shekar Tippur ctip...@gmail.com wrote:

 Looks like when I kill an application ID, the next application ID in the
 pending queue takes over.
 As there are 2 running jobs, I see that for every in coming event, there
 are 2 outputs.
 a. How do I force to have just 1 running job
 b. How to force the number of application pending to a smaller number?

 I looked at the document -

 https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html
 I have changed the property on capacity-scheduler.xml to a low number. I
 still see 10k apps in pending state.


   property

 nameyarn.scheduler.capacity.maximum-applications/name

 value2/value

 description

   Maximum number of applications that can be pending and running.

 /description

   /property



Re: 3 processed message per incoming message

2015-06-23 Thread Yan Fang
Hi Shekar,

Cool. Let me know if it works.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Tue, Jun 23, 2015 at 12:40 PM, Shekar Tippur ctip...@gmail.com wrote:

 Yan,

 I have restarted Yarn but I still see the same issue. I will post this
 question on Yarn mailing list.
 yarn-...@hadoop.apache.org

 - Shekar



Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-06-22 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/
---

(Updated June 22, 2015, 6:07 a.m.)


Review request for samza.


Changes
---

updated according to Navina's comments. Added docs.


Bugs: SAMZA-676
https://issues.apache.org/jira/browse/SAMZA-676


Repository: samza


Description
---

1. added offsetComparator method in SystemAdmin Interface

2. added task.global.inputs config

3. rewrote Grouper classes using Java; allows to assign global streams during 
grouping

4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer to 
preserve messages order

5. added taskNames to the offsets in OffsetManager

6. allowed to assign one SSP to multiple taskInstances

7. skipped already-processed messages in RunLoop

8. unit tests for all changes


Diffs (updated)
-

  checkstyle/import-control.xml 3374f0c 
  docs/learn/documentation/versioned/container/samza-container.md 9f46414 
  docs/learn/documentation/versioned/jobs/configuration-table.html 405e2ce 
  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb 
  
samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
 249b8ae 
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 
PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
 PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
20e5d26 
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
cbacd18 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
c5a5ea5 
  
samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 
9dc7051 
  
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
 44e95fc 
  
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
 3c0acad 
  
samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
 097f410 
  samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java 
PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
 PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
 PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
8d54c46 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
64a5844 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
9fb1aa9 
  samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
7caad28 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
 a14169b 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
 74daf72 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
 deb3895 
  
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 
d9ae187 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 
35086f5 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 de00320 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 1629035 
  
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
 2a84328 
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
b063366 
  
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
 1e936b4 

Diff: https://reviews.apache.org/r/34974/diff/


Testing
---


Thanks,

Yan Fang



Re: Review Request 35723: SAMZA-720: fix bootstrap hangs when container number 1

2015-06-22 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35723/
---

(Updated June 23, 2015, 4:53 a.m.)


Review request for samza.


Changes
---

added the unit test.


Bugs: SAMZA-720
https://issues.apache.org/jira/browse/SAMZA-720


Repository: samza


Description
---

remove the unregistered ssps in the laggingSsp set.


Diffs (updated)
-

  
samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
 dd500b9 
  
samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
 3c2693c 

Diff: https://reviews.apache.org/r/35723/diff/


Testing
---


Thanks,

Yan Fang



Re: Samza hung after bootstrapping

2015-06-21 Thread Yan Fang
Hi Roger,

I will try to look at the issue tomorrow if my time allows.

First thing first:

The build has some unexpected results. A quick fix:

1. apply https://issues.apache.org/jira/browse/SAMZA-712
2. add

sourceSets.main.scala.srcDir src/main/java sourceSets.main.java.srcDirs =
[]

at line 126 of build.gradle.

Sorry for the inconvenience.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Sun, Jun 21, 2015 at 3:55 PM, Roger Hoover roger.hoo...@gmail.com
wrote:

 Was looking through the code a little and it looks like the
 BootstrappingChooser could use the list of SSPs passed into it's register()
 method to figure out which partitions it need to monitor.

 I wanted to try to build Samza to play around with it but I'm getting error
 trying to build off of both the 0.9.0 and 0.9.1 branches.

 thedude:samza (0.9.1) $ ./gradlew clean build

 To honour the JVM settings for this build a new JVM will be forked. Please
 consider using the daemon:
 http://gradle.org/docs/2.0/userguide/gradle_daemon.html.

 :clean

 :samza-api:clean

 :samza-core_2.10:clean

 :samza-kafka_2.10:clean UP-TO-DATE

 :samza-kv-inmemory_2.10:clean UP-TO-DATE

 :samza-kv-rocksdb_2.10:clean UP-TO-DATE

 :samza-kv_2.10:clean UP-TO-DATE

 :samza-log4j:clean UP-TO-DATE

 :samza-shell:clean UP-TO-DATE

 :samza-test_2.10:clean UP-TO-DATE

 :samza-yarn_2.10:clean UP-TO-DATE

 :assemble UP-TO-DATE

 :rat

 Rat report: build/rat/rat-report.html

 :check

 :build

 :samza-api:compileJava

 :samza-api:processResources UP-TO-DATE

 :samza-api:classes

 :samza-api:jar

 :samza-api:javadoc


 /Users/rhoover/Work/samza/samza-api/src/main/java/org/apache/samza/task/TaskContext.java:49:
 warning: no @param for ssp

   void setStartingOffset(SystemStreamPartition ssp, String offset);

^


 /Users/rhoover/Work/samza/samza-api/src/main/java/org/apache/samza/task/TaskContext.java:49:
 warning: no @param for offset

   void setStartingOffset(SystemStreamPartition ssp, String offset);

^

 2 warnings

 :samza-api:javadocJar

 :samza-api:sourcesJar

 :samza-api:signArchives SKIPPED

 :samza-api:assemble

 :samza-api:compileTestJava

 :samza-api:processTestResources UP-TO-DATE

 :samza-api:testClasses

 :samza-api:test

 :samza-api:check

 :samza-api:build

 :samza-core_2.10:compileJava

 :samza-core_2.10:compileScala

 [ant:scalac]

 /Users/rhoover/Work/samza/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala:43:
 error: object SamzaObjectMapper is not a member of package
 org.apache.samza.serializers.model

 [ant:scalac] import org.apache.samza.serializers.model.SamzaObjectMapper

 [ant:scalac]^

 [ant:scalac]

 /Users/rhoover/Work/samza/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala:40:
 error: object TaskModel is not a member of package
 org.apache.samza.job.model

 [ant:scalac] import org.apache.samza.job.model.TaskModel

 [ant:scalac]^

 ...


 I've got JDK 8 installed.  Wondering that makes a difference or not.  I'd
 appreciate any help.

 Thanks,

 Roger



 On Sun, Jun 21, 2015 at 1:02 PM, Roger Hoover roger.hoo...@gmail.com
 wrote:

  I think I see what's happening.
 
  When there are 8 tasks and I set yarn.container.count=8, then each
  container is responsible for a single task.  However, the
  systemStreamLagCounts map (
 
 https://github.com/apache/samza/blob/0.9.0/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala#L77
 )
  and laggingSystemStreamPartitions (
 
 https://github.com/apache/samza/blob/0.9.0/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala#L83
 )
  are configured to track all partitions for the bootstrap topic rather
 than
  just the one partition assigned to this task.
 
  Later in the log, we see that the task/container completed bootstrap for
  it's own partition.
 
  2015-06-21 12:28:55 org.apache.samza.system.chooser.BootstrappingChooser
  [DEBUG] Bootstrap stream partition is fully caught up:
  SystemStreamPartition [kafka, deploy.svc.tlrnsZOYQA6wrwAA4FLqZA, 0]
 
  but the Bootstrapping Chooser still thinks that the remaining partitions
  (assigned to other tasks in other containers) need to be completed.  JMX
 at
  this point shows 7 lagging partitions of the 8 original partition count.
 
  I'm wondering why no one has run into this.  Doesn't LinkedIn use
  partitioned bootstrapped topics?
 
  Thanks,
 
  Roger
 
  On Sun, Jun 21, 2015 at 12:22 PM, Roger Hoover roger.hoo...@gmail.com
  wrote:
 
  Hi Yan,
 
  I've uploaded a file with TRACE level logging here:
  http://filebin.ca/261yhsTZcZQZ/samza-container-0.log.gz
 
  I really appreciate your help as this is a critical issue for me.
 
  Thanks,
 
  Roger
 
  On Fri, Jun 19, 2015 at 12:05 PM, Yan Fang yanfang...@gmail.com
 wrote:
 
  Hi Roger,
 
   but it only spawns one container and still hangs after bootstrap
  -- this probably is due to your local machine does not have enough
  resource for the second container

Re: Review Request 33419: SAMZA-625: Provide tool to consume changelog and materialize a state store

2015-06-21 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33419/
---

(Updated June 21, 2015, 6:10 a.m.)


Review request for samza.


Changes
---

update based on Navina's comment.


Bugs: SAMZA-625
https://issues.apache.org/jira/browse/SAMZA-625


Repository: samza


Description
---

Implemented in Java.

* modified build.gradle to have the gradle compile scala first. Because some 
jave code has dependencies to Scala code
* change the state store name by removing the space ( in TaskManager )
* add scala java conversion method in Util because some classes only accept 
scala map
* add java version of some configs 
* remove duplicated config in samza-log4j
* add StorageRevoery class, which does most of the recoverying job. The logic 
mimics what happens in SamzaContainer.
* add StateStorageTool, for the commandline usage
* unit tests
* docs


Diffs (updated)
-

  checkstyle/import-control.xml 3374f0c 
  docs/learn/documentation/versioned/container/state-management.md 79067bb 
  samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
aeba61a 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala 2feb65b 
  samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java 
PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java 
PRE-CREATION 
  samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java 
d5e24f2 
  samza-shell/src/main/bash/state-storage-tool.sh PRE-CREATION 

Diff: https://reviews.apache.org/r/33419/diff/


Testing
---

tested with multiple partitions and multiple stores recovery.


Thanks,

Yan Fang



Re: Review Request 33419: SAMZA-625: Provide tool to consume changelog and materialize a state store

2015-06-21 Thread Yan Fang


 On May 21, 2015, 6:45 p.m., Navina Ramesh wrote:
  samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java, 
  line 28
  https://reviews.apache.org/r/33419/diff/3/?file=965787#file965787line28
 
  Is moving away from scala based configs a motivation for all the 
  Java prefixed classes?
  
  If the plan is to refactor configs everywhere with Java based configs, 
  then we should probably extend these classes from 
  org.apache.samza.config.MapConfig. It provides some convenient methods 
  because it is backed by a Map. Just curious if you explored that option.

This makes sense. I omitted this option. Thanks. Using MapConfig now. Also 
added unit test to verify that.


 On May 21, 2015, 6:45 p.m., Navina Ramesh wrote:
  samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java, 
  line 33
  https://reviews.apache.org/r/33419/diff/3/?file=965788#file965788line33
 
  Why is Config protected here and private in JavaStorageConfig?

fixed in above change.


 On May 21, 2015, 6:45 p.m., Navina Ramesh wrote:
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala,
   line 39
  https://reviews.apache.org/r/33419/diff/3/?file=965791#file965791line39
 
  why not sanitize the entire file name, instead of just the task name?
  Personally, I think we should follow a standard convention for 
  sanitizing names, irrespective of whether it is storename or taskname. Just 
  my two cents.

aggreed.


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33419/#review84612
---


On June 21, 2015, 6:10 a.m., Yan Fang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33419/
 ---
 
 (Updated June 21, 2015, 6:10 a.m.)
 
 
 Review request for samza.
 
 
 Bugs: SAMZA-625
 https://issues.apache.org/jira/browse/SAMZA-625
 
 
 Repository: samza
 
 
 Description
 ---
 
 Implemented in Java.
 
 * modified build.gradle to have the gradle compile scala first. Because some 
 jave code has dependencies to Scala code
 * change the state store name by removing the space ( in TaskManager )
 * add scala java conversion method in Util because some classes only accept 
 scala map
 * add java version of some configs 
 * remove duplicated config in samza-log4j
 * add StorageRevoery class, which does most of the recoverying job. The logic 
 mimics what happens in SamzaContainer.
 * add StateStorageTool, for the commandline usage
 * unit tests
 * docs
 
 
 Diffs
 -
 
   checkstyle/import-control.xml 3374f0c 
   docs/learn/documentation/versioned/container/state-management.md 79067bb 
   samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java 
 PRE-CREATION 
   samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java 
 PRE-CREATION 
   samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java 
 PRE-CREATION 
   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
 PRE-CREATION 
   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
 aeba61a 
   samza-core/src/main/scala/org/apache/samza/util/Util.scala 2feb65b 
   samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java 
 PRE-CREATION 
   samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java 
 PRE-CREATION 
   samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java 
 PRE-CREATION 
   
 samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
  PRE-CREATION 
   samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java 
 PRE-CREATION 
   samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java 
 PRE-CREATION 
   samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java 
 PRE-CREATION 
   samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java 
 d5e24f2 
   samza-shell/src/main/bash/state-storage-tool.sh PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/33419/diff/
 
 
 Testing
 ---
 
 tested with multiple partitions and multiple stores recovery.
 
 
 Thanks,
 
 Yan Fang
 




Re: [VOTE] Apache Samza 0.9.1 RC0

2015-06-20 Thread Yan Fang
Agree. I will test it this weekend.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Sat, Jun 20, 2015 at 3:46 PM, Guozhang Wang wangg...@gmail.com wrote:

 Since we only get one vote so far, I think I have to extend the vote
 deadline. Let's set it to next Monday 6pm.

 Please check the candidate and vote for your opinions.

 Guozhang

 On Fri, Jun 19, 2015 at 10:03 AM, Yi Pan nickpa...@gmail.com wrote:

  +1. Ran the Samza failure test suite and succeeded over night.
 
  On Wed, Jun 17, 2015 at 5:54 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Hey all,
  
   This is a call for a vote on a release of Apache Samza 0.9.1. This is a
   bug-fix release against 0.9.0.
  
   The release candidate can be downloaded from here:
  
   http://people.apache.org/~guozhang/samza-0.9.1-rc0/
  
   The release candidate is signed with pgp key 911402D8, which is
   included in the repository's KEYS file:
  
  
  
 
 https://git-wip-us.apache.org/repos/asf?p=samza.git;a=blob_plain;f=KEYS;hb=6f5bafb6cd93934781161eb6b1868d11ea347c95
  
   and can also be found on keyservers:
  
   http://pgp.mit.edu/pks/lookup?op=getsearch=0x911402D8
  
   The git tag is release-0.9.1-rc0 and signed with the same pgp key:
  
  
  
 
 https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=e78b9e7f34650538b4bb68b338eb472b98a5709e
  
   Test binaries have been published to Maven's staging repository, and
 are
   available here:
  
   5 critical bugs were resolved for this release:
  
  
  
 
 https://issues.apache.org/jira/browse/SAMZA-715?jql=project%20%3D%20SAMZA%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20%28Resolved%2C%20Closed%29
  
   The vote will be open for 72 hours ( end in 6:00pm Saturday, 06/20/2015
  ).
   Please download the release candidate, check the hashes/signature,
 build
  it
   and test it, and then please vote:
  
   [ ] +1 approve
   [ ] +0 no opinion
   [ ] -1 disapprove (and reason why)
  
   -- Guozhang
  
 



 --
 -- Guozhang



Re: Samza hung after bootstrapping

2015-06-19 Thread Yan Fang
Hi Roger,

 but it only spawns one container and still hangs after bootstrap
-- this probably is due to your local machine does not have enough
resource for the second container. Because I checked your log file, each
container is about 4GB.

When I run it on our YARN cluster with a single container, it works
correctly.  When I tried it with 5 containers, it gets hung after consuming
the bootstrap topic.
   -- Have you figure it out? I have a looked at your log and also the
code. My suspect is that, there is a null enveloper somehow blocking the
process. If you can paste the trace level log, it will be more helpful
because many logs in chooser are trace level.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Jun 18, 2015 at 5:20 PM, Roger Hoover roger.hoo...@gmail.com
wrote:

 I need some help.  I have a job which bootstraps one stream and then is
 supposed to read from two.  When I run it on our YARN cluster with a single
 container, it works correctly.  When I tried it with 5 containers, it gets
 hung after consuming the bootstrap topic.  I ran it with the grid script on
 my laptop (Mac OS X) with yarn.container.count=2 but it only spawns one
 container and still hangs after bootstrap.

 Debug logs are here: http://pastebin.com/af3KPvju

 I looked at JMX metrics and see:
 - Task Metrics - no value for kafka offset of non-bootstrapped stream
 -  SystemConsumerMetrics
 - choose null keeps incrementing
  - ssps-needed-by-chooser 1
   - unprocessed-messages 62k
 - Bootstrapping Chooser
   - lagging partitions 4
   - laggin-batch-streams - 4
   - batch-resets - 0

 Has anyone seen this or can offer ideas of how to better debug it?

 I'm using Samza 0.9.0 and YARN 2.4.0.

 Thanks!

 Roger



Re: Review Request 35606: SAMZA-716 One Link in Spark Streaming and Samza comparison page is broken

2015-06-18 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35606/#review88404
---

Ship it!


Ship It!

- Yan Fang


On June 18, 2015, 2:21 p.m., Aleksandar Bircakovic wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/35606/
 ---
 
 (Updated June 18, 2015, 2:21 p.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 The post is now pointing to the correct link.
 
 
 Diffs
 -
 
   docs/learn/documentation/versioned/comparisons/spark-streaming.md e1ccc3e 
 
 Diff: https://reviews.apache.org/r/35606/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Aleksandar Bircakovic
 




Re: Review Request 35397: Fix SAMZA-697

2015-06-17 Thread Yan Fang


 On June 17, 2015, 9:03 p.m., Yan Fang wrote:
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, 
  lines 103-111
  https://reviews.apache.org/r/35397/diff/2/?file=985837#file985837line103
 
  a little concernted about this. This means we will load the class every 
  time a message comes. Is this too much? 
  
  My suggestion is to put this code in RunLoop.runs, before the loop 
  starts. What do you think?
 
 Guozhang Wang wrote:
 A class will only be loaded once, so I think the overhead should be 
 minimal.
 
 My concern for putting this code in RunLoop is that some of the logic in 
 RunLoop like consumerMultiplexer.choose are platform code and any of its 
 referenced classes should not be loaded by the task class loader.

Oh, I guess my concern is more about the time spent in getContextClassLoader, 
setContextClassLoader, etc. They actually are useless after loading the class 
but called every time a message comes. Maybe we want to optimize it a little.

Another question is, why do we need to have taskClassLoader here if we already 
use it to instantiate the XXXStreamTask, which should reslove all the 
dependencies in the XXXStreamTask? Guess I have some misunderstanding in the 
classLoader.


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35397/#review88260
---


On June 16, 2015, 5:22 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/35397/
 ---
 
 (Updated June 16, 2015, 5:22 p.m.)
 
 
 Review request for samza.
 
 
 Bugs: SAMZA-697
 https://issues.apache.org/jira/browse/SAMZA-697
 
 
 Repository: samza
 
 
 Description
 ---
 
 Use a separate post-delegate classloader for user-defined tasks
 
 
 Diffs
 -
 
   bin/check-all.sh 0725b82ed4f70b155f8bfe65cc6938d4647142d0 
   docs/learn/documentation/versioned/jobs/logging.md 
 1d13d151316bd51c7a3730e4045433b7968e 
   samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java 
 PRE-CREATION 
   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
 0b3a235b5ab1d6bd60669bfe6023f6b0b4e943d3 
   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
 cbacd183420e9d1d72b05693b55a8f0a62d59fc5 
   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
 c5a5ea5dea9a950fc741625238f5bf8b1f362180 
   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 
 1c178a661e449c6bdfc4ce431aef9bb2d261a6c2 
   
 samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 
 4fac154709d72ab594485dad93c912b55fb1617e 
   samza-core/src/test/java/org/apache/samza/task/TestTaskClassLoader.java 
 PRE-CREATION 
   
 samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
 9fb1aa98fcd14397e8a4cb00c67537482e95fa53 
   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
 7caad28c9298485753ab861da76793cf925953ed 
 
 Diff: https://reviews.apache.org/r/35397/diff/
 
 
 Testing
 ---
 
 unit tests
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 35397: Fix SAMZA-697

2015-06-17 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35397/#review88260
---



samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 44)
https://reviews.apache.org/r/35397/#comment140682

change the scope?



samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 74)
https://reviews.apache.org/r/35397/#comment140715

also canonicalize the classNames in the  blacklistClassnames list?



samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 75)
https://reviews.apache.org/r/35397/#comment140684

use if to be consistent?



samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 115)
https://reviews.apache.org/r/35397/#comment140685

to be if? Just my $0.02



samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 156)
https://reviews.apache.org/r/35397/#comment140704

log here



samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (lines 162 
- 163)
https://reviews.apache.org/r/35397/#comment140705

doc may not be very precise. It's also possible that this class is 
blacklisted not ClassNotFoundException, right? I guess this is the reason you 
do not put line 164 in line 156 to simplify the code.



samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 164)
https://reviews.apache.org/r/35397/#comment140706

you mean, parent.loadClass(name), right?



samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala (lines 40 - 
42)
https://reviews.apache.org/r/35397/#comment140717

add those to configuration table as well, including what kind of format we 
accept.



samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala (line 112)
https://reviews.apache.org/r/35397/#comment140707

remove the space



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
(lines 425 - 427)
https://reviews.apache.org/r/35397/#comment140708

do we want the TaskClassLoader to take care of the StreamTask itself? I 
thought we only used the TaskClassLoader to take care of what happens *inside* 
the StreamTask.



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (lines 
97 - 105)
https://reviews.apache.org/r/35397/#comment140709

a little concernted about this. This means we will load the class every 
time a message comes. Is this too much? 

My suggestion is to put this code in RunLoop.runs, before the loop starts. 
What do you think?



samza-core/src/test/java/org/apache/samza/task/TestTaskClassLoader.java (line 
88)
https://reviews.apache.org/r/35397/#comment140716

also want to test org/example/foo format


- Yan Fang


On June 16, 2015, 5:22 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/35397/
 ---
 
 (Updated June 16, 2015, 5:22 p.m.)
 
 
 Review request for samza.
 
 
 Bugs: SAMZA-697
 https://issues.apache.org/jira/browse/SAMZA-697
 
 
 Repository: samza
 
 
 Description
 ---
 
 Use a separate post-delegate classloader for user-defined tasks
 
 
 Diffs
 -
 
   bin/check-all.sh 0725b82ed4f70b155f8bfe65cc6938d4647142d0 
   docs/learn/documentation/versioned/jobs/logging.md 
 1d13d151316bd51c7a3730e4045433b7968e 
   samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java 
 PRE-CREATION 
   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
 0b3a235b5ab1d6bd60669bfe6023f6b0b4e943d3 
   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
 cbacd183420e9d1d72b05693b55a8f0a62d59fc5 
   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
 c5a5ea5dea9a950fc741625238f5bf8b1f362180 
   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 
 1c178a661e449c6bdfc4ce431aef9bb2d261a6c2 
   
 samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 
 4fac154709d72ab594485dad93c912b55fb1617e 
   samza-core/src/test/java/org/apache/samza/task/TestTaskClassLoader.java 
 PRE-CREATION 
   
 samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
 9fb1aa98fcd14397e8a4cb00c67537482e95fa53 
   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
 7caad28c9298485753ab861da76793cf925953ed 
 
 Diff: https://reviews.apache.org/r/35397/diff/
 
 
 Testing
 ---
 
 unit tests
 
 
 Thanks,
 
 Guozhang Wang
 




Re: 3 processed message per incoming message

2015-06-16 Thread Yan Fang
Hi Shekar,

Ok. If there is only one application is running, if you kill this one, will
you still be able to see the processed messages coming? If not, I think the
code in your application maybe the cause of the problem. We can have a
further look at your code to see where the problem is.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Tue, Jun 16, 2015 at 7:45 AM, Shekar Tippur ctip...@gmail.com wrote:

 This is what I see on Yarn monitoring page:

 As we can see, there are 9998 apps pending. There is some 10k limit we are
 hitting. I see only 1 app running.


 Apps SubmittedApps PendingApps RunningApps CompletedContainers
 RunningMemory
 UsedMemory TotalMemory ReservedVCores UsedVCores TotalVCores ReservedActive
 NodesDecommissioned NodesLost NodesUnhealthy NodesRebooted
 Nodes1000199981222
 GB8 GB0 B2801 http://sprdargas403.corp.intuit.net:8088/cluster/nodes0
 http://sprdargas403.corp.intuit.net:8088/cluster/nodes/decommissioned0
 http://sprdargas403.corp.intuit.net:8088/cluster/nodes/lost0
 http://sprdargas403.corp.intuit.net:8088/cluster/nodes/unhealthy0
 http://sprdargas403.corp.intuit.net:8088/cluster/nodes/rebooted
 Show 20406080100 entries
 Search:
 ID
 User
 Name
 Application Type
 Queue
 StartTime
 FinishTime
 State
 FinalStatus
 Progress
 Tracking UI
 application_1431716639228_0120
 
 http://sprdargas403.corp.intuit.net:8088/cluster/app/application_1431716639228_0120
 
 rootArgos_1SamzadefaultFri, 15 May 2015 19:10:21 GMTN/ARUNNINGUNDEFINED
 ApplicationMaster
 
 http://sprdargas403.corp.intuit.net:8088/proxy/application_1431716639228_0120/
 



  1   2   >