http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilder.java new file mode 100644 index 0000000..5c0d583 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilder.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.ml.dataset.impl.cache; + +import java.io.Serializable; +import java.util.UUID; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.ml.dataset.DatasetBuilder; +import org.apache.ignite.ml.dataset.PartitionContextBuilder; +import org.apache.ignite.ml.dataset.PartitionDataBuilder; +import org.apache.ignite.ml.dataset.impl.cache.util.ComputeUtils; +import org.apache.ignite.ml.dataset.impl.cache.util.DatasetAffinityFunctionWrapper; + +/** + * A dataset builder that makes {@link CacheBasedDataset}. Encapsulate logic of building cache based dataset such as + * allocation required data structures and initialization of {@code context} part of partitions. + * + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + */ +public class CacheBasedDatasetBuilder<K, V> implements DatasetBuilder<K, V> { + /** Number of retries for the case when one of partitions not found on the node where loading is performed. */ + private static final int RETRIES = 15 * 60; + + /** Retry interval (ms) for the case when one of partitions not found on the node where loading is performed. */ + private static final int RETRY_INTERVAL = 1000; + + /** Template of the name of Ignite Cache containing partition {@code context}. */ + private static final String DATASET_CACHE_TEMPLATE = "%s_DATASET_%s"; + + /** Ignite instance. */ + private final Ignite ignite; + + /** Ignite Cache with {@code upstream} data. */ + private final IgniteCache<K, V> upstreamCache; + + /** + * Constructs a new instance of cache based dataset builder that makes {@link CacheBasedDataset}. + * + * @param ignite Ignite instance. + * @param upstreamCache Ignite Cache with {@code upstream} data. 
+ */ + public CacheBasedDatasetBuilder(Ignite ignite, IgniteCache<K, V> upstreamCache) { + this.ignite = ignite; + this.upstreamCache = upstreamCache; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <C extends Serializable, D extends AutoCloseable> CacheBasedDataset<K, V, C, D> build( + PartitionContextBuilder<K, V, C> partCtxBuilder, PartitionDataBuilder<K, V, C, D> partDataBuilder) { + UUID datasetId = UUID.randomUUID(); + + // Retrieves affinity function of the upstream Ignite Cache. + CacheConfiguration<K, V> upstreamCacheConfiguration = upstreamCache.getConfiguration(CacheConfiguration.class); + AffinityFunction upstreamCacheAffinity = upstreamCacheConfiguration.getAffinity(); + + // Creates dataset cache configuration with affinity function that mimics to affinity function of the upstream + // cache. + CacheConfiguration<Integer, C> datasetCacheConfiguration = new CacheConfiguration<>(); + datasetCacheConfiguration.setName(String.format(DATASET_CACHE_TEMPLATE, upstreamCache.getName(), datasetId)); + datasetCacheConfiguration.setAffinity(new DatasetAffinityFunctionWrapper(upstreamCacheAffinity)); + + IgniteCache<Integer, C> datasetCache = ignite.createCache(datasetCacheConfiguration); + + ComputeUtils.initContext( + ignite, + upstreamCache.getName(), + datasetCache.getName(), + partCtxBuilder, + RETRIES, + RETRY_INTERVAL + ); + + return new CacheBasedDataset<>(ignite, upstreamCache, datasetCache, partDataBuilder, datasetId); + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/package-info.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/package-info.java new file mode 100644 index 0000000..74629d7 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Base package for cache based implementation of machine learning dataset. 
+ */ +package org.apache.ignite.ml.dataset.impl.cache; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtils.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtils.java new file mode 100644 index 0000000..0785db2 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtils.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.ml.dataset.impl.cache.util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.locks.LockSupport; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.ml.dataset.PartitionContextBuilder; +import org.apache.ignite.ml.dataset.PartitionDataBuilder; +import org.apache.ignite.ml.math.functions.IgniteFunction; + +/** + * Util class that provides common methods to perform computations on top of the Ignite Compute Grid. + */ +public class ComputeUtils { + /** Template of the key used to store partition {@code data} in local storage. */ + private static final String DATA_STORAGE_KEY_TEMPLATE = "part_data_storage_%s"; + + /** + * Calls the specified {@code fun} function on all partitions so that is't guaranteed that partitions with the same + * index of all specified caches will be placed on the same node and will not be moved before computation is + * finished. If partitions are placed on different nodes then call will be retried, but not more than {@code + * retries} times with {@code interval} interval specified in milliseconds. + * + * @param ignite Ignite instance. + * @param cacheNames Collection of cache names. + * @param fun Function to be applied on all partitions. + * @param retries Number of retries for the case when one of partitions not found on the node. 
+ * @param interval Interval of retries for the case when one of partitions not found on the node. + * @param <R> Type of a result. + * @return Collection of results. + */ + public static <R> Collection<R> affinityCallWithRetries(Ignite ignite, Collection<String> cacheNames, + IgniteFunction<Integer, R> fun, int retries, int interval) { + assert cacheNames.size() > 0; + assert interval >= 0; + + String primaryCache = cacheNames.iterator().next(); + + Affinity<?> affinity = ignite.affinity(primaryCache); + int partitions = affinity.partitions(); + + BitSet completionFlags = new BitSet(partitions); + Collection<R> results = new ArrayList<>(); + + for (int t = 0; t <= retries; t++) { + ClusterGroup clusterGrp = ignite.cluster().forDataNodes(primaryCache); + + // Sends jobs. + Map<Integer, IgniteFuture<R>> futures = new HashMap<>(); + for (int part = 0; part < partitions; part++) + if (!completionFlags.get(part)) { + final int currPart = part; + + futures.put( + currPart, + ignite.compute(clusterGrp).affinityCallAsync(cacheNames, currPart, () -> fun.apply(currPart)) + ); + } + + // Collects results. + for (int part : futures.keySet()) + try { + R res = futures.get(part).get(); + results.add(res); + completionFlags.set(part); + } + catch (IgniteException ignore) { + } + + if (completionFlags.cardinality() == partitions) + return results; + + LockSupport.parkNanos(interval * 1_000_000); + } + + throw new IllegalStateException(); + } + + /** + * Calls the specified {@code fun} function on all partitions so that is't guaranteed that partitions with the same + * index of all specified caches will be placed on the same node and will not be moved before computation is + * finished. If partitions are placed on different nodes then call will be retried, but not more than {@code + * retries} times. + * + * @param ignite Ignite instance. + * @param cacheNames Collection of cache names. + * @param fun Function to be applied on all partitions. 
+ * @param retries Number of retries for the case when one of partitions not found on the node. + * @param <R> Type of a result. + * @return Collection of results. + */ + public static <R> Collection<R> affinityCallWithRetries(Ignite ignite, Collection<String> cacheNames, + IgniteFunction<Integer, R> fun, int retries) { + return affinityCallWithRetries(ignite, cacheNames, fun, retries, 0); + } + + /** + * Extracts partition {@code data} from the local storage, if it's not found in local storage recovers this {@code + * data} from a partition {@code upstream} and {@code context}. Be aware that this method should be called from + * the node where partition is placed. + * + * @param ignite Ignite instance. + * @param upstreamCacheName Name of an {@code upstream} cache. + * @param datasetCacheName Name of a partition {@code context} cache. + * @param datasetId Dataset ID. + * @param part Partition index. + * @param partDataBuilder Partition data builder. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @param <C> Type of a partition {@code context}. + * @param <D> Type of a partition {@code data}. + * @return Partition {@code data}. 
+ */ + public static <K, V, C extends Serializable, D extends AutoCloseable> D getData(Ignite ignite, + String upstreamCacheName, String datasetCacheName, UUID datasetId, int part, + PartitionDataBuilder<K, V, C, D> partDataBuilder) { + + PartitionDataStorage dataStorage = (PartitionDataStorage)ignite + .cluster() + .nodeLocalMap() + .computeIfAbsent(String.format(DATA_STORAGE_KEY_TEMPLATE, datasetId), key -> new PartitionDataStorage()); + + return dataStorage.computeDataIfAbsent(part, () -> { + IgniteCache<Integer, C> learningCtxCache = ignite.cache(datasetCacheName); + C ctx = learningCtxCache.get(part); + + IgniteCache<K, V> upstreamCache = ignite.cache(upstreamCacheName); + + ScanQuery<K, V> qry = new ScanQuery<>(); + qry.setLocal(true); + qry.setPartition(part); + + long cnt = upstreamCache.localSizeLong(part); + try (QueryCursor<Cache.Entry<K, V>> cursor = upstreamCache.query(qry)) { + return partDataBuilder.build(new UpstreamCursorAdapter<>(cursor.iterator(), cnt), cnt, ctx); + } + }); + } + + /** + * Initializes partition {@code context} by loading it from a partition {@code upstream}. + * + * @param ignite Ignite instance. + * @param upstreamCacheName Name of an {@code upstream} cache. + * @param datasetCacheName Name of a partition {@code context} cache. + * @param ctxBuilder Partition {@code context} builder. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @param <C> Type of a partition {@code context}. 
+ */ + public static <K, V, C extends Serializable> void initContext(Ignite ignite, String upstreamCacheName, + String datasetCacheName, PartitionContextBuilder<K, V, C> ctxBuilder, int retries, int interval) { + affinityCallWithRetries(ignite, Arrays.asList(datasetCacheName, upstreamCacheName), part -> { + Ignite locIgnite = Ignition.localIgnite(); + + IgniteCache<K, V> locUpstreamCache = locIgnite.cache(upstreamCacheName); + + ScanQuery<K, V> qry = new ScanQuery<>(); + qry.setLocal(true); + qry.setPartition(part); + + long cnt = locUpstreamCache.localSizeLong(part); + C ctx; + try (QueryCursor<Cache.Entry<K, V>> cursor = locUpstreamCache.query(qry)) { + ctx = ctxBuilder.build(new UpstreamCursorAdapter<>(cursor.iterator(), cnt), cnt); + } + + IgniteCache<Integer, C> datasetCache = locIgnite.cache(datasetCacheName); + + datasetCache.put(part, ctx); + + return part; + }, retries, interval); + } + + /** + * Initializes partition {@code context} by loading it from a partition {@code upstream}. + * + * @param ignite Ignite instance. + * @param upstreamCacheName Name of an {@code upstream} cache. + * @param datasetCacheName Name of a partition {@code context} cache. + * @param ctxBuilder Partition {@code context} builder. + * @param retries Number of retries for the case when one of partitions not found on the node. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @param <C> Type of a partition {@code context}. + */ + public static <K, V, C extends Serializable> void initContext(Ignite ignite, String upstreamCacheName, + String datasetCacheName, PartitionContextBuilder<K, V, C> ctxBuilder, int retries) { + initContext(ignite, upstreamCacheName, datasetCacheName, ctxBuilder, retries, 0); + } + + /** + * Extracts partition {@code context} from the Ignite Cache. + * + * @param ignite Ignite instance. + * @param datasetCacheName Dataset cache names. + * @param part Partition index. 
+ * @param <C> Type of a partition {@code context}. + * @return Partition {@code context}. + */ + public static <C extends Serializable> C getContext(Ignite ignite, String datasetCacheName, int part) { + IgniteCache<Integer, C> datasetCache = ignite.cache(datasetCacheName); + return datasetCache.get(part); + } + + /** + * Saves the specified partition {@code context} into the Ignite Cache. + * + * @param ignite Ignite instance. + * @param datasetCacheName Dataset cache name. + * @param part Partition index. + * @param <C> Type of a partition {@code context}. + */ + public static <C extends Serializable> void saveContext(Ignite ignite, String datasetCacheName, int part, C ctx) { + IgniteCache<Integer, C> datasetCache = ignite.cache(datasetCacheName); + datasetCache.put(part, ctx); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/DatasetAffinityFunctionWrapper.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/DatasetAffinityFunctionWrapper.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/DatasetAffinityFunctionWrapper.java new file mode 100644 index 0000000..a8f6826 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/DatasetAffinityFunctionWrapper.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.ml.dataset.impl.cache.util;

import java.util.List;
import java.util.UUID;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cluster.ClusterNode;

/**
 * Affinity function wrapper in which the key itself is the partition index. Every other operation (partition count,
 * partition-to-node assignment, reset and node removal) is forwarded unchanged to the wrapped affinity function.
 */
public class DatasetAffinityFunctionWrapper implements AffinityFunction {
    /** Serial version UID. */
    private static final long serialVersionUID = -8233787063079973753L;

    /** Wrapped affinity function that handles everything except the key-to-partition mapping. */
    private final AffinityFunction delegate;

    /**
     * Wraps the given affinity function.
     *
     * @param delegate Affinity function that all calls except {@code partition()} are forwarded to.
     */
    public DatasetAffinityFunctionWrapper(AffinityFunction delegate) {
        this.delegate = delegate;
    }

    /** {@inheritDoc} */
    @Override public int partitions() {
        return delegate.partitions();
    }

    /**
     * Maps a key to a partition by interpreting the key itself as the partition index.
     *
     * @param key Partition index. Expected to be an {@link Integer}; a {@code ClassCastException} is thrown for other
     *      types and a {@code NullPointerException} for {@code null}.
     * @return The key as a primitive partition index.
     */
    @Override public int partition(Object key) {
        Integer partIdx = (Integer)key;

        return partIdx;
    }

    /** {@inheritDoc} */
    @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
        return delegate.assignPartitions(affCtx);
    }

    /** {@inheritDoc} */
    @Override public void removeNode(UUID nodeId) {
        delegate.removeNode(nodeId);
    }

    /** {@inheritDoc} */
    @Override public void reset() {
        delegate.reset();
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/PartitionDataStorage.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/PartitionDataStorage.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/PartitionDataStorage.java new file mode 100644 index 0000000..d5c47ee --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/PartitionDataStorage.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.ml.dataset.impl.cache.util; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * Local storage used to keep partition {@code data}. + */ +class PartitionDataStorage { + /** Storage of a partition {@code data}. */ + private final ConcurrentMap<Integer, Object> storage = new ConcurrentHashMap<>(); + + /** Storage of locks correspondent to partition {@code data} objects. */ + private final ConcurrentMap<Integer, Lock> locks = new ConcurrentHashMap<>(); + + /** + * Retrieves partition {@code data} correspondent to specified partition index if it exists in local storage or + * loads it using the specified {@code supplier}. Unlike {@link ConcurrentMap#computeIfAbsent(Object, Function)}, + * this method guarantees that function will be called only once. + * + * @param <D> Type of data. + * @param part Partition index. + * @param supplier Partition {@code data} supplier. + * @return Partition {@code data}. 
+ */ + @SuppressWarnings("unchecked") + <D> D computeDataIfAbsent(int part, Supplier<D> supplier) { + Object data = storage.get(part); + + if (data == null) { + Lock lock = locks.computeIfAbsent(part, p -> new ReentrantLock()); + + lock.lock(); + try { + data = storage.computeIfAbsent(part, p -> supplier.get()); + } + finally { + lock.unlock(); + } + } + + return (D)data; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/UpstreamCursorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/UpstreamCursorAdapter.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/UpstreamCursorAdapter.java new file mode 100644 index 0000000..4482af7 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/UpstreamCursorAdapter.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.ml.dataset.impl.cache.util; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import javax.cache.Cache; +import org.apache.ignite.ml.dataset.UpstreamEntry; + +/** + * Cursor adapter used to transform {@code Cache.Entry} received from Ignite Cache query cursor into DLC-specific + * {@link UpstreamEntry}. + * + * @param <K> Type of an upstream value key. + * @param <V> Type of an upstream value. + */ +public class UpstreamCursorAdapter<K, V> implements Iterator<UpstreamEntry<K, V>> { + /** Cache entry iterator. */ + private final Iterator<Cache.Entry<K, V>> delegate; + + /** Size. */ + private long cnt; + + /** + * Constructs a new instance of iterator. + * + * @param delegate Cache entry iterator. + */ + UpstreamCursorAdapter(Iterator<Cache.Entry<K, V>> delegate, long cnt) { + this.delegate = delegate; + this.cnt = cnt; + } + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return delegate.hasNext() && cnt > 0; + } + + /** {@inheritDoc} */ + @Override public UpstreamEntry<K, V> next() { + if (cnt == 0) + throw new NoSuchElementException(); + + cnt--; + + Cache.Entry<K, V> next = delegate.next(); + + if (next == null) + return null; + + return new UpstreamEntry<>(next.getKey(), next.getValue()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/package-info.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/package-info.java new file mode 100644 index 0000000..89e248f --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains util classes used in cache based implementation of dataset. + */ +package org.apache.ignite.ml.dataset.impl.cache.util; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDataset.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDataset.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDataset.java new file mode 100644 index 0000000..c08b7de --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDataset.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.ml.dataset.impl.local;

import java.io.Serializable;
import java.util.List;
import org.apache.ignite.ml.dataset.Dataset;
import org.apache.ignite.ml.math.functions.IgniteBiFunction;
import org.apache.ignite.ml.math.functions.IgniteBinaryOperator;
import org.apache.ignite.ml.math.functions.IgniteTriFunction;

/**
 * An implementation of dataset based on local data structures such as {@code Map} and {@code List} that doesn't
 * require an Ignite environment. Introduced mostly for testing purposes, but can be used for simple local
 * computations as well.
 *
 * @param <C> Type of a partition {@code context}.
 * @param <D> Type of a partition {@code data}.
 */
public class LocalDataset<C extends Serializable, D extends AutoCloseable> implements Dataset<C, D> {
    /** Partition {@code context} storage; element {@code i} belongs to partition {@code i}. */
    private final List<C> ctx;

    /** Partition {@code data} storage; element {@code i} belongs to partition {@code i}. */
    private final List<D> data;

    /**
     * Creates a dataset over already built partition {@code context} and {@code data} lists.
     *
     * @param ctx Partition {@code context} storage.
     * @param data Partition {@code data} storage.
     */
    LocalDataset(List<C> ctx, List<D> data) {
        this.ctx = ctx;
        this.data = data;
    }

    /** {@inheritDoc} */
    @Override public <R> R computeWithCtx(IgniteTriFunction<C, D, Integer, R> map, IgniteBinaryOperator<R> reduce,
        R identity) {
        R acc = identity;

        // Partitions are visited sequentially in index order; the context list defines the partition count.
        int i = 0;
        while (i < ctx.size()) {
            R partRes = map.apply(ctx.get(i), data.get(i), i);
            acc = reduce.apply(acc, partRes);
            i++;
        }

        return acc;
    }

    /** {@inheritDoc} */
    @Override public <R> R compute(IgniteBiFunction<D, Integer, R> map, IgniteBinaryOperator<R> reduce, R identity) {
        R acc = identity;

        // Partitions are visited sequentially in index order; the data list defines the partition count.
        int i = 0;
        while (i < data.size()) {
            R partRes = map.apply(data.get(i), i);
            acc = reduce.apply(acc, partRes);
            i++;
        }

        return acc;
    }

    /** {@inheritDoc} */
    @Override public void close() {
        // Nothing to release: all state lives in plain on-heap lists that the GC reclaims.
    }

    /**
     * Returns the partition {@code context} storage.
     *
     * @return Partition {@code context} list (the internal list itself, not a copy).
     */
    public List<C> getCtx() {
        return ctx;
    }

    /**
     * Returns the partition {@code data} storage.
     *
     * @return Partition {@code data} list (the internal list itself, not a copy).
     */
    public List<D> getData() {
        return data;
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilder.java new file mode 100644 index 0000000..0dc1ed6 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilder.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.ml.dataset.impl.local;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.ignite.ml.dataset.DatasetBuilder;
import org.apache.ignite.ml.dataset.PartitionContextBuilder;
import org.apache.ignite.ml.dataset.PartitionDataBuilder;
import org.apache.ignite.ml.dataset.UpstreamEntry;
import org.apache.ignite.ml.math.functions.IgniteFunction;

/**
 * A dataset builder that makes {@link LocalDataset}. Encapsulates the logic of building a local dataset, such as
 * allocation of the required data structures and initialization of the {@code context} part of partitions.
 *
 * @param <K> Type of a key in {@code upstream} data.
 * @param <V> Type of a value in {@code upstream} data.
 */
public class LocalDatasetBuilder<K, V> implements DatasetBuilder<K, V> {
    /** {@code Map} with upstream data. */
    private final Map<K, V> upstreamMap;

    /** Number of partitions. */
    private final int partitions;

    /**
     * Constructs a new instance of local dataset builder that makes {@link LocalDataset}.
     *
     * @param upstreamMap {@code Map} with upstream data.
     * @param partitions Number of partitions.
     */
    public LocalDatasetBuilder(Map<K, V> upstreamMap, int partitions) {
        this.upstreamMap = upstreamMap;
        this.partitions = partitions;
    }

    /**
     * {@inheritDoc}
     *
     * <p>Upstream keys are taken in the iteration order of the upstream map and split into {@code partitions}
     * contiguous windows. Every partition but the last gets {@code max(1, size / partitions)} entries, or fewer once
     * the map runs out. The last partition receives all remaining entries.</p>
     *
     * @throws IllegalArgumentException If the number of partitions is not positive.
     */
    @SuppressWarnings("unchecked")
    @Override public <C extends Serializable, D extends AutoCloseable> LocalDataset<C, D> build(
        PartitionContextBuilder<K, V, C> partCtxBuilder, PartitionDataBuilder<K, V, C, D> partDataBuilder) {
        if (partitions <= 0)
            throw new IllegalArgumentException("Number of partitions must be positive [partitions=" + partitions + ']');

        List<C> ctxList = new ArrayList<>(partitions);
        List<D> dataList = new ArrayList<>(partitions);

        // Snapshot of the upstream keys. Every partition window is built over its own sub-list, so a context or data
        // builder that does not consume its whole window cannot shift the windows of the following partitions.
        List<K> keys = new ArrayList<>(upstreamMap.keySet());

        int partSize = Math.max(1, keys.size() / partitions);

        int ptr = 0;
        for (int part = 0; part < partitions; part++) {
            int cnt = part == partitions - 1 ? keys.size() - ptr : Math.min(partSize, keys.size() - ptr);

            List<K> partKeys = keys.subList(ptr, ptr + cnt);

            C ctx = partCtxBuilder.build(
                new IteratorWindow<>(partKeys.iterator(), k -> new UpstreamEntry<>(k, upstreamMap.get(k)), cnt),
                cnt
            );

            D data = partDataBuilder.build(
                new IteratorWindow<>(partKeys.iterator(), k -> new UpstreamEntry<>(k, upstreamMap.get(k)), cnt),
                cnt,
                ctx
            );

            ctxList.add(ctx);
            dataList.add(data);

            ptr += cnt;
        }

        return new LocalDataset<>(ctxList, dataList);
    }

    /**
     * Utils class that wraps iterator so that it produces only specified number of entries and allows to transform
     * entries from one type to another.
     *
     * @param <K> Initial type of entries.
     * @param <T> Target type of entries.
     */
    private static class IteratorWindow<K, T> implements Iterator<T> {
        /** Delegate iterator. */
        private final Iterator<K> delegate;

        /** Transformer that transforms entries from one type to another. */
        private final IgniteFunction<K, T> map;

        /** Count of entries to produce. */
        private final int cnt;

        /** Number of already produced entries. */
        private int ptr;

        /**
         * Constructs a new instance of iterator window wrapper.
         *
         * @param delegate Delegate iterator.
         * @param map Transformer that transforms entries from one type to another.
         * @param cnt Count of entries to produce.
         */
        IteratorWindow(Iterator<K> delegate, IgniteFunction<K, T> map, int cnt) {
            this.delegate = delegate;
            this.map = map;
            this.cnt = cnt;
        }

        /** {@inheritDoc} */
        @Override public boolean hasNext() {
            return delegate.hasNext() && ptr < cnt;
        }

        /**
         * {@inheritDoc}
         *
         * @throws NoSuchElementException If the window is exhausted, even when the delegate still has elements.
         */
        @Override public T next() {
            if (!hasNext())
                throw new NoSuchElementException();

            ++ptr;

            return map.apply(delegate.next());
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/package-info.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/package-info.java new file mode 100644 index 0000000..2b1b195 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Base package for local implementation of machine learning dataset.
+ */ +package org.apache.ignite.ml.dataset.impl.local; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/package-info.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/package-info.java new file mode 100644 index 0000000..031a56a --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Base package for implementations of machine learning dataset. 
+ */ +package org.apache.ignite.ml.dataset.impl; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/package-info.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/package-info.java new file mode 100644 index 0000000..96a63a7 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Base package for machine learning dataset classes. 
+ */ +package org.apache.ignite.ml.dataset; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/DatasetWrapper.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/DatasetWrapper.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/DatasetWrapper.java new file mode 100644 index 0000000..578a149 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/DatasetWrapper.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.dataset.primitive; + +import java.io.Serializable; +import org.apache.ignite.ml.dataset.Dataset; +import org.apache.ignite.ml.math.functions.IgniteBiFunction; +import org.apache.ignite.ml.math.functions.IgniteBinaryOperator; +import org.apache.ignite.ml.math.functions.IgniteTriFunction; + +/** + * A dataset wrapper that makes it possible to introduce new functionality on top of the common {@code compute} + * methods. Both {@code compute} methods and {@code close()} are forwarded to the wrapped delegate as is. + * + * @param <C> Type of a partition {@code context}. + * @param <D> Type of a partition {@code data}.
+ * + * @see SimpleDataset + * @see SimpleLabeledDataset + */ +public class DatasetWrapper<C extends Serializable, D extends AutoCloseable> implements Dataset<C, D> { + /** Wrapped dataset that performs {@code compute} actions; it is also closed by {@link #close()}. */ + protected final Dataset<C, D> delegate; + + /** + * Constructs a new instance of dataset wrapper that delegates {@code compute} actions to the actual delegate. + * + * @param delegate Delegate that performs {@code compute} actions. + */ + public DatasetWrapper(Dataset<C, D> delegate) { + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Override public <R> R computeWithCtx(IgniteTriFunction<C, D, Integer, R> map, IgniteBinaryOperator<R> reduce, + R identity) { + return delegate.computeWithCtx(map, reduce, identity); + } + + /** {@inheritDoc} */ + @Override public <R> R compute(IgniteBiFunction<D, Integer, R> map, IgniteBinaryOperator<R> reduce, R identity) { + return delegate.compute(map, reduce, identity); + } + + /** {@inheritDoc} Closes the wrapped delegate. */ + @Override public void close() throws Exception { + delegate.close(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/SimpleDataset.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/SimpleDataset.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/SimpleDataset.java new file mode 100644 index 0000000..47c0c4b --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/SimpleDataset.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.dataset.primitive; + +import com.github.fommil.netlib.BLAS; +import java.io.Serializable; +import org.apache.ignite.ml.dataset.Dataset; +import org.apache.ignite.ml.dataset.primitive.data.SimpleDatasetData; + +/** + * A simple dataset introduces additional methods based on a matrix of features. + * + * @param <C> Type of a partition {@code context}. + */ +public class SimpleDataset<C extends Serializable> extends DatasetWrapper<C, SimpleDatasetData> { + /** BLAS (Basic Linear Algebra Subprograms) instance. */ + private static final BLAS blas = BLAS.getInstance(); + + /** + * Creates a new instance of simple dataset that introduces additional methods based on a matrix of features. + * + * @param delegate Delegate that performs {@code compute} actions. + */ + public SimpleDataset(Dataset<C, SimpleDatasetData> delegate) { + super(delegate); + } + + /** + * Calculates mean value by all columns. + * + * @return Mean values. + */ + public double[] mean() { + ValueWithCount<double[]> res = delegate.compute((data, partIdx) -> { + double[] features = data.getFeatures(); + int rows = data.getRows(); + int cols = data.getCols(); + + double[] y = new double[cols]; + + for (int col = 0; col < cols; col++) + for (int j = col * rows; j < (col + 1) * rows; j++) + y[col] += features[j]; + + return new ValueWithCount<>(y, rows); + }, (a, b) -> a == null ? b : b == null ? 
a : new ValueWithCount<>(sum(a.val, b.val), a.cnt + b.cnt)); + + if (res != null) { + blas.dscal(res.val.length, 1.0 / res.cnt, res.val, 1); + return res.val; + } + + return null; + } + + /** + * Calculates standard deviation by all columns. + * + * @return Standard deviations. + */ + public double[] std() { + double[] mean = mean(); + ValueWithCount<double[]> res = delegate.compute(data -> { + double[] features = data.getFeatures(); + int rows = data.getRows(); + int cols = data.getCols(); + + double[] y = new double[cols]; + + for (int col = 0; col < cols; col++) + for (int j = col * rows; j < (col + 1) * rows; j++) + y[col] += Math.pow(features[j] - mean[col], 2); + + return new ValueWithCount<>(y, rows); + }, (a, b) -> a == null ? b : b == null ? a : new ValueWithCount<>(sum(a.val, b.val), a.cnt + b.cnt)); + + if (res != null) { + blas.dscal(res.val.length, 1.0 / res.cnt, res.val, 1); + for (int i = 0; i < res.val.length; i++) + res.val[i] = Math.sqrt(res.val[i]); + return res.val; + } + + return null; + } + + /** + * Calculates covariance matrix by all columns. + * + * @return Covariance matrix. + */ + public double[][] cov() { + double[] mean = mean(); + ValueWithCount<double[][]> res = delegate.compute(data -> { + double[] features = data.getFeatures(); + int rows = data.getRows(); + int cols = data.getCols(); + + double[][] y = new double[cols][cols]; + + for (int firstCol = 0; firstCol < cols; firstCol++) + for (int secondCol = 0; secondCol < cols; secondCol++) { + + for (int k = 0; k < rows; k++) { + double firstVal = features[rows * firstCol + k]; + double secondVal = features[rows * secondCol + k]; + y[firstCol][secondCol] += ((firstVal - mean[firstCol]) * (secondVal - mean[secondCol])); + } + } + + return new ValueWithCount<>(y, rows); + }, (a, b) -> a == null ? b : b == null ? a : new ValueWithCount<>(sum(a.val, b.val), a.cnt + b.cnt)); + + return res != null ? 
scale(res.val, 1.0 / res.cnt) : null; + } + + /** + * Calculates correlation matrix by all columns. + * + * @return Correlation matrix. + */ + public double[][] corr() { + double[][] cov = cov(); + double[] std = std(); + + for (int i = 0; i < cov.length; i++) + for (int j = 0; j < cov[0].length; j++) + cov[i][j] /= (std[i]*std[j]); + + return cov; + } + + /** + * Returns the sum of the two specified vectors. Be aware that it is in-place operation. + * + * @param a First vector. + * @param b Second vector. + * @return Sum of the two specified vectors. + */ + private static double[] sum(double[] a, double[] b) { + for (int i = 0; i < a.length; i++) + a[i] += b[i]; + + return a; + } + + /** + * Returns the sum of the two specified matrices. Be aware that it is in-place operation. + * + * @param a First matrix. + * @param b Second matrix. + * @return Sum of the two specified matrices. + */ + private static double[][] sum(double[][] a, double[][] b) { + for (int i = 0; i < a.length; i++) + for (int j = 0; j < a[i].length; j++) + a[i][j] += b[i][j]; + + return a; + } + + /** + * Multiplies all elements of the specified matrix on specified multiplier {@code alpha}. Be aware that it is + * in-place operation. + * + * @param a Matrix to be scaled. + * @param alpha Multiplier. + * @return Scaled matrix. + */ + private static double[][] scale(double[][] a, double alpha) { + for (int i = 0; i < a.length; i++) + for (int j = 0; j < a[i].length; j++) + a[i][j] *= alpha; + + return a; + } + + /** + * Util class that keeps values and count of rows this value has been calculated on. + * + * @param <V> Type of a value. + */ + private static class ValueWithCount<V> { + /** Value. */ + private final V val; + + /** Count of rows the value has been calculated on. */ + private final int cnt; + + /** + * Constructs a new instance of value with count. + * + * @param val Value. + * @param cnt Count of rows the value has been calculated on. 
+ */ + ValueWithCount(V val, int cnt) { + this.val = val; + this.cnt = cnt; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/SimpleLabeledDataset.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/SimpleLabeledDataset.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/SimpleLabeledDataset.java new file mode 100644 index 0000000..1e91eec --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/SimpleLabeledDataset.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.dataset.primitive; + +import java.io.Serializable; +import org.apache.ignite.ml.dataset.Dataset; +import org.apache.ignite.ml.dataset.primitive.data.SimpleLabeledDatasetData; + +/** + * A simple labeled dataset introduces additional methods based on a matrix of features and labels vector. + * + * @param <C> Type of a partition {@code context}. 
+ */ +public class SimpleLabeledDataset<C extends Serializable> extends DatasetWrapper<C, SimpleLabeledDatasetData> { + /** + * Creates a new instance of simple labeled dataset that introduces additional methods based on a matrix of features + * and labels vector. + * + * @param delegate Delegate that performs {@code compute} actions. + */ + public SimpleLabeledDataset(Dataset<C, SimpleLabeledDatasetData> delegate) { + super(delegate); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/context/EmptyContextBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/context/EmptyContextBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/context/EmptyContextBuilder.java new file mode 100644 index 0000000..03b69b5 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/context/EmptyContextBuilder.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.ml.dataset.primitive.builder.context; + +import java.util.Iterator; +import org.apache.ignite.ml.dataset.PartitionContextBuilder; +import org.apache.ignite.ml.dataset.UpstreamEntry; +import org.apache.ignite.ml.dataset.primitive.context.EmptyContext; + +/** + * A partition {@code context} builder that makes {@link EmptyContext}. It never reads the {@code upstream} data. + * + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + */ +public class EmptyContextBuilder<K, V> implements PartitionContextBuilder<K, V, EmptyContext> { + /** */ + private static final long serialVersionUID = 6620781747993467186L; + + /** + * {@inheritDoc} + * + * <p>Ignores both {@code upstreamData} and {@code upstreamDataSize}: every call returns a new {@link EmptyContext}. + */ + @Override public EmptyContext build(Iterator<UpstreamEntry<K, V>> upstreamData, long upstreamDataSize) { + return new EmptyContext(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/context/package-info.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/context/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/context/package-info.java new file mode 100644 index 0000000..90d51df --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/context/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains partition {@code context} builders. + */ +package org.apache.ignite.ml.dataset.primitive.builder.context; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/SimpleDatasetDataBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/SimpleDatasetDataBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/SimpleDatasetDataBuilder.java new file mode 100644 index 0000000..6f29e2f --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/SimpleDatasetDataBuilder.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.dataset.primitive.builder.data; + +import java.io.Serializable; +import java.util.Iterator; +import org.apache.ignite.ml.dataset.PartitionDataBuilder; +import org.apache.ignite.ml.dataset.UpstreamEntry; +import org.apache.ignite.ml.dataset.primitive.data.SimpleDatasetData; +import org.apache.ignite.ml.math.functions.IgniteBiFunction; + +/** + * A partition {@code data} builder that makes {@link SimpleDatasetData}. + * + * @param <K> Type of a key in <tt>upstream</tt> data. + * @param <V> Type of a value in <tt>upstream</tt> data. + * @param <C> Type of a partition <tt>context</tt>. + */ +public class SimpleDatasetDataBuilder<K, V, C extends Serializable> + implements PartitionDataBuilder<K, V, C, SimpleDatasetData> { + /** */ + private static final long serialVersionUID = 756800193212149975L; + + /** Function that extracts features from an {@code upstream} data. */ + private final IgniteBiFunction<K, V, double[]> featureExtractor; + + /** Number of columns (features). */ + private final int cols; + + /** + * Construct a new instance of partition {@code data} builder that makes {@link SimpleDatasetData}. + * + * @param featureExtractor Function that extracts features from an {@code upstream} data. + * @param cols Number of columns (features). + */ + public SimpleDatasetDataBuilder(IgniteBiFunction<K, V, double[]> featureExtractor, int cols) { + this.featureExtractor = featureExtractor; + this.cols = cols; + } + + /** {@inheritDoc} */ + @Override public SimpleDatasetData build(Iterator<UpstreamEntry<K, V>> upstreamData, long upstreamDataSize, C ctx) { + // Prepares the matrix of features in flat column-major format. 
+ double[] features = new double[Math.toIntExact(upstreamDataSize * cols)]; + + int ptr = 0; + while (upstreamData.hasNext()) { + UpstreamEntry<K, V> entry = upstreamData.next(); + double[] row = featureExtractor.apply(entry.getKey(), entry.getValue()); + + assert row.length == cols : "Feature extractor must return exactly " + cols + " features"; + + for (int i = 0; i < cols; i++) + features[Math.toIntExact(i * upstreamDataSize + ptr)] = row[i]; + + ptr++; + } + + return new SimpleDatasetData(features, Math.toIntExact(upstreamDataSize), cols); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/SimpleLabeledDatasetDataBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/SimpleLabeledDatasetDataBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/SimpleLabeledDatasetDataBuilder.java new file mode 100644 index 0000000..12fcc4c --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/SimpleLabeledDatasetDataBuilder.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.dataset.primitive.builder.data; + +import java.io.Serializable; +import java.util.Iterator; +import org.apache.ignite.ml.dataset.PartitionDataBuilder; +import org.apache.ignite.ml.dataset.UpstreamEntry; +import org.apache.ignite.ml.dataset.primitive.data.SimpleLabeledDatasetData; +import org.apache.ignite.ml.math.functions.IgniteBiFunction; + +/** + * A partition {@code data} builder that makes {@link SimpleLabeledDatasetData}. + * + * @param <K> Type of a key in <tt>upstream</tt> data. + * @param <V> Type of a value in <tt>upstream</tt> data. + * @param <C> type of a partition <tt>context</tt>. + */ +public class SimpleLabeledDatasetDataBuilder<K, V, C extends Serializable> + implements PartitionDataBuilder<K, V, C, SimpleLabeledDatasetData> { + /** */ + private static final long serialVersionUID = 3678784980215216039L; + + /** Function that extracts features from an {@code upstream} data. */ + private final IgniteBiFunction<K, V, double[]> featureExtractor; + + /** Function that extracts labels from an {@code upstream} data. */ + private final IgniteBiFunction<K, V, Double> lbExtractor; + + /** Number of columns (features). */ + private final int cols; + + /** + * Constructs a new instance of partition {@code data} builder that makes {@link SimpleLabeledDatasetData}. + * + * @param featureExtractor Function that extracts features from an {@code upstream} data. + * @param lbExtractor Function that extracts labels from an {@code upstream} data. + * @param cols Number of columns (features). 
+ */ + public SimpleLabeledDatasetDataBuilder(IgniteBiFunction<K, V, double[]> featureExtractor, + IgniteBiFunction<K, V, Double> lbExtractor, int cols) { + this.featureExtractor = featureExtractor; + this.lbExtractor = lbExtractor; + this.cols = cols; + } + + /** {@inheritDoc} */ + @Override public SimpleLabeledDatasetData build(Iterator<UpstreamEntry<K, V>> upstreamData, + long upstreamDataSize, C ctx) { + // Prepares the matrix of features in flat column-major format. + double[] features = new double[Math.toIntExact(upstreamDataSize * cols)]; + double[] labels = new double[Math.toIntExact(upstreamDataSize)]; + + int ptr = 0; + while (upstreamData.hasNext()) { + UpstreamEntry<K, V> entry = upstreamData.next(); + double[] row = featureExtractor.apply(entry.getKey(), entry.getValue()); + + assert row.length == cols : "Feature extractor must return exactly " + cols + " features"; + + for (int i = 0; i < cols; i++) + features[Math.toIntExact(i * upstreamDataSize) + ptr] = row[i]; + + labels[ptr] = lbExtractor.apply(entry.getKey(), entry.getValue()); + + ptr++; + } + + return new SimpleLabeledDatasetData(features, Math.toIntExact(upstreamDataSize), cols, labels); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/package-info.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/package-info.java new file mode 100644 index 0000000..3a38043 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/data/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains partition {@code data} builders. + */ +package org.apache.ignite.ml.dataset.primitive.builder.data; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/package-info.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/package-info.java new file mode 100644 index 0000000..7511639 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/builder/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Base package for partition {@code data} and {@code context} builders. + */ +package org.apache.ignite.ml.dataset.primitive.builder; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/context/EmptyContext.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/context/EmptyContext.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/context/EmptyContext.java new file mode 100644 index 0000000..12a61d4 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/context/EmptyContext.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.dataset.primitive.context; + +import java.io.Serializable; + +/** + * An empty partition {@code context} that declares no state; for datasets whose partitions need no {@code context}. + */ +public class EmptyContext implements Serializable { + /** Serialization version UID. */ + private static final long serialVersionUID = 4108938672110578991L; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/context/package-info.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/context/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/context/package-info.java new file mode 100644 index 0000000..6d48b7d --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/context/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains implementation of partition {@code context}.
+ */ +package org.apache.ignite.ml.dataset.primitive.context; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/SimpleDatasetData.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/SimpleDatasetData.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/SimpleDatasetData.java new file mode 100644 index 0000000..7f82720 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/SimpleDatasetData.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.dataset.primitive.data; + +import org.apache.ignite.ml.dataset.primitive.SimpleDataset; + +/** + * A partition {@code data} of the {@link SimpleDataset} containing matrix of features in flat column-major format + * stored in heap. + */ +public class SimpleDatasetData implements AutoCloseable { + /** Matrix of features in a dense flat column-major format. */ + private final double[] features; + + /** Number of rows.
*/ + private final int rows; + + /** Number of columns. */ + private final int cols; + + /** + * Constructs a new instance of partition {@code data} of the {@link SimpleDataset} containing matrix of features in + * flat column-major format stored in heap. + * + * @param features Matrix of features in a dense flat column-major format (stored as is, not copied). + * @param rows Number of rows. + * @param cols Number of columns. + */ + public SimpleDatasetData(double[] features, int rows, int cols) { + this.features = features; + this.rows = rows; + this.cols = cols; + } + + /** @return Matrix of features in a dense flat column-major format (the internal array, not a copy). */ + public double[] getFeatures() { + return features; + } + + /** @return Number of rows. */ + public int getRows() { + return rows; + } + + /** @return Number of columns. */ + public int getCols() { + return cols; + } + + /** {@inheritDoc} */ + @Override public void close() { + // Nothing to release: the data lives in plain on-heap arrays, so GC will clean up. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/SimpleLabeledDatasetData.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/SimpleLabeledDatasetData.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/SimpleLabeledDatasetData.java new file mode 100644 index 0000000..38041f8 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/SimpleLabeledDatasetData.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.dataset.primitive.data; + +import org.apache.ignite.ml.dataset.primitive.SimpleLabeledDataset; + +/** + * A partition {@code data} of the {@link SimpleLabeledDataset} containing matrix of features in flat column-major + * format stored in heap and vector of labels stored in heap as well. + */ +public class SimpleLabeledDatasetData implements AutoCloseable { + /** Matrix with features in a dense flat column-major format. */ + private final double[] features; + + /** Number of rows. */ + private final int rows; + + /** Number of columns. */ + private final int cols; + + /** Vector with labels. */ + private final double[] labels; + + /** + * Constructs a new instance of partition {@code data} of the {@link SimpleLabeledDataset} containing matrix of + * features in flat column-major format stored in heap and vector of labels stored in heap as well. + * + * @param features Matrix with features in a dense flat column-major format (stored as is, not copied). + * @param rows Number of rows. + * @param cols Number of columns. + * @param labels Vector with labels (stored as is, not copied).
+ */ + public SimpleLabeledDatasetData(double[] features, int rows, int cols, double[] labels) { + this.features = features; + this.rows = rows; + this.cols = cols; + this.labels = labels; + } + + /** @return Matrix with features in a dense flat column-major format (the internal array, not a copy). */ + public double[] getFeatures() { + return features; + } + + /** @return Number of rows. */ + public int getRows() { + return rows; + } + + /** @return Number of columns. */ + public int getCols() { + return cols; + } + + /** @return Vector with labels (the internal array, not a copy). */ + public double[] getLabels() { + return labels; + } + + /** {@inheritDoc} */ + @Override public void close() { + // Nothing to release: the data lives in plain on-heap arrays, so GC will clean up. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/package-info.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/package-info.java new file mode 100644 index 0000000..2676ad8 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/data/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description.
--> + * Contains implementation of partition {@code data}. + */ +package org.apache.ignite.ml.dataset.primitive.data; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/package-info.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/package-info.java new file mode 100644 index 0000000..e4f1c5e --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/primitive/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Package that contains basic primitives built on top of {@link org.apache.ignite.ml.dataset.Dataset}. Primitives are + * simple components that can be used in other algorithms based on the dataset infrastructure or for debugging. + * + * Primitives include partition {@code context} implementations, partition {@code data} implementations and extensions + * of dataset.
Partition {@code context} and {@code data} implementations can be used in other algorithms when these + * algorithms don't need to keep specific data and can work with the standard primitive {@code data} or {@code context}. + * Dataset extensions provide the most commonly used basic functions, which can be used for debugging or data analysis. + */ +package org.apache.ignite.ml.dataset.primitive; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/PreprocessingTrainer.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/PreprocessingTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/PreprocessingTrainer.java new file mode 100644 index 0000000..cb321e4 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/PreprocessingTrainer.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.ml.preprocessing; + +import org.apache.ignite.ml.dataset.DatasetBuilder; +import org.apache.ignite.ml.math.functions.IgniteBiFunction; + +/** + * Trainer that fits a preprocessor (a function of an {@code upstream} key-value pair) on top of a base preprocessor. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @param <T> Type of a value returned by base preprocessor. + * @param <R> Type of a value returned by preprocessor fitted by this trainer. + */ +public interface PreprocessingTrainer<K, V, T, R> { + /** + * Fits preprocessor. + * + * @param datasetBuilder Dataset builder. + * @param basePreprocessor Base preprocessor mapping an {@code upstream} key-value pair to a value of type {@code T}. + * @param cols Number of columns. + * @return Fitted preprocessor mapping an {@code upstream} key-value pair to a value of type {@code R}. + */ + public IgniteBiFunction<K, V, R> fit(DatasetBuilder<K, V> datasetBuilder, + IgniteBiFunction<K, V, T> basePreprocessor, int cols); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/54bac750/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationPartitionData.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationPartitionData.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationPartitionData.java new file mode 100644 index 0000000..1e7a93f --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/normalization/NormalizationPartitionData.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.preprocessing.normalization; + +/** + * Partition data used in normalization preprocessor. + * + * @see NormalizationTrainer + * @see NormalizationPreprocessor + */ +public class NormalizationPartitionData implements AutoCloseable { + /** Minimum values. */ + private final double[] min; + + /** Maximum values. */ + private final double[] max; + + /** + * Constructs a new instance of normalization partition data. + * + * @param min Minimum values (stored as is, not copied). + * @param max Maximum values (stored as is, not copied). + */ + public NormalizationPartitionData(double[] min, double[] max) { + this.min = min; + this.max = max; + } + + /** @return Minimum values (the internal array, not a copy). */ + public double[] getMin() { + return min; + } + + /** @return Maximum values (the internal array, not a copy). */ + public double[] getMax() { + return max; + } + + /** {@inheritDoc} */ + @Override public void close() { + // Nothing to release: the data lives in plain on-heap arrays, so GC will clean up. + } +}