Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

2019-02-06 Thread Boyang Chen
Thanks Konstantine for the great summary! +1 for having a separate KIP
discussing the trade-offs of using a new serialization format for the protocol
encoding. There we could discuss a wider range of options and benchmark their
performance before reaching a final decision.

Best,
Boyang

From: Konstantine Karantasis 
Sent: Tuesday, February 5, 2019 4:23 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in Kafka 
Connect

Hi all,

Thank you for your comments so far.
Now that the KIP freeze and feature freeze are behind us for version 2.2, I'd
like to bring this thread back to the top of the email stack, with the
following suggestion:

I'll be changing KIP-415's description to include a serialization format
that extends the current scheme and is based on Kafka structs.
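As a rough illustration of what an append-only, struct-based extension of the current scheme means in practice, the sketch below encodes a simplified worker-state message whose V1 layout appends one field to the V0 layout, so a reader that only knows V0 can still parse the fields it understands. The field names (`url`, `config_offset`, `extra_field`) and the byte layout are hypothetical simplifications, not Connect's actual schema, and Python's `struct` module stands in for Kafka's `Schema`/`Struct` classes.

```python
import struct

# Hypothetical, simplified layout (not Connect's real schema):
#   V0: version:int16 | url_len:int16 | url:UTF-8 bytes | config_offset:int64
#   V1: the V0 fields followed by one appended field, extra_field:int32

def encode(version: int, url: str, config_offset: int, extra_field: int = 0) -> bytes:
    url_bytes = url.encode("utf-8")
    out = struct.pack(">hh", version, len(url_bytes)) + url_bytes
    out += struct.pack(">q", config_offset)
    if version >= 1:
        # V1 only appends; every V0 field keeps its position.
        out += struct.pack(">i", extra_field)
    return out

def decode(data: bytes, reader_version: int = 1) -> dict:
    version, url_len = struct.unpack_from(">hh", data, 0)
    pos = 4
    url = data[pos:pos + url_len].decode("utf-8")
    pos += url_len
    (config_offset,) = struct.unpack_from(">q", data, pos)
    pos += 8
    result = {"version": version, "url": url, "config_offset": config_offset}
    # A reader parses only the fields it knows about; bytes appended by a
    # newer writer are simply left unread.
    if version >= 1 and reader_version >= 1:
        (result["extra_field"],) = struct.unpack_from(">i", data, pos)
    return result

msg = encode(1, "http://worker-1:8083", 42, extra_field=7)
print(decode(msg))                    # a V1 reader sees all four fields
print(decode(msg, reader_version=0))  # a V0 reader ignores the appended field
```

The trade-off Konstantine mentions shows up here: appending keeps old readers working, but every new field still needs a version bump and version-aware parsing code.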

The initial suggestion to transition to using an alternative serialization
format (e.g. flatbuffers) was made in case it showed good potential and we
could quickly reach consensus on this matter. I
believe the arguments for such a transition make sense, but the pros are
probably not enough to outweigh the introduction of a dependency at this
point and justify changes in every client that will potentially use
incremental cooperative rebalancing in the future. The changes in the
rebalancing protocol have not been very frequent so far.

Admittedly, even more important is the fact that the discussion around the
serialization format of the new protocol is only tangentially related to
the core of KIP-415. Thus, in order to keep the discussion focused on the
essential changes required by KIP-415, which are expected to have
significant impact on addressing the stop-the-world effect, I'd like to
punt any optimizations to the serialization format and change the KIP to
describe a schema that depends on Kafka structs as the current (V0) version
does.
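To make the stop-the-world point concrete: under the current eager protocol every worker gives up all of its connectors and tasks when a rebalance starts, whereas an incremental cooperative protocol only needs to stop what actually moves. The sketch below counts revocations under both approaches for a hypothetical assignment change (a third worker joining); it does not model KIP-415's actual assignment algorithm.

```python
from typing import Dict, Set

Assignment = Dict[str, Set[str]]  # worker id -> connector/task ids it runs

def eager_revocations(old: Assignment) -> Assignment:
    # Eager rebalancing: every worker stops everything it currently runs.
    return {worker: set(tasks) for worker, tasks in old.items()}

def incremental_revocations(old: Assignment, new: Assignment) -> Assignment:
    # Incremental cooperative rebalancing: a worker only stops the
    # connectors/tasks that are missing from its new assignment.
    return {worker: tasks - new.get(worker, set()) for worker, tasks in old.items()}

def total(assignment: Assignment) -> int:
    return sum(len(tasks) for tasks in assignment.values())

old = {"w1": {"c1-0", "c1-1", "c2-0"}, "w2": {"c2-1", "c3-0"}}
# Hypothetical outcome after w3 joins: it takes over one task from each worker.
new = {"w1": {"c1-0", "c1-1"}, "w2": {"c2-1"}, "w3": {"c2-0", "c3-0"}}

print(total(eager_revocations(old)))             # 5
print(total(incremental_revocations(old, new)))  # 2
```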

I hope this will allow us to make progress more easily and bring the changes of
this new rebalancing protocol to Kafka clients, beginning with Kafka
Connect, in a more applicable and less disruptive way.

I'll change the schema descriptions by end of day.

Looking forward to your next comments!

Konstantine

On Mon, Jan 28, 2019 at 5:22 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

>
> Hi Ismael,
> thanks for bringing up serialization in the discussion!
>
> Indeed, JSON was considered given it's the prevalent text-based
> serialization option.
>
> In comparison to flatbuffers, most of the generic pros and cons are valid in
> this context too: higher performance during serde, small size, optional
> fields, strong typing, and others.
>
> Specifically, for Connect's use case, flatbuffers serialization, although it
> introduces a single dependency, appears more appealing for the
> following reasons:
>
> * The protocol is evolving from one binary format to another binary one.
> * Although new fields, nested or not, are expected to be introduced (as in
> KIP-415) or old fields may get deprecated, the protocol schemas are
> expected to be simple, mostly flat and manageable. We won't need to process
> arbitrarily nested structures during runtime, for which JSON would be a
> better fit. The current proposal aims to make the current append only
> format a bit more flexible.
> * It's good to keep performance tight because the loop that includes
> subprotocol serde will need to accommodate resource release and assignment.
> Also, rebalancing in its incremental cooperative form, which is expected to
> be lighter, has the potential to happen more frequently. Parsing
> JSON with Jackson has been a hotspot on certain occasions in the past, if I
> remember correctly.
> * Evolution will be facilitated by handling or ignoring optional fields
> easily. The protocol may evolve with fewer hard version bumps like the one
> proposed here from V0 to V1.
> * Optional fields are omitted, not just compressed.
> * Unpacking of fields does not require deserialization of the whole
> message, making transition between versions or flavors of the protocol easy
> and performant.
> * Flatbuffers' specification is simple and can be implemented, even in the
> absence of appropriate clients.
>
> I hope the above highlights why flatbuffers is a good candidate for this
> use case and is, thus, worth adding as a dependency.
> Strictly speaking, yes, they introduce a new compile-time dependency. But
> during runtime, such a dependency seems equivalent to introducing a JSON
> parser (such as Jackson that is already being used in AK).
>
> Your question is very valid. It's probably worth adding an item under
> rejected alternatives, once we agree how we want to move forward.
>
> Best,
> Konstantine
>
>
>
> On Fri, Jan 25, 2019 at 11:13 PM Ismael Juma  wrote:
>
>> Thanks for the KIP Konstantine. Quick question: introducing a new
>> serialization format (i.e. flatbuffers) has major implications. Have we
>> considered json? If so, why did we reject it?
>>
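Two of the flatbuffers properties listed in the quoted reply (absent optional fields are omitted, and a single field can be read without decoding the whole message) come from placing a table of field offsets in front of the data. The toy encoding below imitates only that idea in plain Python; it is not the flatbuffers wire format and does not use the flatbuffers library.

```python
import struct
from typing import Dict, Optional

def encode(fields: Dict[int, bytes], num_slots: int) -> bytes:
    """Toy layout: uint16 slot count, one uint32 offset per slot (0 = absent),
    then each present field as a uint32 length followed by its bytes."""
    header_size = 2 + 4 * num_slots
    offsets = [0] * num_slots
    body = b""
    for slot in range(num_slots):
        value = fields.get(slot)
        if value is None:
            continue  # absent field: no data bytes, only its zero offset slot
        offsets[slot] = header_size + len(body)
        body += struct.pack(">I", len(value)) + value
    return struct.pack(">H", num_slots) + struct.pack(f">{num_slots}I", *offsets) + body

def read_field(buf: bytes, slot: int) -> Optional[bytes]:
    """Jump straight to one field via the offset table; other fields are not decoded."""
    (num_slots,) = struct.unpack_from(">H", buf, 0)
    if slot >= num_slots:
        return None  # slot unknown to the (older) writer: treat as absent
    (offset,) = struct.unpack_from(">I", buf, 2 + 4 * slot)
    if offset == 0:
        return None
    (length,) = struct.unpack_from(">I", buf, offset)
    return buf[offset + 4:offset + 4 + length]

msg = encode({0: b"worker-1", 2: b"v1-only"}, num_slots=3)
print(read_field(msg, 2), read_field(msg, 1))
```

Treating an out-of-range slot as absent is what lets a newer reader handle messages from an older writer without a hard version bump, which is the evolution argument made in the thread.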

[DISCUSS] KIP-296 Add connector level configurability for producer/consumer client configs

2019-02-06 Thread Cyrus Vafadari
Allen,

I think this is a good fix -- it is often intuitive for users to define
consumer and producer configs on a Connector level as opposed to a worker
level, since each connector task gets its own producer/consumer. I think we
should restart the discussion.

I think this KIP is essentially duplicated by KIP-407, which leads me to believe
there's a desire to see this happen.

Cyrus
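The per-connector configuration described here amounts to a layered merge: worker-level client settings act as defaults and connector-level settings override them. Connect workers already accept `producer.`- and `consumer.`-prefixed client settings; accepting the same prefixes inside a connector's own config is an assumption made for this sketch, not the naming that KIP-296 or KIP-407 settled on.

```python
from typing import Dict

def effective_client_config(worker_cfg: Dict[str, str],
                            connector_cfg: Dict[str, str],
                            prefix: str) -> Dict[str, str]:
    """Build the client config for one connector's tasks.

    Settings under `prefix` (e.g. "producer.") in the worker config are the
    defaults; settings under the same prefix in the connector config are
    hypothetical per-connector overrides and take precedence.
    """
    def strip(cfg: Dict[str, str]) -> Dict[str, str]:
        return {k[len(prefix):]: v for k, v in cfg.items() if k.startswith(prefix)}

    merged = strip(worker_cfg)
    merged.update(strip(connector_cfg))
    return merged

worker = {"producer.acks": "all", "producer.linger.ms": "5", "group.id": "connect-cluster"}
connector = {"name": "my-source", "producer.linger.ms": "50"}
print(effective_client_config(worker, connector, "producer."))  # {'acks': 'all', 'linger.ms': '50'}
```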


[jira] [Created] (KAFKA-7905) KTable Reduce Allow Null value to be pass in Subtractor

2019-02-06 Thread HARSHIT AGARWAL (JIRA)
HARSHIT AGARWAL created KAFKA-7905:
--

 Summary: KTable Reduce Allow Null value to be pass in Subtractor
 Key: KAFKA-7905
 URL: https://issues.apache.org/jira/browse/KAFKA-7905
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: HARSHIT AGARWAL
 Attachments: KTableReduceExceptionTrace.txt

KTable reduce allows a null value to be passed to the subtractor.

.reduce((aggValue, newValue) -> addCounts(aggValue, newValue),
        (aggValue, oldValue) -> subtractCounts(aggValue, oldValue))

The aggValue passed to the subtractor by
org.apache.kafka.streams.kstream.internals.KTableReduce.java is null.

 

Please find attached Exception Trace.
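While the root cause is investigated, one defensive option on the application side is a subtractor that tolerates a missing aggregate. The sketch below is a simplified Python model of one reduce update (subtract the old value, then add the new one), not the actual `KTableReduce` code; `add_counts`/`subtract_counts` stand in for the reporter's `addCounts`/`subtractCounts`, and treating a null aggregate as "nothing to subtract from" is an assumption about what the application wants.

```python
from typing import Dict, Optional

Counts = Dict[str, int]

def add_counts(agg: Counts, new: Counts) -> Counts:
    out = dict(agg)
    for key, n in new.items():
        out[key] = out.get(key, 0) + n
    return out

def subtract_counts(agg: Optional[Counts], old: Counts) -> Optional[Counts]:
    # Defensive guard: with no aggregate there is nothing to subtract from,
    # so return None instead of failing on a null argument.
    if agg is None:
        return None
    out = dict(agg)
    for key, n in old.items():
        out[key] = out.get(key, 0) - n
    return out

def reduce_update(agg: Optional[Counts], old: Optional[Counts],
                  new: Optional[Counts]) -> Optional[Counts]:
    """Simplified model of one update: remove the old value, then add the new one."""
    if old is not None:
        agg = subtract_counts(agg, old)
    if new is not None:
        agg = new if agg is None else add_counts(agg, new)
    return agg

print(reduce_update(None, {"a": 1}, {"a": 3}))  # null aggregate no longer raises
```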



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk11 #266

2019-02-06 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-7828: Add ExternalCommandWorker to Trogdor (#6219)

--
[...truncated 2.30 MB...]

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommitFailure 
STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommitFailure 
PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testCommitSuccessFollowedByFailure STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testCommitSuccessFollowedByFailure PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testCommitConsumerFailure STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testCommitConsumerFailure PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommitTimeout 
STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommitTimeout 
PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testAssignmentPauseResume STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testAssignmentPauseResume PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testRewind STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testRewind PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testRewindOnRebalanceDuringPoll STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testRewindOnRebalanceDuringPoll PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedBasicValidation STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedBasicValidation PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedCustomValidation STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedCustomValidation PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorAlreadyExists STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorAlreadyExists PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testDestroyConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testDestroyConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartTask STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartTask PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testAccessors STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testAccessors PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testInconsistentConfigs STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testInconsistentConfigs PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testPutConnectorConfig STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testPutConnectorConfig PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testJoinAssignment STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testJoinAssignment PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalance STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalance PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalanceFailedConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalanceFailedConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testHaltCleansUpWorker STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testHaltCleansUpWorker PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testConnectorNameConflictsWithWorkerGroupId STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testConnectorNameConflictsWithWorkerGroupId PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartUnknownConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartUnknownConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 

Build failed in Jenkins: kafka-trunk-jdk8 #3368

2019-02-06 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-7828: Add ExternalCommandWorker to Trogdor (#6219)

--
[...truncated 2.29 MB...]
org.apache.kafka.connect.converters.LongConverterTest > 
testSerializingIncorrectType PASSED

org.apache.kafka.connect.converters.LongConverterTest > 
testDeserializingHeaderWithTooManyBytes STARTED

org.apache.kafka.connect.converters.LongConverterTest > 
testDeserializingHeaderWithTooManyBytes PASSED

org.apache.kafka.connect.converters.LongConverterTest > testNullToBytes STARTED

org.apache.kafka.connect.converters.LongConverterTest > testNullToBytes PASSED

org.apache.kafka.connect.converters.LongConverterTest > 
testSerializingIncorrectHeader STARTED

org.apache.kafka.connect.converters.LongConverterTest > 
testSerializingIncorrectHeader PASSED

org.apache.kafka.connect.converters.LongConverterTest > 
testDeserializingDataWithTooManyBytes STARTED

org.apache.kafka.connect.converters.LongConverterTest > 
testDeserializingDataWithTooManyBytes PASSED

org.apache.kafka.connect.converters.LongConverterTest > 
testConvertingSamplesToAndFromBytes STARTED

org.apache.kafka.connect.converters.LongConverterTest > 
testConvertingSamplesToAndFromBytes PASSED

org.apache.kafka.connect.converters.ShortConverterTest > testBytesNullToNumber 
STARTED

org.apache.kafka.connect.converters.ShortConverterTest > testBytesNullToNumber 
PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testSerializingIncorrectType STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testSerializingIncorrectType PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testDeserializingHeaderWithTooManyBytes STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testDeserializingHeaderWithTooManyBytes PASSED

org.apache.kafka.connect.converters.ShortConverterTest > testNullToBytes STARTED

org.apache.kafka.connect.converters.ShortConverterTest > testNullToBytes PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testSerializingIncorrectHeader STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testSerializingIncorrectHeader PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testDeserializingDataWithTooManyBytes STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testDeserializingDataWithTooManyBytes PASSED

org.apache.kafka.connect.converters.ShortConverterTest > 
testConvertingSamplesToAndFromBytes STARTED

org.apache.kafka.connect.converters.ShortConverterTest > 
testConvertingSamplesToAndFromBytes PASSED

384 tests completed, 1 failed

> Task :connect:runtime:test FAILED

> Task :streams:streams-scala:test

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionWithNamedRepartitionTopic STARTED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionWithNamedRepartitionTopic PASSED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionJava STARTED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionJava PASSED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegion STARTED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegion PASSED

org.apache.kafka.streams.scala.kstream.ConsumedTest > Create a Consumed should 
create a Consumed with Serdes STARTED

org.apache.kafka.streams.scala.kstream.ConsumedTest > Create a Consumed should 
create a Consumed with Serdes PASSED

org.apache.kafka.streams.scala.kstream.ConsumedTest > Create a Consumed with 
timestampExtractor and resetPolicy should create a Consumed with Serdes, 
timestampExtractor and resetPolicy STARTED

org.apache.kafka.streams.scala.kstream.ConsumedTest > Create a Consumed with 
timestampExtractor and resetPolicy should create a Consumed with Serdes, 
timestampExtractor and resetPolicy PASSED

org.apache.kafka.streams.scala.kstream.ConsumedTest > Create a Consumed with 
timestampExtractor should create a Consumed with Serdes and timestampExtractor 
STARTED

org.apache.kafka.streams.scala.kstream.ConsumedTest > Create a Consumed with 
timestampExtractor should create a Consumed with Serdes and timestampExtractor 
PASSED

org.apache.kafka.streams.scala.kstream.ConsumedTest > Create a Consumed with 
resetPolicy should create a Consumed with Serdes and resetPolicy STARTED

org.apache.kafka.streams.scala.kstream.ConsumedTest > Create a Consumed with 
resetPolicy should create a Consumed with Serdes and resetPolicy PASSED

org.apache.kafka.streams.scala.kstream.ProducedTest > Create a Produced should 
create a Produced with Serdes STARTED


[jira] [Created] (KAFKA-7904) Add AtMinIsr topic partition category

2019-02-06 Thread Kevin Lu (JIRA)
Kevin Lu created KAFKA-7904:
---

 Summary: Add AtMinIsr topic partition category
 Key: KAFKA-7904
 URL: https://issues.apache.org/jira/browse/KAFKA-7904
 Project: Kafka
  Issue Type: New Feature
Reporter: Kevin Lu
Assignee: Kevin Lu


https://cwiki.apache.org/confluence/display/KAFKA/KIP-427%3A+Add+AtMinIsr+topic+partition+category
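The ticket only links the KIP. Assuming, as the KIP title suggests, that a partition is "at min ISR" when its in-sync replica count equals the topic's `min.insync.replicas` (so one more ISR shrink would make `acks=all` produce requests fail), the new category would sit next to the existing under-replicated and under-min-ISR ones roughly as in this sketch; the function and category names are illustrative.

```python
from typing import Set

def partition_categories(replicas: int, isr: int, min_isr: int) -> Set[str]:
    """Classify one partition from its replica count, ISR size and min.insync.replicas."""
    categories: Set[str] = set()
    if isr < replicas:
        categories.add("UnderReplicated")
    if isr < min_isr:
        categories.add("UnderMinIsr")  # acks=all produce requests already fail
    elif isr == min_isr:
        categories.add("AtMinIsr")     # still writable, but one more ISR shrink blocks acks=all
    return categories

print(partition_categories(replicas=3, isr=2, min_isr=2))
```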





[jira] [Created] (KAFKA-7903) Replace OffsetCommit request/response with automated protocol

2019-02-06 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-7903:
--

 Summary: Replace OffsetCommit request/response with automated 
protocol
 Key: KAFKA-7903
 URL: https://issues.apache.org/jira/browse/KAFKA-7903
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen








Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-02-06 Thread Harsha
Thanks Eno, Adam & Satish for your reviews and questions. I'll address these in
the KIP and update the thread here.

Thanks,
Harsha

On Wed, Feb 6, 2019, at 7:09 AM, Satish Duggana wrote:
> Thanks, Harsha for the KIP. It is a good start for tiered storage in
> Kafka. I have a few comments/questions.
> 
> It may be good to have a configuration to keep the number of local
> segments instead of keeping only the active segment. This config can
> be exposed at cluster and topic levels with a default value of 1. In
> some use cases, a few consumers may lag by more than one segment, and it
> would be better to serve them from local storage instead of remote storage.
> 
> It may be better to keep “remote.log.storage.enable” and the respective
> configuration at the topic level as well as the cluster level. It will be
> helpful in environments where some topics are configured with
> local storage and other topics are configured with remote storage.
> 
> Each topic-partition leader pushes its log segments, with their respective
> index files, to remote storage whenever the active log segment rolls over,
> and updates the remote log index file for the respective remote log
> segment. A second option is to also add offset index files for each
> segment. These could be used to serve consumer fetch requests for old
> segments from a local log segment instead of serving them directly from the
> remote log, which may cause high latencies. There can be different
> strategies for when the remote segment is copied to a local segment.
> 
> What is “remote.log.manager.scheduler.interval.ms” config about?
> 
> How do followers sync RemoteLogSegmentIndex files? Do they request them
> from the leader replica? This looks important, as a failed-over
> leader should have its RemoteLogSegmentIndex updated and ready to avoid
> high latencies in serving old data stored in remote logs.
> 
> Thanks,
> Satish.
> 
> On Tue, Feb 5, 2019 at 10:42 PM Ryanne Dolan  wrote:
> >
> > Thanks Harsha, makes sense.
> >
> > Ryanne
> >
> > On Mon, Feb 4, 2019 at 5:53 PM Harsha Chintalapani  wrote:
> >
> > > "I think you are saying that this enables additional (potentially cheaper)
> > > storage options without *requiring* an existing ETL pipeline. “
> > > Yes.
> > >
> > > " But it's not really a replacement for the sort of pipelines people build
> > > with Connect, Gobblin etc.”
> > >
> > > It is not. But assuming that everyone runs these pipelines for storing
> > > raw Kafka data in HDFS or S3 is also wrong.
> > > The aim of this KIP is to provide tiered storage as a whole package, not
> > > asking users to ship the data on their own using existing ETL, which means
> > > running a consumer and maintaining those pipelines.
> > >
> > > " My point was that, if you are already offloading records in an ETL
> > > pipeline, why do you need a new pipeline built into the broker to ship the
> > > same data to the same place?”
> > >
> > > As you said, it's an ETL pipeline, which means users of these pipelines are
> > > reading the data from the broker, transforming its state and storing it
> > > somewhere.
> > > The point of this KIP is to store log segments as they are, without changing
> > > their structure, so that we can use the existing offset mechanisms to look
> > > them up when the consumer needs to read old data. When you load data via
> > > your existing pipelines, you are reading the topic as a whole, which
> > > doesn’t guarantee that you’ll produce this data back into HDFS or S3 in the
> > > same order, and who is going to generate the index files again?
> > >
> > >
> > > "So you'd end up with one of 1) cold segments are only useful to Kafka; 2)
> > > you have the same data written to HDFS/etc twice, once for Kafka and once
> > > for everything else, in two separate formats”
> > >
> > > You are talking about two different use cases. If someone is storing raw
> > > data out of Kafka for long-term access, storing the data as it is in HDFS
> > > through Kafka will solve this issue.
> > > They do not need to run another pipeline to ship these logs.
> > >
> > > If they are running pipelines to store data in HDFS in a different format,
> > > that's a different use case. Maybe they are transforming Kafka logs to ORC
> > > so that they can query them through Hive. Once you transform the log
> > > segment, it loses its ability to use the existing offset index.
> > > The main objective here is not to change the existing protocol and still
> > > be able to write and read logs from remote storage.
> > >
> > >
> > > -Harsha
> > >
> > > On Feb 4, 2019, 2:53 PM -0800, Ryanne Dolan ,
> > > wrote:
> > > > Thanks Harsha, makes sense for the most part.
> > > >
> > > > > tiered storage is to get away from this and make this transparent to
> > > > > the user
> > > >
> > > > I think you are saying that this enables additional (potentially
> > > > cheaper) storage options without *requiring* an existing ETL pipeline.
> > > > But it's not really a replacement for the sort of pipelines people build
> > > > with Connect,
> > > > 
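Harsha's argument that uploading segments unchanged keeps "the existing offset mechanisms" usable rests on the per-segment offset index: a sparse, sorted list of (offset, file position) entries. A lookup finds the last entry at or before the target offset and scans forward from that position. The sketch below simplifies this to absolute offsets in an in-memory list, whereas Kafka's real index stores relative offsets in a memory-mapped file; a segment rewritten by an ETL job into another format would invalidate these positions.

```python
import bisect
from typing import List, Tuple

def lookup_position(index: List[Tuple[int, int]], target_offset: int) -> int:
    """Return the byte position from which to scan a segment for target_offset.

    index: sparse (offset, file position) entries sorted by offset, as kept
    alongside each log segment. The scan starts at the last entry whose offset
    is <= target_offset, or at position 0 if there is no such entry.
    """
    offsets = [offset for offset, _ in index]
    i = bisect.bisect_right(offsets, target_offset) - 1
    return index[i][1] if i >= 0 else 0

# Hypothetical index for a segment whose base offset is 100.
segment_index = [(100, 0), (140, 4096), (185, 8192)]
print(lookup_position(segment_index, 150))  # 4096: scan forward from the entry for offset 140
```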

Re: [Discuss] Question on KIP-298: Error Handling in Kafka Connect

2019-02-06 Thread Randall Hauch
Hi, Pere.

The primary reason that KIP-298 did not support a DLQ for source connectors
was that we couldn't get around serialization problems. With source
connectors, the converter (serializer) is the last element in the chain,
and if there is a problem serializing a record, we could not work out
how to serialize the unserializable record so that it could be written to the
DLQ. Another problem was the inability to write into Kafka, in which case
it's likely we cannot write to the DLQ topic either.

Randall
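The asymmetry described above can be made concrete by comparing where the converter sits in each pipeline. In a sink connector the record arrives from Kafka as bytes, so when conversion fails the original bytes can still be written to a DLQ; in a source connector the converter runs last, so when it fails there are no bytes to write. The toy model below illustrates only that point; the function names, the in-memory `dlq` list and the JSON "converter" are assumptions for illustration, not Connect's APIs.

```python
import json
from typing import Any, List, Optional

dlq: List[bytes] = []  # stand-in for a dead letter queue topic

def sink_path(raw: bytes) -> Optional[Any]:
    """Sink side: bytes from Kafka -> converter -> connector."""
    try:
        return json.loads(raw)  # the converter (deserializer) runs first
    except ValueError:
        dlq.append(raw)  # the original bytes are still available for the DLQ
        return None

def source_path(record: Any) -> Optional[bytes]:
    """Source side: connector -> converter -> Kafka."""
    try:
        return json.dumps(record).encode("utf-8")  # the converter (serializer) runs last
    except TypeError:
        # No serialized form exists, so there is nothing obvious to write to a
        # DLQ topic without a second, "safe" serialization scheme.
        return None

print(sink_path(b'{"id": 1}'), sink_path(b"not json"), dlq)
print(source_path({"id": {1, 2}}))  # a Python set cannot be serialized to JSON
```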

On Tue, Jan 1, 2019 at 2:18 PM Pere Urbón Bayes 
wrote:

> Hi,
> a quick question on the KIP-298 dead letter queue: as I read the KIP, it
> is only available for sink connectors.
>
> While I know the challenges of defining a dead-letter queue for the
> incoming messages, I wanted to ask/discuss what the thinking is here: do
> you completely discard the option?
>
> I can see it being useful for messages that were pulled from the source but
> somehow could not be ingested into Kafka, for example because of
> serialization.
>
> What do you think?
>
> --
> Pere Urbon-Bayes
> Software Architect
> http://www.purbon.com
> https://twitter.com/purbon
> https://www.linkedin.com/in/purbon/
>


Re: Statestore restoration & scaling questions - possible KIP as well.

2019-02-06 Thread Adam Bellemare
Bump: hoping someone has some insight, or can redirect me to a more
suitable forum.

Thanks

On Sun, Feb 3, 2019 at 10:25 AM Adam Bellemare 
wrote:

> Hey Folks
>
> I have a few questions around the operations of stateful processing while
> scaling nodes up/down, and a possible KIP in question #4. Most of them have
> to do with task processing during rebuilding of state stores after scaling
> nodes up.
>
> Scenario:
> Single node/thread, processing 2 topics (10 partitions each):
> User event topic (events), i.e. key: userId, value: ProductId
> Product topic (entity), i.e. key: ProductId, value: productData
>
> My topology looks like this:
>
> KTable productTable = ... //materialize from product topic
>
> KStream output = userStream
> .map(x => (x.value, x.key) ) //Swap the key and value around
> .join(productTable, ... ) //Joiner is not relevant here
> .to(...)  //Send it to some output topic
>
>
> Here are my questions:
> 1) If I scale the processing node count up, partitions will be rebalanced
> to the new node. Does processing continue as normal on the original node,
> while the new node's processing is paused as the internal state stores are
> rebuilt/reloaded? From my reading of the code (and own experience) I
> believe this to be the case, but I am just curious in case I missed
> something.
>
> 2) What happens to the userStream map task? Will the new node be able to
> process this task while the state store is rebuilding/reloading? My reading
> of the code suggests that this map process will be paused on the new node
> while the state store is rebuilt. The effect of this is that it will lead
> to a delay in events reaching the original node's partitions, which will be
> seen as late-arriving events. Am I right in this assessment?
>
> 3) How does scaling up work with standby state-store replicas? From my
> reading of the code, it appears that scaling a node up will result in a
> rebalance, with the state assigned to the new node being rebuilt first
> (leading to a pause in processing). Following this, the standby replicas are
> populated. Am I correct in this reading?
>
> 4) If my reading in #3 is correct, would it be possible to pre-populate
> the standby stores on scale-up before initiating active-task transfer? This
> would allow seamless scale-up and scale-down without requiring any pauses
> for rebuilding state. I am interested in kicking this off as a KIP if so,
> but would appreciate any JIRAs or related KIPs to read up on prior to
> digging into this.
>
>
> Thanks
>
> Adam Bellemare
>
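Question 4 describes a two-phase move: first place a standby replica of a task on the new node, and hand over the active task only once that standby has caught up. A minimal sketch of that decision follows; the per-task lag input and the `max_lag` threshold are hypothetical inputs invented for this sketch, not existing Kafka Streams configuration.

```python
from typing import Dict, Tuple

def plan_moves(desired_owner: Dict[str, str], current_owner: Dict[str, str],
               standby_lag: Dict[Tuple[str, str], int], max_lag: int = 0) -> Dict[str, str]:
    """Decide, per task, what happens in this round of a scale-up.

    desired_owner / current_owner: task id -> node that should own / owns the active task.
    standby_lag: (task id, node) -> changelog records that node's standby still has to restore.
    Returns task id -> "keep", "warm-up" or "move".
    """
    plan = {}
    for task, target in desired_owner.items():
        if current_owner.get(task) == target:
            plan[task] = "keep"
        elif standby_lag.get((task, target), float("inf")) <= max_lag:
            plan[task] = "move"     # standby on the target is caught up: hand over the active task
        else:
            plan[task] = "warm-up"  # old owner keeps processing while the target restores a standby
    return plan

desired = {"0_0": "A", "0_1": "B", "0_2": "B"}  # node B has just joined
current = {"0_0": "A", "0_1": "A", "0_2": "A"}
lag = {("0_1", "B"): 0, ("0_2", "B"): 5000}
print(plan_moves(desired, current, lag))
```

Running the plan repeatedly until no task is in "warm-up" would give the pause-free scale-up the question asks about, at the cost of temporarily restoring state on two nodes.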


[jira] [Created] (KAFKA-7902) SASL/OAUTHBEARER can become unable to connect: javax.security.sasl.SaslException: Unable to find OAuth Bearer token in Subject's private credentials (size=2)

2019-02-06 Thread Ron Dagostino (JIRA)
Ron Dagostino created KAFKA-7902:


 Summary: SASL/OAUTHBEARER can become unable to connect: 
javax.security.sasl.SaslException: Unable to find OAuth Bearer token in 
Subject's private credentials (size=2) 
 Key: KAFKA-7902
 URL: https://issues.apache.org/jira/browse/KAFKA-7902
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.1.0, 2.0.1, 2.0.0
Reporter: Ron Dagostino
Assignee: Ron Dagostino


It is possible for a Java SASL/OAUTHBEARER client (either a non-broker 
producer/consumer client or a broker when acting as an inter-broker client) to 
end up in a state where it cannot connect to a new broker (or, if 
re-authentication as implemented by KIP-368 and merged for v2.2.0 were to be 
deployed and enabled, to be unable to re-authenticate). The error message looks 
like this:

{{Connection to node 1 failed authentication due to: An error: 
(java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
Unable to find OAuth Bearer token in Subject's private credentials (size=2) 
[Caused by java.io.IOException: Unable to find OAuth Bearer token in Subject's 
private credentials (size=2)]) occurred when evaluating SASL token received 
from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.}}

The root cause of the problem begins at this point in the code:

[https://github.com/apache/kafka/blob/2.0/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L378]:

The {{loginContext}} field doesn't get replaced with the old version stored 
away in the {{optionalLoginContextToLogout}} variable if/when the 
{{loginContext.login()}} call on line 381 throws an exception. *This is an 
unusual event* – the OAuth authorization server must be unavailable at the 
moment when the token refresh occurs – but when it does happen it puts the 
refresher thread instance in an invalid state because now its {{loginContext}} 
field represents the one that failed instead of the original one, which is now 
lost.  The current {{loginContext}} can't be logged out – it will throw an 
{{InvalidStateException}} if that is attempted because there is no token 
associated with it -- and the token associated with the login context that was 
lost can never be logged out and removed from the Subject's private credentials 
(because we don't retain a reference to it).  The net effect is that we end up 
with an extra token on the Subject's private credentials, which eventually 
results in the exception mentioned above when the client tries to authenticate 
to a broker.

So the chain of events is:

1) Login failure upon token refresh causes the refresher thread's login context
field to be incorrect, and the existing token on the Subject's private
credentials will never be logged out/removed.
2) A retry occurs in 10 seconds, potentially repeatedly until the authorization
server is back online.
3) Login succeeds, adding a second token to the Subject's private credentials.
(Logout is then called on the login context that was set incorrectly in the
most recent failure, e.g. in step 1, which results in an exception, but this is
not the real issue: the real issue is the 2 tokens on the Subject's private
credentials.)
4) At this point we have 2 tokens on the Subject, and when at some point in the
future the client tries to make a new connection, it sees the 2 tokens and
throws an exception – BOOM! The client is now unable to connect (or
re-authenticate if applicable) going forward.
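The chain of events above can be reproduced with a small model of the refresher. In the sketch, the Subject's private credentials are just a list of token strings, `FakeLoginContext` stands in for a JAAS `LoginContext`, and `refresh()` mirrors the sequence the ticket describes (remember the current context, create a new one, log in, log out the remembered one). The `fixed=True` branch restores the previous context when `login()` fails, which is the behaviour the ticket implies is missing; this is an illustration, not the actual `ExpiringCredentialRefreshingLogin` code.

```python
from typing import List, Optional

class FakeLoginContext:
    """Stand-in for a JAAS LoginContext that adds/removes one token on a Subject."""

    def __init__(self, subject: List[str], server_up: bool, name: str):
        self.subject, self.server_up, self.name = subject, server_up, name
        self.token: Optional[str] = None

    def login(self) -> None:
        if not self.server_up:
            raise RuntimeError("authorization server unavailable")
        self.token = f"token-{self.name}"
        self.subject.append(self.token)

    def logout(self) -> None:
        if self.token is None:
            raise RuntimeError("no token associated with this login context")
        self.subject.remove(self.token)
        self.token = None


class Refresher:
    def __init__(self, subject: List[str], fixed: bool):
        self.subject, self.fixed, self.created = subject, fixed, 0
        self.login_context = self._new_context(server_up=True)
        self.login_context.login()

    def _new_context(self, server_up: bool) -> FakeLoginContext:
        self.created += 1
        return FakeLoginContext(self.subject, server_up, str(self.created))

    def refresh(self, server_up: bool) -> None:
        to_logout = self.login_context  # remember the current context
        self.login_context = self._new_context(server_up)
        try:
            self.login_context.login()
        except RuntimeError:
            if self.fixed:
                # Restore the context that still owns the existing token.
                self.login_context = to_logout
            return  # the real refresher retries later (10 seconds per the ticket)
        try:
            to_logout.logout()
        except RuntimeError:
            # Only reached on the buggy path: the remembered context is the one
            # whose login failed, so it has no token and the old token stays.
            pass


def tokens_after_outage(fixed: bool) -> List[str]:
    subject: List[str] = []
    refresher = Refresher(subject, fixed)
    refresher.refresh(server_up=False)  # step 1: refresh fails
    refresher.refresh(server_up=True)   # steps 2-3: a retry succeeds
    return subject


print(tokens_after_outage(fixed=False))  # two tokens: the state that breaks new connections
print(tokens_after_outage(fixed=True))   # one token
```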





Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-02-06 Thread Satish Duggana
Thanks, Harsha for the KIP. It is a good start for tiered storage in
Kafka. I have a few comments/questions.

It may be good to have a configuration to keep the number of local
segments instead of keeping only the active segment. This config can
be exposed at cluster and topic levels with a default value of 1. In
some use cases, a few consumers may lag by more than one segment, and it
would be better to serve them from local storage instead of remote storage.

It may be better to keep “remote.log.storage.enable” and the respective
configuration at the topic level as well as the cluster level. It will be
helpful in environments where some topics are configured with
local storage and other topics are configured with remote storage.

Each topic-partition leader pushes its log segments, with their respective
index files, to remote storage whenever the active log segment rolls over,
and updates the remote log index file for the respective remote log
segment. A second option is to also add offset index files for each
segment. These could be used to serve consumer fetch requests for old
segments from a local log segment instead of serving them directly from the
remote log, which may cause high latencies. There can be different
strategies for when the remote segment is copied to a local segment.

What is “remote.log.manager.scheduler.interval.ms” config about?

How do followers sync RemoteLogSegmentIndex files? Do they request them
from the leader replica? This looks important, as a failed-over
leader should have its RemoteLogSegmentIndex updated and ready to avoid
high latencies in serving old data stored in remote logs.

Thanks,
Satish.
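The first and third points above combine into a simple read path: serve a fetch from the newest N local segments when the offset falls inside them, and otherwise locate the remote segment containing the offset via the remote log index. In the sketch below, `local_segments` (default 1, i.e. only the active segment) and the representation of the index as a sorted list of segment base offsets are hypothetical; they are not names or structures defined by the KIP.

```python
import bisect
from typing import List, Tuple

def route_fetch(fetch_offset: int, segment_base_offsets: List[int],
                local_segments: int = 1) -> Tuple[str, int]:
    """Return ("local" or "remote", base offset of the segment holding fetch_offset).

    segment_base_offsets: base offsets of all segments, oldest first; the last
    one is the active segment. Only the newest `local_segments` segments are
    assumed to still be on local disk; older ones exist only in remote storage.
    """
    if not segment_base_offsets or fetch_offset < segment_base_offsets[0]:
        raise ValueError("fetch offset is before the start of the log")
    # The containing segment has the largest base offset <= fetch_offset
    # (the upper bound, i.e. the log end offset, is not checked here).
    idx = bisect.bisect_right(segment_base_offsets, fetch_offset) - 1
    first_local = max(0, len(segment_base_offsets) - local_segments)
    return ("local" if idx >= first_local else "remote"), segment_base_offsets[idx]

bases = [0, 100, 200, 300]                        # hypothetical segment base offsets
print(route_fetch(250, bases))                    # ('remote', 200) with only the active segment local
print(route_fetch(250, bases, local_segments=2))  # ('local', 200)
```

Raising `local_segments` trades local disk for fewer remote reads by lagging consumers, which is the tuning knob suggested above.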

On Tue, Feb 5, 2019 at 10:42 PM Ryanne Dolan  wrote:
>
> Thanks Harsha, makes sense.
>
> Ryanne
>
> On Mon, Feb 4, 2019 at 5:53 PM Harsha Chintalapani  wrote:
>
> > "I think you are saying that this enables additional (potentially cheaper)
> > storage options without *requiring* an existing ETL pipeline. “
> > Yes.
> >
> > " But it's not really a replacement for the sort of pipelines people build
> > with Connect, Gobblin etc.”
> >
> > It is not. But it is also wrong to assume that everyone runs these
> > pipelines for storing raw Kafka data into HDFS or S3.
> > The aim of this KIP is to provide tiered storage as a whole package, not
> > to ask users to ship the data on their own using existing ETL, which means
> > running a consumer and maintaining those pipelines.
> >
> > " My point was that, if you are already offloading records in an ETL
> > pipeline, why do you need a new pipeline built into the broker to ship the
> > same data to the same place?”
> >
> > As you said, it's an ETL pipeline, which means users of these pipelines
> > are reading the data from the broker, transforming its state and storing
> > it somewhere.
> > The point of this KIP is to store log segments as they are, without
> > changing their structure, so that we can use the existing offset
> > mechanisms to look them up when the consumer needs to read old data. When
> > you load the data via your existing pipelines, you are reading the topic
> > as a whole, which doesn't guarantee that you'll write this data into HDFS
> > or S3 in the same order, and who is going to generate the index files
> > again?
> >
> >
> > "So you'd end up with one of 1)cold segments are only useful to Kafka; 2)
> > you have the same data written to HDFS/etc twice, once for Kafka and once
> > for everything else, in two separate formats”
> >
> > You are talking about two different use cases. If someone is storing raw
> > data out of Kafka for long-term access, storing the data as it is in HDFS
> > through Kafka will solve this issue.
> > They do not need to run another pipeline to ship these logs.
> >
> > If they are running pipelines to store data in HDFS in a different
> > format, that's a different use case. Maybe they are transforming Kafka
> > logs to ORC so that they can query them through Hive. Once you transform
> > the log segment, it loses its ability to use the existing offset index.
> > The main objective here is not to change the existing protocol and still
> > be able to write and read logs from remote storage.
> >
> >
> > -Harsha
> >
> > On Feb 4, 2019, 2:53 PM -0800, Ryanne Dolan ,
> > wrote:
> > > Thanks Harsha, makes sense for the most part.
> > >
> > > > tiered storage is to get away from this and make this transparent to
> > > > the user
> > >
> > > I think you are saying that this enables additional (potentially cheaper)
> > > storage options without *requiring* an existing ETL pipeline. But it's not
> > > really a replacement for the sort of pipelines people build with Connect,
> > > Gobblin etc. My point was that, if you are already offloading records in an
> > > ETL pipeline, why do you need a new pipeline built into the broker to ship
> > > the same data to the same place? I think in most cases this will be an
> > > additional pipeline, not a replacement, because the segments written to
> > > cold storage won't be useful outside Kafka. So you'd end up with one of 1)
> > > cold segments 

[jira] [Created] (KAFKA-7901) Message Expiring at Producer with "Unable to send" when Partitions are under replicated

2019-02-06 Thread Zubair Abdul Khaliq (JIRA)
Zubair Abdul Khaliq created KAFKA-7901:
--

 Summary: Message Expiring at Producer with "Unable to send" when Partitions are under replicated
 Key: KAFKA-7901
 URL: https://issues.apache.org/jira/browse/KAFKA-7901
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.11.0.1, 0.11.0.0
Reporter: Zubair Abdul Khaliq
 Attachments: Screen Shot 2019-02-06 at 3.49.29 PM.png, Screen Shot 2019-02-06 at 3.49.41 PM.png

We are experiencing data loss, with messages expiring at the producer with
errors such as:

"Expiring 5 record(s) for *: 60002 ms has passed since batch creation plus linger time"

"Expiring 4 record(s) for *: 60084 ms has passed since last append"


We observed that when we were getting failed records, the partitions were
under-replicated, as seen in the attached screenshots. We had increased the
timeout from the default of 30 sec to 60 sec.
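The two error messages come from different expiry checks on batches still waiting in the producer. Below is a simplified Python model of those checks, loosely following the 0.11-era producer's batch-expiry logic. It is an illustration, not the client's actual code. The parameter names are assumptions, `timeout_ms` stands in for the 60 sec timeout mentioned above, and linger 0 in the usage line is also an assumption. A batch not in retry expires once the timeout has passed since its last append (if the batch is full) or since its creation plus linger time.

```python
from typing import Optional


def expiry_reason(now_ms: int, created_ms: int, last_append_ms: int, last_attempt_ms: int,
                  linger_ms: int, retry_backoff_ms: int, timeout_ms: int,
                  in_retry: bool, is_full: bool) -> Optional[str]:
    """Simplified model of the producer's batch-expiry checks; returns a reason or None."""
    # A full batch that is not being retried: measured from the last append.
    if not in_retry and is_full and timeout_ms < now_ms - last_append_ms:
        return f"{now_ms - last_append_ms} ms has passed since last append"
    # Any batch not being retried: measured from creation plus linger time.
    if not in_retry and timeout_ms < now_ms - created_ms - linger_ms:
        return f"{now_ms - created_ms - linger_ms} ms has passed since batch creation plus linger time"
    # A batch being retried: measured from the last attempt plus retry backoff.
    if in_retry and timeout_ms < now_ms - last_attempt_ms - retry_backoff_ms:
        return (f"{now_ms - last_attempt_ms - retry_backoff_ms} ms has passed "
                "since last attempt plus backoff time")
    return None


# Numbers chosen to reproduce the first message in the report (timeout 60 s, linger 0).
print(expiry_reason(now_ms=60002, created_ms=0, last_append_ms=0, last_attempt_ms=0,
                    linger_ms=0, retry_backoff_ms=100, timeout_ms=60000,
                    in_retry=False, is_full=False))
# prints: 60002 ms has passed since batch creation plus linger time
```

In this model, raising the timeout only delays expiry. If the records cannot be sent while the partitions are under-replicated, they still expire once the longer timeout elapses.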


