This is an automated email from the ASF dual-hosted git repository. nizhikov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push: new 266a2b3 IGNITE-16871 Metadata update initiated by marker records in event topic (#141) 266a2b3 is described below commit 266a2b3bd1a9025a29cab9904fe4bb19f90bcd19 Author: Nikolay <nizhi...@apache.org> AuthorDate: Tue May 17 09:55:05 2022 +0300 IGNITE-16871 Metadata update initiated by marker records in event topic (#141) --- .../org/apache/ignite/cdc/CdcEventsApplier.java | 75 ++----------------- .../ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java | 84 ++++++++++++++-------- .../ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java | 10 +-- .../cdc/kafka/KafkaToIgniteCdcStreamerApplier.java | 38 +++++++--- .../KafkaToIgniteCdcStreamerConfiguration.java | 22 ------ .../cdc/kafka/KafkaToIgniteMetadataUpdater.java | 34 +-------- 6 files changed, 94 insertions(+), 169 deletions(-) diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java index 6160ead..01c35b4 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java @@ -24,10 +24,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.binary.BinaryObject; -import org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.cache.CacheEntryVersion; -import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration; -import org.apache.ignite.cdc.kafka.KafkaToIgniteMetadataUpdater; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.binary.BinaryMetadata; import org.apache.ignite.internal.binary.BinaryTypeImpl; @@ -39,9 +36,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.lang.IgniteInClosureX; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; /** @@ -66,34 +61,6 @@ public abstract class CdcEventsApplier { /** */ private final BooleanSupplier hasRemoves = () -> !F.isEmpty(rmvBatch); - /** - * Update closure. - * @see #applyWithRetry(IgniteInClosureX, IgniteInternalCache) - */ - private final IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>> updClo = - new IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>>() { - /** {@inheritDoc} */ - @Override public void applyx( - IgniteInternalCache<BinaryObject, BinaryObject> cache - ) throws IgniteCheckedException { - cache.putAllConflict(updBatch); - } - }; - - /** - * Remove closure. - * @see #applyWithRetry(IgniteInClosureX, IgniteInternalCache) - */ - private final IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>> rmvClo = - new IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>>() { - /** {@inheritDoc} */ - @Override public void applyx( - IgniteInternalCache<BinaryObject, BinaryObject> cache - ) throws IgniteCheckedException { - cache.removeAllConflict(rmvBatch); - } - }; - /** * @param maxBatchSize Maximum batch size. */ @@ -181,19 +148,20 @@ public abstract class CdcEventsApplier { * @param applyUpd Apply update batch flag supplier. * @param applyRmv Apply remove batch flag supplier. * @return Number of applied events. + * @throws IgniteCheckedException In case of error. */ private int applyIf( IgniteInternalCache<BinaryObject, BinaryObject> cache, BooleanSupplier applyUpd, BooleanSupplier applyRmv - ) { + ) throws IgniteCheckedException { int evtsApplied = 0; if (applyUpd.getAsBoolean()) { if (log().isDebugEnabled()) log().debug("Applying put batch [cache=" + cache.name() + ']'); - applyWithRetry(updClo, cache); + cache.putAllConflict(updBatch); evtsApplied += updBatch.size(); @@ -204,7 +172,7 @@ public abstract class CdcEventsApplier { if (log().isDebugEnabled()) log().debug("Applying remove batch [cache=" + cache.name() + ']'); - applyWithRetry(rmvClo, cache); + cache.removeAllConflict(rmvBatch); evtsApplied += rmvBatch.size(); @@ -214,36 +182,6 @@ public abstract class CdcEventsApplier { return evtsApplied; } - /** - * Executes closure with retry logic. - * Metadata update thread polls metadata asynchronously with {@link KafkaToIgniteCdcStreamerConfiguration#getMetaUpdateInterval()} - * interval. This means metadata updates can be seen later than data updates. In this case {@link BinaryObjectException} can point - * to absence of metadata. To overcome lack of metadata invoke {@link #updateMetadata()} and retry closure. - * - * @param clo Closure to apply. - * @param cache Cache for closure. - * @see KafkaToIgniteMetadataUpdater - * @see KafkaToIgniteCdcStreamerConfiguration#getMetaUpdateInterval() - */ - private void applyWithRetry( - IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>> clo, - IgniteInternalCache<BinaryObject, BinaryObject> cache - ) { - try { - clo.apply(cache); - } - catch (Exception e) { - // Retry only if cause is BinaryObjectException. - if (!X.hasCause(e, BinaryObjectException.class)) - throw e; - - // Retry after metadata update. - updateMetadata(); - - clo.apply(cache); - } - } - /** @return {@code True} if update batch should be applied. */ private boolean isApplyBatch(Map<KeyCacheObject, ?> map, KeyCacheObject key) { return map.size() >= maxBatchSize || map.containsKey(key); @@ -296,11 +234,6 @@ public abstract class CdcEventsApplier { } } - /** Update metadata if possible. */ - protected void updateMetadata() { - // No-op. - } - /** @return Ignite instance. */ protected abstract IgniteEx ignite(); diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java index 9c1b9c5..d821e2e 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.ignite.IgniteLogger; import org.apache.ignite.binary.BinaryType; import org.apache.ignite.cdc.CdcConsumer; @@ -43,6 +44,7 @@ import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteExperimental; import org.apache.ignite.resources.LoggerResource; import org.apache.kafka.clients.producer.KafkaProducer; @@ -91,6 +93,9 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer { /** Bytes sent metric description. */ public static final String BYTES_SENT_DESCRIPTION = "Count of bytes sent."; + /** Metadata updater marker. */ + public static final byte[] META_UPDATE_MARKER = U.longToBytes(0xC0FF1E); + /** Log. */ @LoggerResource private IgniteLogger log; @@ -140,6 +145,9 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer { /** Count of sent mappings. */ protected AtomicLongMetric mappingsCnt; + /** Count of metadata updates. */ + protected byte metaUpdCnt = 0; + /** */ private List<Future<RecordMetadata>> futs; @@ -176,46 +184,66 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer { return true; }); - while (filtered.hasNext()) { - sendLimited( - filtered, - evt -> new ProducerRecord<>( - evtTopic, - evt.partition() % kafkaParts, - evt.cacheId(), - IgniteUtils.toBytes(evt) - ), - evtsCnt - ); - } + sendAll( + filtered, + evt -> new ProducerRecord<>( + evtTopic, + evt.partition() % kafkaParts, + evt.cacheId(), + IgniteUtils.toBytes(evt) + ), + evtsCnt + ); return true; } /** {@inheritDoc} */ @Override public void onTypes(Iterator<BinaryType> types) { - while (types.hasNext()) { - sendLimited( - types, - t -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(((BinaryTypeImpl)t).metadata())), - typesCnt - ); - } + sendAll( + types, + t -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(((BinaryTypeImpl)t).metadata())), + typesCnt + ); + + sendMetaUpdatedMarkers(); } /** {@inheritDoc} */ @Override public void onMappings(Iterator<TypeMapping> mappings) { - while (mappings.hasNext()) { - sendLimited( - mappings, - m -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(m)), - mappingsCnt - ); - } + sendAll( + mappings, + m -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(m)), + mappingsCnt + ); + + sendMetaUpdatedMarkers(); + } + + /** Send marker(meta need to be updated) record to each partition of events topic. */ + private void sendMetaUpdatedMarkers() { + sendAll( + IntStream.range(0, kafkaParts).iterator(), + p -> new ProducerRecord<>(evtTopic, p, null, META_UPDATE_MARKER), + evtsCnt + ); + + if (log.isDebugEnabled()) + log.debug("Meta update markers sent."); + } + + /** Send all data to Kafka. */ + private <T> void sendAll( + Iterator<T> data, + Function<T, ProducerRecord<Integer, byte[]>> toRec, + AtomicLongMetric cntr + ) { + while (data.hasNext()) + sendOneBatch(data, toRec, cntr); } - /** Send limited amount of data to Kafka. */ - private <T> void sendLimited( + /** Send one batch. */ + private <T> void sendOneBatch( Iterator<T> data, Function<T, ProducerRecord<Integer, byte[]>> toRec, AtomicLongMetric cntr diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java index 63f1df0..b0a5e4b 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java @@ -133,9 +133,8 @@ public class KafkaToIgniteCdcStreamer implements Runnable { this.kafkaProps = kafkaProps; this.streamerCfg = streamerCfg; - // Extra thread for metadata updater. - appliers = new ArrayList<>(streamerCfg.getThreadCount() + 1); - runners = new ArrayList<>(streamerCfg.getThreadCount() + 1); + appliers = new ArrayList<>(streamerCfg.getThreadCount()); + runners = new ArrayList<>(streamerCfg.getThreadCount()); if (!kafkaProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) throw new IllegalArgumentException("Kafka properties don't contains " + ConsumerConfig.GROUP_ID_CONFIG); @@ -179,12 +178,9 @@ public class KafkaToIgniteCdcStreamer implements Runnable { ign, log, kafkaProps, - streamerCfg, - stopped + streamerCfg ); - addAndStart("meta-update-thread", metaUpdr); - int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom(); int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom; int threadCnt = streamerCfg.getThreadCount(); diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java index 5de8cc3..acb2aeb 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -49,6 +50,8 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.META_UPDATE_MARKER; + /** * Thread that polls message from the Kafka topic partitions and applies those messages to the Ignite caches. * It expected that messages was written to the Kafka by the {@link IgniteToKafkaCdcStreamer} Change Data Capture consumer. @@ -212,17 +215,37 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException { ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout)); - if (log.isDebugEnabled()) { - log.debug( - "Polled from consumer [assignments=" + cnsmr.assignment() + ",rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']' + if (log.isInfoEnabled()) { + log.info( + "Polled from consumer [assignments=" + cnsmr.assignment() + + ", cnt=" + recs.count() + + ", rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']' ); } - apply(F.iterator(recs, this::deserialize, true, rec -> F.isEmpty(caches) || caches.contains(rec.key()))); + apply(F.iterator(recs, this::deserialize, true, this::filterAndPossiblyUpdateMetadata)); cnsmr.commitSync(Duration.ofMillis(kafkaReqTimeout)); } + /** + * Filter out {@link CdcEvent} records. + * Updates metadata in case update metadata marker found. + * @param rec Record to filter. + * @return {@code True} if record should be pushed down. + */ + private boolean filterAndPossiblyUpdateMetadata(ConsumerRecord<Integer, byte[]> rec) { + byte[] val = rec.value(); + + if (rec.key() == null && Arrays.equals(val, META_UPDATE_MARKER)) { + metaUpdr.updateMetadata(); + + return false; + } + + return F.isEmpty(caches) || caches.contains(rec.key()); + } + /** * @param rec Kafka record. * @return CDC event. @@ -236,15 +259,12 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab } } - /** {@inheritDoc} */ - @Override protected void updateMetadata() { - metaUpdr.updateMetadata(); - } - /** {@inheritDoc} */ @Override public void close() { log.warning("Close applier!"); + metaUpdr.close(); + cnsmrs.forEach(KafkaConsumer::wakeup); } diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java index 7811127..24675d4 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java @@ -33,9 +33,6 @@ public class KafkaToIgniteCdcStreamerConfiguration { /** Default maximum time to complete Kafka related requests, in milliseconds. */ public static final long DFLT_KAFKA_REQ_TIMEOUT = 3_000L; - /** Default maximum time to complete Kafka related requests, in milliseconds. */ - public static final long DFLT_META_UPD_INTERVAL = 3_000L; - /** Default {@link #threadCnt} value. */ public static final int DFLT_THREAD_CNT = 16; @@ -60,9 +57,6 @@ public class KafkaToIgniteCdcStreamerConfiguration { /** The maximum time to complete Kafka related requests, in milliseconds. */ private long kafkaReqTimeout = DFLT_KAFKA_REQ_TIMEOUT; - /** Amount of time between two polling of {@link #metadataTopic}, in milliseconds. */ - private long metaUpdInterval = DFLT_META_UPD_INTERVAL; - /** Metadata consumer group. */ private String metadataCnsmrGrp; @@ -193,22 +187,6 @@ public class KafkaToIgniteCdcStreamerConfiguration { this.metadataTopic = metadataTopic; } - /** - * @return Amount of time between two polling of {@link #metadataTopic}. - */ - public long getMetaUpdateInterval() { - return metaUpdInterval; - } - - /** - * Sets amount of time between two polling of {@link #metadataTopic}. - * - * @param metaUpdateInterval Amount of time between two polling of {@link #metadataTopic}. - */ - public void setMetaUpdateInterval(long metaUpdateInterval) { - this.metaUpdInterval = metaUpdateInterval; - } - /** * @return Consumer group to read metadata topic. */ diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java index 852c991..c14097b 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java @@ -20,7 +20,6 @@ package org.apache.ignite.cdc.kafka; import java.time.Duration; import java.util.Collections; import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cdc.CdcEventsApplier; @@ -29,7 +28,6 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.binary.BinaryMetadata; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -41,22 +39,16 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; /** */ -public class KafkaToIgniteMetadataUpdater implements AutoCloseable, Runnable { +public class KafkaToIgniteMetadataUpdater implements AutoCloseable { /** Ignite instance. */ private final IgniteEx ign; /** Log. */ private final IgniteLogger log; - /** Closed flag. Shared between all appliers. */ - private final AtomicBoolean stopped; - /** The maximum time to complete Kafka related requests, in milliseconds. */ private final long kafkaReqTimeout; - /** The maximum time to complete Kafka related requests, in milliseconds. */ - private final long metaUpdInterval; - /** */ private final KafkaConsumer<Void, byte[]> cnsmr; @@ -68,19 +60,15 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable, Runnable { * @param log Logger. * @param initProps Kafka properties. * @param streamerCfg Streamer configuration. - * @param stopped Stopped flag. */ public KafkaToIgniteMetadataUpdater( IgniteEx ign, IgniteLogger log, Properties initProps, - KafkaToIgniteCdcStreamerConfiguration streamerCfg, - AtomicBoolean stopped + KafkaToIgniteCdcStreamerConfiguration streamerCfg ) { this.ign = ign; this.kafkaReqTimeout = streamerCfg.getKafkaRequestTimeout(); - this.metaUpdInterval = streamerCfg.getMetaUpdateInterval(); - this.stopped = stopped; this.log = log.getLogger(KafkaToIgniteMetadataUpdater.class); Properties kafkaProps = new Properties(); @@ -97,22 +85,6 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable, Runnable { cnsmr.subscribe(Collections.singletonList(streamerCfg.getMetadataTopic())); } - /** {@inheritDoc} */ - @Override public void run() { - U.setCurrentIgniteName(ign.name()); - - while (!stopped.get()) { - updateMetadata(); - - try { - Thread.sleep(metaUpdInterval); - } - catch (InterruptedException e) { - // Ignore. - } - } - } - /** Polls all available records from metadata topic and applies it to Ignite. */ public synchronized void updateMetadata() { while (true) { @@ -141,8 +113,6 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable, Runnable { /** {@inheritDoc} */ @Override public void close() { - log.warning("Close applier!"); - cnsmr.wakeup(); }