[jira] [Commented] (FLUME-2811) Taildir source doesn't call stop() on graceful shutdown

2016-10-06 Thread Mike Percy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551868#comment-15551868
 ] 

Mike Percy commented on FLUME-2811:
---

OK, I think the bug is in PseudoTxnMemoryChannel. It is swallowing an 
InterruptedException, and AFAICT that interrupt is one way the Flume lifecycle 
tries to stop its components. However, I didn't do a full investigation.

Anyway my suggestion is to NOT use the PseudoTxnMemoryChannel at all -- in 
fact, I think it should be removed from the Flume tree.
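
For illustration only, here is a generic sketch of the anti-pattern described 
above. It is NOT the actual PseudoTxnMemoryChannel code: swallowing an 
InterruptedException clears the thread's interrupt status, so the lifecycle's 
stop signal never reaches the component's polling loop.

{code:java}
import java.util.concurrent.BlockingQueue;

// Hypothetical example, not Flume source code.
public class InterruptHandlingSketch {

  // Problematic: the interrupt is consumed here and the caller never notices,
  // so a polling loop further up the stack keeps running after a stop request.
  static void putSwallowing(BlockingQueue<String> queue, String event) {
    try {
      queue.put(event);
    } catch (InterruptedException e) {
      // dropped on the floor -- the interrupt status is now lost
    }
  }

  // Better: restore the interrupt status (or rethrow) so the caller's loop can
  // observe Thread.currentThread().isInterrupted() and exit cleanly.
  static void putPreservingInterrupt(BlockingQueue<String> queue, String event) {
    try {
      queue.put(event);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while writing to the channel", e);
    }
  }
}
{code}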

Mike

> Taildir source doesn't call stop() on graceful shutdown
> ---
>
> Key: FLUME-2811
> URL: https://issues.apache.org/jira/browse/FLUME-2811
> Project: Flume
>  Issue Type: Bug
>  Components: Sinks+Sources
>Affects Versions: v1.7.0
>Reporter: Jun Seok Hong
>Assignee: Umesh Chaudhary
>Priority: Critical
>  Labels: newbie
> Fix For: v1.7.0
>
>
> Taildir source doesn't call stop() on graceful shutdown.
> Test configuration:
> source - taildir
> channel - PseudoTxnMemoryChannel / flume-kafka-channel
> sink - none
> I found that Flume sometimes doesn't terminate when the Taildir source is used.
> I had to kill the process to terminate it.
> The tailFileProcess() function in TaildirSource.java has an infinite loop.
> When the process is interrupted, a ChannelException is thrown, but it cannot
> break the infinite loop.
> I think that's why the Taildir source never gets to call its stop() function.
> {code:title=TaildirSource.java|borderStyle=solid}
>  private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
>   throws IOException, InterruptedException {
> while (true) {
>   reader.setCurrentFile(tf);
>   List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
>   if (events.isEmpty()) {
> break;
>   }
>   sourceCounter.addToEventReceivedCount(events.size());
>   sourceCounter.incrementAppendBatchReceivedCount();
>   try {
> getChannelProcessor().processEventBatch(events);
> reader.commit();
>   } catch (ChannelException ex) {
> logger.warn("The channel is full or unexpected failure. " +
>   "The source will try again after " + retryInterval + " ms");
> TimeUnit.MILLISECONDS.sleep(retryInterval);
> retryInterval = retryInterval << 1;
> retryInterval = Math.min(retryInterval, maxRetryInterval);
> continue;
>   }
>   retryInterval = 1000;
>   sourceCounter.addToEventAcceptedCount(events.size());
>   sourceCounter.incrementAppendBatchAcceptedCount();
>   if (events.size() < batchSize) {
> break;
>   }
> }
>   }
> {code}
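
For reference, a generic sketch of a backoff/retry loop that stays responsive to
interruption, assuming the channel preserves the interrupt status (see the 
comment above); it is illustrative only, not the committed fix:

{code:java}
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the actual TaildirSource change.
public class InterruptAwareRetryLoop {

  interface Batch { boolean deliver(); }  // stand-in for processEventBatch()/commit()

  static void deliverWithRetry(Batch batch, long maxRetryIntervalMs)
      throws InterruptedException {
    long retryIntervalMs = 1000;
    while (!batch.deliver()) {
      if (Thread.currentThread().isInterrupted()) {
        // A stop was requested: give up instead of retrying forever, so the
        // lifecycle can go on to call stop() on the component.
        throw new InterruptedException("Interrupted while delivering a batch");
      }
      // sleep() also throws InterruptedException, which unwinds the loop on shutdown.
      TimeUnit.MILLISECONDS.sleep(retryIntervalMs);
      retryIntervalMs = Math.min(retryIntervalMs << 1, maxRetryIntervalMs);
    }
  }
}
{code}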





[jira] [Commented] (FLUME-2811) Taildir source doesn't call stop() on graceful shutdown

2016-10-06 Thread Mike Percy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551640#comment-15551640
 ] 

Mike Percy commented on FLUME-2811:
---

Hi [~siefried12], it would be very helpful if you could provide the full Flume 
configuration file as well as the output of several successive runs of 
{{jstack}} on the Flume process while it is hung. I tried to reproduce this and 
so far have not been able to.



Re: [DISCUSS] Flume 1.7 release plan

2016-10-06 Thread Balazs Donat Bessenyei
Thank you all for the responses.

@Lior: I have reviewed the change you have mentioned (FLUME-2994). If
we manage to have the code done and +1/2-d soon, I'll commit the
patch.

@Denes: the change looks good to me. I'll leave some time for others
to review, though.

@Saikat: I've wanted to put integration examples in the User Guide for Flume
for a long time. For example, Flume with Spark (
http://spark.apache.org/docs/latest/streaming-flume-integration.html
).
I know this material is available elsewhere on the internet and I don't think
we should increase redundancy, but I'd really like to have such examples in
the docs.
What do you all think of this?

Misc: any help with the tickets I've mentioned in my last e-mail would
be super welcome. (Even just triaging them and maybe deciding that they
are not as important as I first thought.)


Thank you,

Donat

On Wed, Oct 5, 2016 at 2:03 AM, Hari Shreedharan
 wrote:
> +1 for the release, and the branching and RC dates
>
> On Tue, Oct 4, 2016 at 10:54 AM Saikat Kanjilal  wrote:
>
>> Hi Donat,
>>
>> I can pick up any documentation-related issues. Are there any more at this
>> point outside of FLUME-2971? I would be willing to pick up one of the
>> ones below, but I can't guarantee that it'll be fixed in the timeframe for 1.7.
>> Let me know the best path for helping.
>>
>> Thanks
>>
>>
>> 
>> From: Balazs Donat Bessenyei 
>> Sent: Tuesday, October 4, 2016 8:01 AM
>> To: dev@flume.apache.org
>> Subject: Re: [DISCUSS] Flume 1.7 release plan
>>
>> As there have been no objections, I am going to proceed with the plan
>> I have outlined in my original mail.
>>
>> I will try to work a little more on some flaky tests to help with the
>> release process. (Such as FLUME-3002: Some tests in TestBucketWriter
>> are flaky.)
>>
>> It would be awesome if someone could fix FLUME-2971 (Document Kerberos
>> set-up for Kafka Sink and Kafka Source).
>>
>> Also, I have found a few more tickets that could use a little care:
>> FLUME-2689 (reloading conf file leads syslogTcpSource not receives any
>> event) - probably only needs some more reviews,
>> FLUME-2716 (File Channel cannot handle capacity Integer.MAX_VALUE) -
>> missing a test,
>> FLUME-2461 (memoryChannel bytesRemaining counting error) - missing test
>> Patch-less (Needs some investigation and a fix. I am not sure about
>> their complexity):
>> FLUME-2912 (thrift Sources/Sinks can only authenticate with kerberos
>> principal in format with hostname)
>> FLUME-2871 (avro sink reset-connection-interval cause
>> EventDeliveryException)
>> FLUME-2811 (Taildir source doesn't call stop() on graceful shutdown)
>>
>> However, none of these seem to be a release blocker, so if they are
>> not done by Oct 7-10, it would still be fine.
>>
>>
>> Please, let me know your thoughts
>>
>> Donat
>>
>> On Mon, Oct 3, 2016 at 3:23 PM, Mike Percy  wrote:
>> > On Fri, Sep 30, 2016 at 7:01 PM, Saikat Kanjilal 
>> > wrote:
>> >
>> >> I'd be willing to help. I've been heads down on other stuff and have had
>> >> to postpone the graph sink implementation (https://issues.apache.org/
>> >> jira/browse/FLUME-2035), but am looking to resume work, targeting an
>> >> initial implementation by mid-November. In the meantime, let me
>> >> know how I can get more deeply involved in the next release.
>> >>
>> >
>> > I'd recommend taking a look at things like documentation and filling in
>> > gaps so that we don't release with undocumented stuff. Other types of
>> > polish seem like a helpful thing to do right before a release as well.
>> >
>> > Mike
>>


[jira] [Commented] (FLUME-2716) File Channel cannot handle capacity Integer.MAX_VALUE

2016-10-06 Thread Attila Simon (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-2716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551480#comment-15551480
 ] 

Attila Simon commented on FLUME-2716:
-

My previous comment might have been confusing, so let me clarify:

Following the source code in EventQueueBackingStoreFile#86-103:
{noformat}
// the formula I wrote above
long totalBytes = (capacity + HEADER_SIZE) * Serialization.SIZE_OF_LONG;
...
// allocate the disk space, i.e. create a file of totalBytes bytes; no upper limit here
allocate(checkpointFile, totalBytes);
...
// checkpointFile.length() == totalBytes, and map() has an Integer.MAX_VALUE
// limit on the size of the mapped content
mappedBuffer = checkpointFileHandle.getChannel().map(MapMode.READ_WRITE, 0,
    checkpointFile.length());
{noformat}

From that limitation we get the constraint:
{noformat}
Integer.MAX_VALUE >= checkpointFile.length()
Integer.MAX_VALUE >= totalBytes
Integer.MAX_VALUE >= (capacity + HEADER_SIZE) * Serialization.SIZE_OF_LONG
floor(Integer.MAX_VALUE / Serialization.SIZE_OF_LONG) - HEADER_SIZE >= capacity
268434426 >= capacity
{noformat}

I recommend adding an extra check (in addition to the long cast) to 
EventQueueBackingStoreFile before the allocation: verify that totalBytes <= 
Integer.MAX_VALUE and, if the check fails, print the formula (similarly to the 
message constructed on lines EventQueueBackingStoreFile#96-100) and throw an 
exception to prevent channel startup. The check has to happen before the 
allocation, because otherwise Flume would allocate a system resource (the file) 
that could not be used afterwards, resulting in a resource leak.
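
A rough sketch of such a guard (illustrative only, not an actual patch; it 
reuses the capacity, HEADER_SIZE, Serialization.SIZE_OF_LONG and allocate() 
names from the snippet above, and the exception type is arbitrary):

{code:java}
// Compute the checkpoint size with long arithmetic, then refuse to allocate
// anything that could not be memory-mapped later.
long totalBytes = (capacity + (long) HEADER_SIZE) * Serialization.SIZE_OF_LONG;
if (totalBytes > Integer.MAX_VALUE) {
  long maxCapacity =
      (Integer.MAX_VALUE / Serialization.SIZE_OF_LONG) - HEADER_SIZE;
  throw new IOException("Capacity " + capacity + " needs a checkpoint file of "
      + totalBytes + " bytes, but at most " + Integer.MAX_VALUE + " bytes can be "
      + "memory-mapped. The maximum supported capacity is " + maxCapacity + ".");
}
// Only create the file once we know it can be mapped, so no resource is leaked.
allocate(checkpointFile, totalBytes);
{code}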

To relax this limitation, the way the memory mapping of the checkpoint file is 
requested from java.nio.channels.FileChannel would have to be reconsidered; 
e.g., mapping different parts of the same checkpoint file with multiple map() 
calls, or splitting the checkpoint across multiple files, are alternative 
solutions to this problem.

> File Channel cannot handle capacity Integer.MAX_VALUE
> -
>
> Key: FLUME-2716
> URL: https://issues.apache.org/jira/browse/FLUME-2716
> Project: Flume
>  Issue Type: Bug
>  Components: Channel, File Channel
>Affects Versions: v1.6.0, v1.7.0
>Reporter: Dong Zhao
>  Labels: unit-test-missing
> Fix For: v1.7.0
>
> Attachments: FLUME-2716.patch
>
>
> If capacity is set to Integer.MAX_VALUE (2147483647), the checkpoint file size is 
> wrongly calculated as 8224. The calculation should first cast the int to long and 
> then compute totalBytes. See the patch for details. Thanks.





Review Request 52598: FLUME-2999 - Kafka channel and sink should enable statically assigned partition per event via header

2016-10-06 Thread Tristan Stevens

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52598/
---

Review request for Flume.


Repository: flume-git


Description
---

This feature is useful for anyone who needs greater control over which partitions 
are being written to -- typically where multiple Flume agents are deployed in 
order to scale horizontally, or where a skew in the data might lead to one or 
more partitions hotspotting.
We also have the ability to specify a custom partitioner on the Kafka Producer 
itself using the kafka.* configuration properties.

The Kafka Producer provides the ability to set the partition ID via the following 
constructor 
(https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html#ProducerRecord%28java.lang.String,%20java.lang.Integer,%20K,%20V%29
 ), so this change is just a matter of providing the option to use that constructor.

This is specified in one of two ways: either via the staticPartition 
configuration property, which means that every message goes to the specified 
partition, or via the partitionHeader configuration property, which directs the 
implementation to retrieve the partitionId from one of the event headers.
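
A short sketch of what this looks like at the producer call site (illustrative
only, not the actual patch; staticPartitionId and partitionHeaderName are
hypothetical stand-ins for the staticPartition and partitionHeader configuration
values):

{code:java}
// Decide which partition, if any, to pin the event to.
Integer partitionId = null;
if (staticPartitionId != null) {
  // staticPartition: every event goes to this fixed partition.
  partitionId = staticPartitionId;
} else if (partitionHeaderName != null) {
  // partitionHeader: take the partition id from the named event header.
  String headerValue = event.getHeaders().get(partitionHeaderName);
  if (headerValue != null) {
    partitionId = Integer.parseInt(headerValue);
  }
}

// ProducerRecord(String topic, Integer partition, K key, V value);
// a null partition falls back to the producer's configured partitioner.
producer.send(new ProducerRecord<String, byte[]>(topic, partitionId, eventKey,
    event.getBody()));
{code}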


Diffs
-

  
flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
 66b553a 
  
flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
 3ab807b 
  
flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
 57c0b28 
  flume-ng-doc/sphinx/FlumeUserGuide.rst ab71d38 
  
flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
 89bdd84 
  
flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
 1bf380c 
  
flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
 76eca37 
  
flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
 d5dfbd6 

Diff: https://reviews.apache.org/r/52598/diff/


Testing
---

Unit testing done for both Kafka Channel and Kafka Sink.


Thanks,

Tristan Stevens



[jira] [Commented] (FLUME-2994) flume-taildir-source: support for windows

2016-10-06 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLUME-2994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551288#comment-15551288
 ] 

Bessenyei Balázs Donát commented on FLUME-2994:
---

[~jkushmaul]: from the code snippet you have posted it looks like there could 
be collisions. (I might be wrong, though)

It seems to me that keys in the Map we have (tailFiles) should be unique for 
the source to work correctly.

I think we should find a solution that is portable and has very low (possibly 
~zero) chances of malfunctioning.
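
For reference, a hypothetical sketch of the kind of OS switch being discussed
(the unix:ino call is the one named in the issue description below). It is
illustrative only, not the attached patch, and fileKey() can itself be null on
some platforms/filesystems, which is exactly the collision risk mentioned above:

{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.BasicFileAttributes;

public class FileIdSketch {
  // Returns an identifier for a file that is independent of its name.
  public static Object fileId(File file) throws IOException {
    String os = System.getProperty("os.name").toLowerCase();
    if (!os.contains("windows")) {
      // POSIX systems expose the inode number directly.
      return Files.getAttribute(file.toPath(), "unix:ino");
    }
    // On Windows, fall back to the JDK's fileKey(), when the filesystem provides one.
    BasicFileAttributes attrs =
        Files.readAttributes(file.toPath(), BasicFileAttributes.class);
    return attrs.fileKey();
  }
}
{code}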

> flume-taildir-source: support for windows
> -
>
> Key: FLUME-2994
> URL: https://issues.apache.org/jira/browse/FLUME-2994
> Project: Flume
>  Issue Type: Improvement
>  Components: Sinks+Sources, Windows
>Affects Versions: v1.7.0
>Reporter: Jason Kushmaul
>Assignee: Jason Kushmaul
>Priority: Trivial
> Fix For: v1.7.0
>
> Attachments: FLUME-2994-2.patch, taildir-mac.conf, taildir-win8.1.conf
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> The current implementation of flume-taildir-source does not support Windows.
> The only reason for this, from what I can see, is a simple call to 
> Files.getAttribute(file.toPath(), "unix:ino");
> I've tested an equivalent for Windows (which of course does not work on 
> non-Windows systems). With an OS switch we should be able to identify a file 
> independently of its file name on either system.





[jira] [Commented] (FLUME-3003) testSourceCounter in TestSyslogUdpSource is flaky

2016-10-06 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLUME-3003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551252#comment-15551252
 ] 

Bessenyei Balázs Donát commented on FLUME-3003:
---

[~denes]: thank you for the patch!

The change looks good to me. (It would be better if we didn't have to do the 
sleep(), but I found no easy way to do that.)

I'll leave some time for other people to review it. If nobody has any 
improvement ideas, I'll commit this tomorrow.

> testSourceCounter in TestSyslogUdpSource is flaky
> -
>
> Key: FLUME-3003
> URL: https://issues.apache.org/jira/browse/FLUME-3003
> Project: Flume
>  Issue Type: Test
>  Components: Sinks+Sources
>Affects Versions: v1.6.0
>Reporter: Denes Arvay
>Assignee: Denes Arvay
> Fix For: v1.7.0
>
>



