anton-vinogradov commented on a change in pull request #8909:
URL: https://github.com/apache/ignite/pull/8909#discussion_r637777201



##########
File path: 
modules/core/src/main/java/org/apache/ignite/cdc/ChangeDataCaptureConfiguration.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.internal.cdc.ChangeDataCapture;
+import org.apache.ignite.lang.IgniteExperimental;
+
+/**
+ * This class defines {@link ChangeDataCapture} runtime configuration.
+ * This configuration is passed to {@link ChangeDataCapture} constructor.
+ * It defines all configuration
+ */
+@IgniteExperimental
+public class ChangeDataCaptureConfiguration {
+    /** */
+    private static final int DFLT_LOCK_TIMEOUT = 1000;
+
+    /** */
+    private static final long DFLT_CHECK_FREQ = 1000L;
+
+    /** */
+    private static final boolean DFLT_KEEP_BINARY = true;

Review comment:
       do we really need this flag. 
   seems, the only false case is useful?
   
   we need a test to confirm this flag is useful when it true.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/cdc/WALRecordsConsumer.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+import java.util.EnumSet;
+import java.util.Iterator;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cdc.ChangeDataCaptureConsumer;
+import org.apache.ignite.cdc.ChangeDataCaptureEvent;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.UnwrappedDataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgnitePredicate;
+
+import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
+import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
+import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+
+/**
+ * Transform {@link DataEntry} to {@link ChangeDataCaptureEvent} and sends it 
to {@link ChangeDataCaptureConsumer}.
+ *
+ * @see ChangeDataCapture
+ * @see ChangeDataCaptureConsumer
+ */
+public class WALRecordsConsumer<K, V> {
+    /** Ignite logger. */
+    private IgniteLogger log;
+
+    /** Data change events consumer. */
+    private final ChangeDataCaptureConsumer cdcConsumer;
+
+    /** Operations types we interested in. */
+    private static final EnumSet<GridCacheOperation> OPS_TYPES = 
EnumSet.of(CREATE, UPDATE, DELETE, TRANSFORM);
+
+    /** Operations filter. */
+    private static final IgnitePredicate<? super DataEntry> OPS_FILTER = e -> {
+        if (!(e instanceof UnwrappedDataEntry))
+            throw new IllegalStateException("Unexpected data entry type[" + 
e.getClass().getName());
+
+        if ((e.flags() & DataEntry.PRELOAD_FLAG) != 0 ||
+            (e.flags() & DataEntry.FROM_STORE_FLAG) != 0)
+            return false;
+
+        return OPS_TYPES.contains(e.op());
+    };
+
+    /**
+     * @param cdcConsumer User provided CDC consumer.
+     */
+    public WALRecordsConsumer(ChangeDataCaptureConsumer cdcConsumer) {
+        this.cdcConsumer = cdcConsumer;
+    }
+
+    /**
+     * Handles record from the WAL.
+     * If this method return {@code true} then current offset in WAL will be 
stored and WAL iteration will be
+     * started from it on CDC application fail/restart.
+     *
+     * @param recs WAL records iterator.
+     * @param <T> Record type.
+     * @return {@code True} if current offset in WAL should be commited.
+     */
+    public <T extends WALRecord> boolean onRecords(Iterator<T> recs) {
+        recs = F.iterator(recs, r -> r, true, r -> r.type() == 
WALRecord.RecordType.DATA_RECORD_V2);
+
+        return cdcConsumer.onChange(F.concat(F.iterator(recs, r -> 
F.iterator(((DataRecord)r).writeEntries(), e -> {
+            UnwrappedDataEntry ue = (UnwrappedDataEntry)e;
+
+            GridCacheVersion ver = e.writeVersion();
+
+            ChangeEventOrderImpl ord =

Review comment:
       just use GridCacheVersion, it's already Comparable.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/cdc/ChangeDataCaptureLoader.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.net.URL;
+import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.cdc.ChangeDataCapture;
+import 
org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
+import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+import static org.apache.ignite.internal.IgniteComponentType.SPRING;
+
+/**
+ * Utility class to load {@link ChangeDataCapture} from Spring XML 
configuration.
+ */
+public class ChangeDataCaptureLoader {
+    /**
+     * Loads {@link ChangeDataCapture} from XML configuration file and 
possible error message.
+     * If load fails then error message wouldn't be null.
+     *
+     * @param springXmlPath Path to XML configuration file.
+     * @return Tuple of {@code ChangeDataCapture} and error message.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static ChangeDataCapture loadChangeDataCapture(
+        String springXmlPath
+    ) throws IgniteCheckedException {

Review comment:
       any reason for newlines?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/cdc/ChangeDataCaptureEvent.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.cdc.ChangeDataCapture;
+import org.apache.ignite.lang.IgniteExperimental;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Event of single entry change.
+ * Instance presents new value of modified entry.
+ *
+ * @see ChangeDataCapture
+ * @see ChangeDataCaptureConsumer
+ */
+@IgniteExperimental
+public interface ChangeDataCaptureEvent extends Serializable {
+    /**
+     * @return Key for the changed entry.
+     */
+    public Object key();
+
+    /**
+     * @return Value for the changed entry or {@code null} in case of entry 
removal.
+     */
+    @Nullable public Object value();
+
+    /**
+     * @return {@code True} if event fired on primary node for partition 
containing this entry.
+     * @see <a href="
+     * 
https://ignite.apache.org/docs/latest/configuring-caches/configuring-backups#configuring-partition-backups";>
+     * Configuring partition backups.</a>
+     */
+    public boolean primary();
+
+    /**
+     * @return Partition number.
+     */
+    public int partition();

Review comment:
       has no usages and useless since this is just info about Ignite internals.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
##########
@@ -2566,7 +2566,7 @@ private void initializeDefaultSpi(IgniteConfiguration 
cfg) {
          * @throws IgniteCheckedException If failed.
          */
         @SuppressWarnings("ErrorNotRethrown")
-        private IgniteLogger initLogger(@Nullable IgniteLogger cfgLog, 
@Nullable String app, UUID nodeId, String workDir)
+        public static IgniteLogger initLogger(@Nullable IgniteLogger cfgLog, 
@Nullable String app, UUID nodeId, String workDir)

Review comment:
       too long line

##########
File path: 
modules/core/src/main/java/org/apache/ignite/cdc/ChangeDataCaptureConsumer.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.Iterator;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.cdc.ChangeDataCapture;
+import org.apache.ignite.lang.IgniteExperimental;
+import org.apache.ignite.resources.LoggerResource;
+
+/**
+ * Consumer of WAL data change events.
+ * This consumer will receive event of data changes during {@link 
ChangeDataCapture} application invocation.
+ * Lifecycle of consumer is the following:
+ * <ul>
+ *     <li>Start of the consumer {@link #start()}.</li>
+ *     <li>Notification of the consumer by the {@link #onChange(Iterator)} 
call.</li>
+ *     <li>Stop of the consumer {@link #stop()}.</li>
+ * </ul>
+ *
+ * In case consumer implementation wants to user {@link IgniteLogger}, please, 
use, {@link LoggerResource} annotation:
+ * <pre> {@code
+ * public class ChangeDataCaptureConsumer implements ChangeDataCaptureConsumer 
{
+ *     @LoggerReource
+ *     private IgniteLogger log;
+ *
+ *     ...
+ * }
+ * }</pre>
+ *
+ * Note, consumption of the {@link ChangeDataCaptureEvent} will started from 
the last saved offset.
+ * Offset of consumptions is saved on the disk every time {@link 
#onChange(Iterator)} returns {@code true}.
+ *
+ * @see ChangeDataCapture
+ * @see ChangeDataCaptureEvent
+ * @see ChangeEventOrder
+ */
+@IgniteExperimental
+public interface ChangeDataCaptureConsumer {
+    /**
+     * Starts the consumer.
+     */
+    public void start();
+
+    /**
+     * Handles entry changes events.
+     * If this method return {@code true} then current offset will be stored 
and ongoing notifications after CDC application fail/restart
+     * will be started from it.
+     *
+     * @param events Entry change events.
+     * @return {@code True} if current offset should be saved on the disk to 
continue from it in case any failures or restart.
+     */
+    public boolean onChange(Iterator<ChangeDataCaptureEvent> events);

Review comment:
       `onChange` but param is `events`.
   Could we make this consistent? 
   Also, how this method called at other CDCs?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to