[jira] [Commented] (KAFKA-5155) Messages can be deleted prematurely when some producers use timestamps and some not

2017-05-27 Thread Michal Borowiecki (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-5155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027521#comment-16027521 ]

Michal Borowiecki commented on KAFKA-5155:
--

Hi [~plavjanik], would you care to submit a pull request with the test and the fix?

> Messages can be deleted prematurely when some producers use timestamps and 
> some not
> ---
>
> Key: KAFKA-5155
> URL: https://issues.apache.org/jira/browse/KAFKA-5155
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Petr Plavjaník
>
> Some messages can be deleted prematurely and never read in the following 
> scenario. One producer uses timestamps and produces messages that are 
> appended at the beginning of a log segment. Another producer produces 
> messages without a timestamp. In that case the largest timestamp of the 
> segment is determined by the old messages that have a timestamp; the new 
> messages without a timestamp do not influence it, and the segment containing 
> both old and new messages can be deleted immediately after the last new 
> message with no timestamp is appended. When all appended messages have no 
> timestamp, they are not deleted, because the {{lastModified}} attribute of 
> the {{LogSegment}} is used instead.
> A new test case for {{kafka.log.LogTest}} that fails:
> {code}
>   @Test
>   def shouldNotDeleteTimeBasedSegmentsWhenTimestampIsNotProvidedForSomeMessages() {
>     val retentionMs = 1000
>     val old = TestUtils.singletonRecords("test".getBytes, timestamp = 0)
>     val set = TestUtils.singletonRecords("test".getBytes, timestamp = -1, magicValue = 0)
>     val log = createLog(set.sizeInBytes, retentionMs = retentionMs)
>     // append some messages to create some segments
>     log.append(old)
>     for (_ <- 0 until 12)
>       log.append(set)
>     assertEquals("No segment should be deleted", 0, log.deleteOldSegments())
>   }
> {code}
> It can be prevented by using {{def largestTimestamp = 
> Math.max(maxTimestampSoFar, lastModified)}} in {{LogSegment}}, or by using 
> the current timestamp when messages with timestamp {{-1}} are appended.
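
A minimal sketch of the first option, against the 0.10.x {{LogSegment}} (the 
names {{maxTimestampSoFar}} and {{lastModified}} are as in that codebase; an 
untested illustration, not a reviewed patch):

{code}
// kafka.log.LogSegment (sketch): the timestamp used by time-based retention
// is never older than the file's last-modified time, so records without a
// timestamp (maxTimestampSoFar == -1) cannot make the segment look ancient.
def largestTimestamp: Long = Math.max(maxTimestampSoFar, lastModified)
{code}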



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5155) Messages can be deleted prematurely when some producers use timestamps and some not

2017-05-03 Thread Petr Plavjaník (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-5155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15994831#comment-15994831 ]

Petr Plavjaník commented on KAFKA-5155:
---

Hi [~huxi_2b] and [~mihbor],

this defect is about the potential data loss that occurred in our test 
scenario, not about the ordering of messages based on their timestamps. We 
were using a non-Java producer that used version 0 of the message format 
(without timestamps), but the first message in each partition was written by 
the Java KafkaProducer, which used the version 1 message format with a 
timestamp. Some messages were lost (written to the log but deleted before they 
were read) when the first retention pass was done. The fix should not change 
how timestamps are used elsewhere, just make sure that time-based retention 
does not delete messages prematurely.
This behaviour change is listed in the section _Potential breaking changes in 
0.10.1.0_: {quote}The log retention time is no longer based on last modified 
time of the log segments. Instead it will be based on the largest timestamp of 
the messages in a log segment.{quote} 
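
For reference, the 0.10.x time-based retention check in {{kafka.log.Log}} 
looks roughly like this (reconstructed here for illustration; the method name, 
typo included, is as in that codebase):

{code}
// kafka.log.Log (0.10.x, approximate): a segment becomes eligible for
// deletion once its largest record timestamp is older than retention.ms;
// the file's lastModified time no longer plays any role.
private def deleteRetenionMsBreachedSegments(): Int = {
  if (config.retentionMs < 0) return 0
  val startMs = time.milliseconds
  deleteOldSegments(startMs - _.largestTimestamp > config.retentionMs)
}
{code}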

But it can be surprising that old producers create messages with no timestamp 
and that these are not taken into consideration when the segment is deleted. 
When I first read it, I thought that the timestamp of the messages was the log 
append timestamp. The circumstances under which the data loss occurred are 
quite rare (a segment where the message at the beginning has a timestamp and 
the rest do not), but data loss is not good in any case.

I am not sure what the right way to fix it is. One way is just to change 
{{deleteRetenionMsBreachedSegments()}} to also account for the 
{{lastModified}} timestamp, as before:

{code}
  private def deleteRetenionMsBreachedSegments(): Int = {
    if (config.retentionMs < 0) return 0
    val startMs = time.milliseconds
    // a named parameter rather than two `_` placeholders, which Scala
    // would treat as two distinct parameters
    deleteOldSegments(s => startMs - Math.max(s.largestTimestamp, s.lastModified) > config.retentionMs)
  }
{code}
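
Taking the maximum makes retention strictly more conservative: a segment 
becomes eligible for deletion only after both its largest record timestamp 
and its file modification time have aged past {{retention.ms}}, so nothing 
can be deleted earlier than under the pre-0.10.1 behaviour.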

The other way is to use the current time in {{Log.append()}} when 
{{appendInfo.maxTimestamp}} is {{-1}}. This also affects log segment rolling. 
But {{LogSegment.maxTimestampSoFar}} can be changed to lower values by 
{{LogSegment.truncateTo()}}, and then 
{{Log.deleteRetenionMsBreachedSegments()}}, which uses 
{{LogSegment.largestTimestamp}} (backed by {{LogSegment.maxTimestampSoFar}}), 
will again not take messages with no timestamp into account.
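
A minimal sketch of this second option (hypothetical placement inside 
{{Log.append()}}; it assumes {{appendInfo.maxTimestamp}} is a {{var}}, as in 
the 0.10.x {{LogAppendInfo}}, and that {{Record.NO_TIMESTAMP}} is {{-1}}):

{code}
// Sketch, untested: if no record in the appended batch carried a timestamp,
// fall back to the broker's wall-clock time so that retention and segment
// rolling compare against a meaningful value.
if (appendInfo.maxTimestamp == Record.NO_TIMESTAMP)
  appendInfo.maxTimestamp = time.milliseconds
{code}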



[jira] [Commented] (KAFKA-5155) Messages can be deleted prematurely when some producers use timestamps and some not

2017-05-02 Thread huxi (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-5155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15994177#comment-15994177 ]

huxi commented on KAFKA-5155:
-

This is very similar to a JIRA issue 
([KAFKA-4398|https://issues.apache.org/jira/browse/KAFKA-4398]) I reported, 
complaining that the Kafka broker side cannot honor the order of timestamps.
Sounds like you cannot mix the new timestamps and the old (missing) timestamps 
under the current design.
