[
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833161#comment-16833161
]
Andrew commented on KAFKA-8315:
-------------------------------
I ran the join after removing the timestamp transform and the aggregation, and I
still get the same behaviour, i.e. with the following topology:
{Code}
private static Topology joinStreamStream(final JoinerProperties props) {
    final JoinedRecordFactory joinedRecordFactory =
        JoinedRecordFactory.create(props.leftTopic().getSchema(),
            props.rightTopic().getSchema());
    final FieldMapper leftFieldMapper =
        FieldMapper.create(props.leftTopic().getFields());
    final FieldMapper rightFieldMapper =
        FieldMapper.create(props.rightTopic().getFields());
    final JoinValueMapper joinValueMapper =
        JoinValueMapper.create(joinedRecordFactory, leftFieldMapper, rightFieldMapper,
            props.joinSchema());

    // extractors
    final TimestampExtractor leftTsExtractor =
        AvroTimestampExtractor.create(props.leftTopic().getTimestampField());
    final TimestampExtractor rightTsExtractor =
        AvroTimestampExtractor.create(props.rightTopic().getTimestampField());

    final StreamsBuilder builder = new StreamsBuilder();

    final Consumed<Object, GenericRecord> leftConsumed =
        Consumed.with(leftTsExtractor);
    final KStream<Object, GenericRecord> leftStream =
        AvroMinMaxTimestampTransformer.wrap(
            builder.stream(props.leftTopic().getName(), leftConsumed),
            props.minStreamTimestamp(), props.maxStreamTimestamp());

    final Consumed<Object, GenericRecord> rightConsumed =
        Consumed.with(rightTsExtractor);
    final KStream<Object, GenericRecord> rightStream =
        AvroMinMaxTimestampTransformer.wrap(
            builder.stream(props.rightTopic().getName(), rightConsumed),
            props.minStreamTimestamp(), props.maxStreamTimestamp());

    // setup the join
    final ValueJoiner<GenericRecord, GenericRecord, GenericRecord> joiner =
        AvroFieldsValueJoiner.create(joinedRecordFactory);
    final JoinWindows joinWindow =
        JoinWindows.of(Duration.ZERO)
            .after(props.joinWindowAfterSize())
            .before(props.joinWindowBeforeSize())
            .grace(props.joinWindowGrace());
    final KStreamKStreamJoinFunction join = props.joinType() ==
        JoinerProperties.JoinType.INNER ? leftStream::join : leftStream::leftJoin;
    final KStream<Object, GenericRecord> joinStream =
        join.execute(rightStream, joiner, joinWindow);

    // write the change-log stream to the topic
    joinStream
        .mapValues(joinValueMapper::apply)
        .to(props.joinTopic());

    return builder.build();
}
{Code}
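For reference, this is how I read the before/after bounds of the join window used above. It is only a minimal sketch with no Kafka dependency: the class and method names (JoinWindowSketch, withinJoinWindow) and the example values are hypothetical. It models the documented JoinWindows rule that a record from the other stream is joinable when its timestamp lies between (left timestamp - before) and (left timestamp + after), both bounds inclusive. It does not model grace or store retention.

```java
// Hypothetical, dependency-free sketch; not part of Kafka Streams or of the topology above.
final class JoinWindowSketch {

    // True if a right-side record at rightTs falls inside the window
    // [leftTs - beforeMs, leftTs + afterMs] around a left-side record (bounds inclusive).
    static boolean withinJoinWindow(long leftTs, long rightTs, long beforeMs, long afterMs) {
        return rightTs >= leftTs - beforeMs && rightTs <= leftTs + afterMs;
    }

    public static void main(String[] args) {
        // Assumed example values: window is [800, 1600] around a left record at t=1000.
        long leftTs = 1_000L, beforeMs = 200L, afterMs = 600L;
        System.out.println(withinJoinWindow(leftTs, 1_500L, beforeMs, afterMs)); // true
        System.out.println(withinJoinWindow(leftTs, 1_600L, beforeMs, afterMs)); // true (upper bound)
        System.out.println(withinJoinWindow(leftTs, 700L, beforeMs, afterMs));   // false (too early)
        System.out.println(withinJoinWindow(leftTs, 1_601L, beforeMs, afterMs)); // false (too late)
    }
}
```

With JoinWindows.of(Duration.ZERO) both bounds start at zero, so the later after(...) and before(...) calls alone determine the window.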
> Cannot pass Materialized into a join operation - hence can't set retention
> period independent of grace
> -----------------------------------------------------------------------------------------------------
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
> Issue Type: Bug
> Reporter: Andrew
> Assignee: John Roesler
> Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` rather than `JoinWindows.until()`
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join
> operation; it seems only the group operation supports it.
>
> Slack conversation here :
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the
> grace period, so I think this is more than a documentation fix (see comments
> below)