http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java deleted file mode 100644 index 90651ed..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ -package org.apache.kafka.copycat.sink; - -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.copycat.connector.Task; - -import java.util.Collection; -import java.util.Map; - -/** - * SinkTask is a Task takes records loaded from Kafka and sends them to another system. In - * addition to the basic {@link #put} interface, SinkTasks must also implement {@link #flush} - * to support offset commits. - */ -@InterfaceStability.Unstable -public abstract class SinkTask implements Task { - - /** - * <p> - * The configuration key that provides the list of topics that are inputs for this - * SinkTask. - * </p> - */ - public static final String TOPICS_CONFIG = "topics"; - - protected SinkTaskContext context; - - public void initialize(SinkTaskContext context) { - this.context = context; - } - - /** - * Start the Task. This should handle any configuration parsing and one-time setup of the task. - * @param props initial configuration - */ - @Override - public abstract void start(Map<String, String> props); - - /** - * Put the records in the sink. Usually this should send the records to the sink asynchronously - * and immediately return. - * - * If this operation fails, the SinkTask may throw a {@link org.apache.kafka.copycat.errors.RetriableException} to - * indicate that the framework should attempt to retry the same call again. Other exceptions will cause the task to - * be stopped immediately. {@link SinkTaskContext#timeout(long)} can be used to set the maximum time before the - * batch will be retried. - * - * @param records the set of records to send - */ - public abstract void put(Collection<SinkRecord> records); - - /** - * Flush all records that have been {@link #put} for the specified topic-partitions. The - * offsets are provided for convenience, but could also be determined by tracking all offsets - * included in the SinkRecords passed to {@link #put}. - * - * @param offsets mapping of TopicPartition to committed offset - */ - public abstract void flush(Map<TopicPartition, OffsetAndMetadata> offsets); - - /** - * The SinkTask use this method to create writers for newly assigned partitions in case of partition - * re-assignment. In partition re-assignment, some new partitions may be assigned to the SinkTask. - * The SinkTask needs to create writers and perform necessary recovery for the newly assigned partitions. - * This method will be called after partition re-assignment completes and before the SinkTask starts - * fetching data. - * @param partitions The list of partitions that are now assigned to the task (may include - * partitions previously assigned to the task) - */ - public void onPartitionsAssigned(Collection<TopicPartition> partitions) { - } - - /** - * The SinkTask use this method to close writers and commit offsets for partitions that are - * longer assigned to the SinkTask. This method will be called before a rebalance operation starts - * and after the SinkTask stops fetching data. - * @param partitions The list of partitions that were assigned to the consumer on the last - * rebalance - */ - public void onPartitionsRevoked(Collection<TopicPartition> partitions) { - } - - /** - * Perform any cleanup to stop this task. In SinkTasks, this method is invoked only once outstanding calls to other - * methods have completed (e.g., {@link #put(Collection)} has returned) and a final {@link #flush(Map)} and offset - * commit has completed. Implementations of this method should only need to perform final cleanup operations, such - * as closing network connections to the sink system. - */ - public abstract void stop(); -}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java deleted file mode 100644 index 763b9a4..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.sink; - -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.annotation.InterfaceStability; - -import java.util.Map; -import java.util.Set; - -/** - * Context passed to SinkTasks, allowing them to access utilities in the copycat runtime. - */ -@InterfaceStability.Unstable -public interface SinkTaskContext { - /** - * Reset the consumer offsets for the given topic partitions. SinkTasks should use this if they manage offsets - * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record - * offsets in HDFS to provide exactly once delivery. When the SinkTask is started or a rebalance occurs, the task - * would reload offsets from HDFS and use this method to reset the consumer to those offsets. - * - * SinkTasks that do not manage their own offsets do not need to use this method. - * - * @param offsets map of offsets for topic partitions - */ - void offset(Map<TopicPartition, Long> offsets); - - /** - * Reset the consumer offsets for the given topic partition. SinkTasks should use if they manage offsets - * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record - * offsets in HDFS to provide exactly once delivery. When the topic partition is recovered the task - * would reload offsets from HDFS and use this method to reset the consumer to the offset. - * - * SinkTasks that do not manage their own offsets do not need to use this method. - * - * @param tp the topic partition to reset offset. - * @param offset the offset to reset to. - */ - void offset(TopicPartition tp, long offset); - - /** - * Set the timeout in milliseconds. SinkTasks should use this to indicate that they need to retry certain - * operations after the timeout. SinkTasks may have certain operations on external systems that may need - * to retry in case of failures. For example, append a record to an HDFS file may fail due to temporary network - * issues. SinkTasks use this method to set how long to wait before retrying. - * @param timeoutMs the backoff timeout in milliseconds. - */ - void timeout(long timeoutMs); - - /** - * Get the current set of assigned TopicPartitions for this task. - * @return the set of currently assigned TopicPartitions - */ - Set<TopicPartition> assignment(); - - /** - * Pause consumption of messages from the specified TopicPartitions. - * @param partitions the partitions which should be paused - */ - void pause(TopicPartition... partitions); - - /** - * Resume consumption of messages from previously paused TopicPartitions. - * @param partitions the partitions to resume - */ - void resume(TopicPartition... partitions); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java deleted file mode 100644 index 7258cdf..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ -package org.apache.kafka.copycat.source; - -import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.copycat.connector.Connector; - -/** - * SourceConnectors implement the connector interface to pull data from another system and send - * it to Kafka. - */ -@InterfaceStability.Unstable -public abstract class SourceConnector extends Connector { - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java deleted file mode 100644 index 7f54c10..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.source; - -import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.copycat.connector.CopycatRecord; -import org.apache.kafka.copycat.data.Schema; - -import java.util.Map; - -/** - * <p> - * SourceRecords are generated by SourceTasks and passed to Copycat for storage in - * Kafka. In addition to the standard fields in CopycatRecord which specify where data is stored - * in Kafka, they also include a sourcePartition and sourceOffset. - * </p> - * <p> - * The sourcePartition represents a single input sourcePartition that the record came from (e.g. a filename, table - * name, or topic-partition). The sourceOffset represents a position in that sourcePartition which can be used - * to resume consumption of data. - * </p> - * <p> - * These values can have arbitrary structure and should be represented using - * org.apache.kafka.copycat.data objects (or primitive values). For example, a database connector - * might specify the sourcePartition as a record containing { "db": "database_name", "table": - * "table_name"} and the sourceOffset as a Long containing the timestamp of the row. - * </p> - */ -@InterfaceStability.Unstable -public class SourceRecord extends CopycatRecord { - private final Map<String, ?> sourcePartition; - private final Map<String, ?> sourceOffset; - - public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, - String topic, Integer partition, Schema valueSchema, Object value) { - this(sourcePartition, sourceOffset, topic, partition, null, null, valueSchema, value); - } - - public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, - String topic, Schema valueSchema, Object value) { - this(sourcePartition, sourceOffset, topic, null, null, null, valueSchema, value); - } - - public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, - String topic, Integer partition, - Schema keySchema, Object key, Schema valueSchema, Object value) { - super(topic, partition, keySchema, key, valueSchema, value); - this.sourcePartition = sourcePartition; - this.sourceOffset = sourceOffset; - } - - public Map<String, ?> sourcePartition() { - return sourcePartition; - } - - public Map<String, ?> sourceOffset() { - return sourceOffset; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - if (!super.equals(o)) - return false; - - SourceRecord that = (SourceRecord) o; - - if (sourcePartition != null ? !sourcePartition.equals(that.sourcePartition) : that.sourcePartition != null) - return false; - if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null) - return false; - - return true; - } - - @Override - public int hashCode() { - int result = super.hashCode(); - result = 31 * result + (sourcePartition != null ? sourcePartition.hashCode() : 0); - result = 31 * result + (sourceOffset != null ? sourceOffset.hashCode() : 0); - return result; - } - - @Override - public String toString() { - return "SourceRecord{" + - "sourcePartition=" + sourcePartition + - ", sourceOffset=" + sourceOffset + - "} " + super.toString(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java deleted file mode 100644 index 841943f..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.source; - -import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.copycat.connector.Task; - -import java.util.List; -import java.util.Map; - -/** - * SourceTask is a Task that pulls records from another system for storage in Kafka. - */ -@InterfaceStability.Unstable -public abstract class SourceTask implements Task { - - protected SourceTaskContext context; - - /** - * Initialize this SourceTask with the specified context object. - */ - public void initialize(SourceTaskContext context) { - this.context = context; - } - - /** - * Start the Task. This should handle any configuration parsing and one-time setup of the task. - * @param props initial configuration - */ - @Override - public abstract void start(Map<String, String> props); - - /** - * Poll this SourceTask for new records. This method should block if no data is currently - * available. - * - * @return a list of source records - */ - public abstract List<SourceRecord> poll() throws InterruptedException; - - /** - * <p> - * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This - * method should block until the commit is complete. - * </p> - * <p> - * SourceTasks are not required to implement this functionality; Copycat will record offsets - * automatically. This hook is provided for systems that also need to store offsets internally - * in their own system. - * </p> - */ - public void commit() throws InterruptedException { - // This space intentionally left blank. - } - - /** - * Signal this SourceTask to stop. In SourceTasks, this method only needs to signal to the task that it should stop - * trying to poll for new data and interrupt any outstanding poll() requests. It is not required that the task has - * fully stopped. Note that this method necessarily may be invoked from a different thread than {@link #poll()} and - * {@link #commit()}. - * - * For example, if a task uses a {@link java.nio.channels.Selector} to receive data over the network, this method - * could set a flag that will force {@link #poll()} to exit immediately and invoke - * {@link java.nio.channels.Selector#wakeup()} to interrupt any ongoing requests. - */ - public abstract void stop(); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java deleted file mode 100644 index bc18c30..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ -package org.apache.kafka.copycat.source; - -import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.copycat.storage.OffsetStorageReader; - -/** - * SourceTaskContext is provided to SourceTasks to allow them to interact with the underlying - * runtime. - */ -@InterfaceStability.Unstable -public interface SourceTaskContext { - /** - * Get the OffsetStorageReader for this SourceTask. - */ - OffsetStorageReader offsetStorageReader(); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java deleted file mode 100644 index d51b789..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.storage; - -import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.copycat.data.Schema; -import org.apache.kafka.copycat.data.SchemaAndValue; - -import java.util.Map; - -/** - * The Converter interface provides support for translating between Copycat's runtime data format - * and byte[]. Internally, this likely includes an intermediate step to the format used by the serialization - * layer (e.g. JsonNode, GenericRecord, Message). - */ -@InterfaceStability.Unstable -public interface Converter { - - /** - * Configure this class. - * @param configs configs in key/value pairs - * @param isKey whether is for key or value - */ - void configure(Map<String, ?> configs, boolean isKey); - - /** - * Convert a Copycat data object to a native object for serialization. - * @param topic the topic associated with the data - * @param schema the schema for the value - * @param value the value to convert - * @return - */ - byte[] fromCopycatData(String topic, Schema schema, Object value); - - /** - * Convert a native object to a Copycat data object. - * @param topic the topic associated with the data - * @param value the value to convert - * @return an object containing the {@link Schema} and the converted value - */ - SchemaAndValue toCopycatData(String topic, byte[] value); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java deleted file mode 100644 index 95d2c04..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.storage; - -import org.apache.kafka.common.annotation.InterfaceStability; - -import java.util.Collection; -import java.util.Map; - -/** - * <p> - * OffsetStorageReader provides access to the offset storage used by sources. This can be used by - * connectors to determine offsets to start consuming data from. This is most commonly used during - * initialization of a task, but can also be used during runtime, e.g. when reconfiguring a task. - * </p> - * <p> - * Offsets are always defined as Maps of Strings to primitive types, i.e. all types supported by - * {@link org.apache.kafka.copycat.data.Schema} other than Array, Map, and Struct. - * </p> - */ -@InterfaceStability.Unstable -public interface OffsetStorageReader { - /** - * Get the offset for the specified partition. If the data isn't already available locally, this - * gets it from the backing store, which may require some network round trips. - * - * @param partition object uniquely identifying the partition of data - * @return object uniquely identifying the offset in the partition of data - */ - <T> Map<String, Object> offset(Map<String, T> partition); - - /** - * <p> - * Get a set of offsets for the specified partition identifiers. This may be more efficient - * than calling {@link #offset(Map)} repeatedly. - * </p> - * <p> - * Note that when errors occur, this method omits the associated data and tries to return as - * many of the requested values as possible. This allows a task that's managing many partitions to - * still proceed with any available data. Therefore, implementations should take care to check - * that the data is actually available in the returned response. The only case when an - * exception will be thrown is if the entire request failed, e.g. because the underlying - * storage was unavailable. - * </p> - * - * @param partitions set of identifiers for partitions of data - * @return a map of partition identifiers to decoded offsets - */ - <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java deleted file mode 100644 index 8d708f8..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.storage; - -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.copycat.data.Schema; -import org.apache.kafka.copycat.data.SchemaAndValue; -import org.apache.kafka.copycat.errors.DataException; - -import java.util.HashMap; -import java.util.Map; - -/** - * {@link Converter} implementation that only supports serializing to strings. When converting Copycat data to bytes, - * the schema will be ignored and {@link Object#toString()} will always be invoked to convert the data to a String. - * When converting from bytes to Copycat format, the converter will only ever return an optional string schema and - * a string or null. - * - * Encoding configuration is identical to {@link StringSerializer} and {@link StringDeserializer}, but for convenience - * this class can also be configured to use the same encoding for both encoding and decoding with the converter.encoding - * setting. - */ -public class StringConverter implements Converter { - private final StringSerializer serializer = new StringSerializer(); - private final StringDeserializer deserializer = new StringDeserializer(); - - public StringConverter() { - } - - @Override - public void configure(Map<String, ?> configs, boolean isKey) { - Map<String, Object> serializerConfigs = new HashMap<>(); - serializerConfigs.putAll(configs); - Map<String, Object> deserializerConfigs = new HashMap<>(); - deserializerConfigs.putAll(configs); - - Object encodingValue = configs.get("converter.encoding"); - if (encodingValue != null) { - serializerConfigs.put("serializer.encoding", encodingValue); - deserializerConfigs.put("deserializer.encoding", encodingValue); - } - - serializer.configure(serializerConfigs, isKey); - deserializer.configure(deserializerConfigs, isKey); - } - - @Override - public byte[] fromCopycatData(String topic, Schema schema, Object value) { - try { - return serializer.serialize(topic, value == null ? null : value.toString()); - } catch (SerializationException e) { - throw new DataException("Failed to serialize to a string: ", e); - } - } - - @Override - public SchemaAndValue toCopycatData(String topic, byte[] value) { - try { - return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, deserializer.deserialize(topic, value)); - } catch (SerializationException e) { - throw new DataException("Failed to deserialize string: ", e); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java b/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java deleted file mode 100644 index f9dd53a..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.util; - -import org.apache.kafka.common.annotation.InterfaceStability; - -import java.util.ArrayList; -import java.util.List; - -/** - * Utilities that connector implementations might find useful. Contains common building blocks - * for writing connectors. - */ -@InterfaceStability.Unstable -public class ConnectorUtils { - /** - * Given a list of elements and a target number of groups, generates list of groups of - * elements to match the target number of groups, spreading them evenly among the groups. - * This generates groups with contiguous elements, which results in intuitive ordering if - * your elements are also ordered (e.g. alphabetical lists of table names if you sort - * table names alphabetically to generate the raw partitions) or can result in efficient - * partitioning if elements are sorted according to some criteria that affects performance - * (e.g. topic partitions with the same leader). - * - * @param elements list of elements to partition - * @param numGroups the number of output groups to generate. - */ - public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) { - if (numGroups <= 0) - throw new IllegalArgumentException("Number of groups must be positive."); - - List<List<T>> result = new ArrayList<>(numGroups); - - // Each group has either n+1 or n raw partitions - int perGroup = elements.size() / numGroups; - int leftover = elements.size() - (numGroups * perGroup); - - int assigned = 0; - for (int group = 0; group < numGroups; group++) { - int numThisGroup = group < leftover ? perGroup + 1 : perGroup; - List<T> groupList = new ArrayList<>(numThisGroup); - for (int i = 0; i < numThisGroup; i++) { - groupList.add(elements.get(assigned)); - assigned++; - } - result.add(groupList); - } - - return result; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java deleted file mode 100644 index 7b1e9eb..0000000 --- a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.connector; - -import org.apache.kafka.copycat.errors.CopycatException; -import org.junit.Test; - -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; - -public class ConnectorReconfigurationTest { - - @Test - public void testDefaultReconfigure() throws Exception { - TestConnector conn = new TestConnector(false); - conn.reconfigure(Collections.<String, String>emptyMap()); - assertEquals(conn.stopOrder, 0); - assertEquals(conn.configureOrder, 1); - } - - @Test(expected = CopycatException.class) - public void testReconfigureStopException() throws Exception { - TestConnector conn = new TestConnector(true); - conn.reconfigure(Collections.<String, String>emptyMap()); - } - - private static class TestConnector extends Connector { - private boolean stopException; - private int order = 0; - public int stopOrder = -1; - public int configureOrder = -1; - - public TestConnector(boolean stopException) { - this.stopException = stopException; - } - - @Override - public String version() { - return "1.0"; - } - - @Override - public void start(Map<String, String> props) { - configureOrder = order++; - } - - @Override - public Class<? extends Task> taskClass() { - return null; - } - - @Override - public List<Map<String, String>> taskConfigs(int count) { - return null; - } - - @Override - public void stop() { - stopOrder = order++; - if (stopException) - throw new CopycatException("error"); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java deleted file mode 100644 index 4976950..0000000 --- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java +++ /dev/null @@ -1,303 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.data; - -import org.apache.kafka.copycat.errors.DataException; -import org.junit.Test; - -import java.math.BigDecimal; -import java.math.BigInteger; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNull; - -public class CopycatSchemaTest { - private static final Schema MAP_INT_STRING_SCHEMA = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build(); - private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct() - .field("field", Schema.INT32_SCHEMA) - .build(); - private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct() - .field("first", Schema.INT32_SCHEMA) - .field("second", Schema.STRING_SCHEMA) - .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build()) - .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build()) - .field("nested", FLAT_STRUCT_SCHEMA) - .build(); - private static final Schema PARENT_STRUCT_SCHEMA = SchemaBuilder.struct() - .field("nested", FLAT_STRUCT_SCHEMA) - .build(); - - @Test - public void testFieldsOnStructSchema() { - Schema schema = SchemaBuilder.struct() - .field("foo", Schema.BOOLEAN_SCHEMA) - .field("bar", Schema.INT32_SCHEMA) - .build(); - - assertEquals(2, schema.fields().size()); - // Validate field lookup by name - Field foo = schema.field("foo"); - assertEquals(0, foo.index()); - Field bar = schema.field("bar"); - assertEquals(1, bar.index()); - // Any other field name should fail - assertNull(schema.field("other")); - } - - - @Test(expected = DataException.class) - public void testFieldsOnlyValidForStructs() { - Schema.INT8_SCHEMA.fields(); - } - - @Test - public void testValidateValueMatchingType() { - CopycatSchema.validateValue(Schema.INT8_SCHEMA, (byte) 1); - CopycatSchema.validateValue(Schema.INT16_SCHEMA, (short) 1); - CopycatSchema.validateValue(Schema.INT32_SCHEMA, 1); - CopycatSchema.validateValue(Schema.INT64_SCHEMA, (long) 1); - CopycatSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.f); - CopycatSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.); - CopycatSchema.validateValue(Schema.BOOLEAN_SCHEMA, true); - CopycatSchema.validateValue(Schema.STRING_SCHEMA, "a string"); - CopycatSchema.validateValue(Schema.BYTES_SCHEMA, "a byte array".getBytes()); - CopycatSchema.validateValue(Schema.BYTES_SCHEMA, ByteBuffer.wrap("a byte array".getBytes())); - CopycatSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3)); - CopycatSchema.validateValue( - SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build(), - Collections.singletonMap(1, "value") - ); - // Struct tests the basic struct layout + complex field types + nested structs - Struct structValue = new Struct(STRUCT_SCHEMA) - .put("first", 1) - .put("second", "foo") - .put("array", Arrays.asList(1, 2, 3)) - .put("map", Collections.singletonMap(1, "value")) - .put("nested", new Struct(FLAT_STRUCT_SCHEMA).put("field", 12)); - CopycatSchema.validateValue(STRUCT_SCHEMA, structValue); - } - - @Test - public void testValidateValueMatchingLogicalType() { - CopycatSchema.validateValue(Decimal.schema(2), new BigDecimal(new BigInteger("156"), 2)); - CopycatSchema.validateValue(Date.SCHEMA, new java.util.Date(0)); - CopycatSchema.validateValue(Time.SCHEMA, new java.util.Date(0)); - CopycatSchema.validateValue(Timestamp.SCHEMA, new java.util.Date(0)); - } - - // To avoid requiring excessive numbers of tests, these checks for invalid types use a similar type where possible - // to only include a single test for each type - - @Test(expected = DataException.class) - public void testValidateValueMismatchInt8() { - CopycatSchema.validateValue(Schema.INT8_SCHEMA, 1); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchInt16() { - CopycatSchema.validateValue(Schema.INT16_SCHEMA, 1); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchInt32() { - CopycatSchema.validateValue(Schema.INT32_SCHEMA, (long) 1); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchInt64() { - CopycatSchema.validateValue(Schema.INT64_SCHEMA, 1); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchFloat() { - CopycatSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.0); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchDouble() { - CopycatSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.f); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchBoolean() { - CopycatSchema.validateValue(Schema.BOOLEAN_SCHEMA, 1.f); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchString() { - // CharSequence is a similar type (supertype of String), but we restrict to String. - CharBuffer cbuf = CharBuffer.wrap("abc"); - CopycatSchema.validateValue(Schema.STRING_SCHEMA, cbuf); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchBytes() { - CopycatSchema.validateValue(Schema.BYTES_SCHEMA, new Object[]{1, "foo"}); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchArray() { - CopycatSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList("a", "b", "c")); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchArraySomeMatch() { - // Even if some match the right type, this should fail if any mismatch. In this case, type erasure loses - // the fact that the list is actually List<Object>, but we couldn't tell if only checking the first element - CopycatSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, "c")); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchMapKey() { - CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap("wrong key type", "value")); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchMapValue() { - CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap(1, 2)); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchMapSomeKeys() { - Map<Object, String> data = new HashMap<>(); - data.put(1, "abc"); - data.put("wrong", "it's as easy as one two three"); - CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, data); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchMapSomeValues() { - Map<Integer, Object> data = new HashMap<>(); - data.put(1, "abc"); - data.put(2, "wrong".getBytes()); - CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, data); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchStructWrongSchema() { - // Completely mismatching schemas - CopycatSchema.validateValue( - FLAT_STRUCT_SCHEMA, - new Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 1) - ); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchStructWrongNestedSchema() { - // Top-level schema matches, but nested does not. - CopycatSchema.validateValue( - PARENT_STRUCT_SCHEMA, - new Struct(PARENT_STRUCT_SCHEMA) - .put("nested", new Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 1)) - ); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchDecimal() { - CopycatSchema.validateValue(Decimal.schema(2), new BigInteger("156")); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchDate() { - CopycatSchema.validateValue(Date.SCHEMA, 1000L); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchTime() { - CopycatSchema.validateValue(Time.SCHEMA, 1000L); - } - - @Test(expected = DataException.class) - public void testValidateValueMismatchTimestamp() { - CopycatSchema.validateValue(Timestamp.SCHEMA, 1000L); - } - - @Test - public void testPrimitiveEquality() { - // Test that primitive types, which only need to consider all the type & metadata fields, handle equality correctly - CopycatSchema s1 = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc"); - CopycatSchema s2 = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc"); - CopycatSchema differentType = new CopycatSchema(Schema.Type.INT16, false, null, "name", 2, "doc"); - CopycatSchema differentOptional = new CopycatSchema(Schema.Type.INT8, true, null, "name", 2, "doc"); - CopycatSchema differentDefault = new CopycatSchema(Schema.Type.INT8, false, true, "name", 2, "doc"); - CopycatSchema differentName = new CopycatSchema(Schema.Type.INT8, false, null, "otherName", 2, "doc"); - CopycatSchema differentVersion = new CopycatSchema(Schema.Type.INT8, false, null, "name", 4, "doc"); - CopycatSchema differentDoc = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "other doc"); - CopycatSchema differentParameters = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc", Collections.singletonMap("param", "value"), null, null, null); - - assertEquals(s1, s2); - assertNotEquals(s1, differentType); - assertNotEquals(s1, differentOptional); - assertNotEquals(s1, differentDefault); - assertNotEquals(s1, differentName); - assertNotEquals(s1, differentVersion); - assertNotEquals(s1, differentDoc); - assertNotEquals(s1, differentParameters); - } - - @Test - public void testArrayEquality() { - // Validate that the value type for the array is tested for equality. This test makes sure the same schema object is - // never reused to ensure we're actually checking equality - CopycatSchema s1 = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int8().build()); - CopycatSchema s2 = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int8().build()); - CopycatSchema differentValueSchema = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int16().build()); - - assertEquals(s1, s2); - assertNotEquals(s1, differentValueSchema); - } - - @Test - public void testMapEquality() { - // Same as testArrayEquality, but for both key and value schemas - CopycatSchema s1 = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build()); - CopycatSchema s2 = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build()); - CopycatSchema differentKeySchema = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.string().build(), SchemaBuilder.int16().build()); - CopycatSchema differentValueSchema = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.string().build()); - - assertEquals(s1, s2); - assertNotEquals(s1, differentKeySchema); - assertNotEquals(s1, differentValueSchema); - } - - @Test - public void testStructEquality() { - // Same as testArrayEquality, but checks differences in fields. Only does a simple check, relying on tests of - // Field's equals() method to validate all variations in the list of fields will be checked - CopycatSchema s1 = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null, null, - Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()), - new Field("field2", 1, SchemaBuilder.int16().build())), null, null); - CopycatSchema s2 = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null, null, - Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()), - new Field("field2", 1, SchemaBuilder.int16().build())), null, null); - CopycatSchema differentField = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null, null, - Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()), - new Field("different field name", 1, SchemaBuilder.int16().build())), null, null); - - assertEquals(s1, s2); - assertNotEquals(s1, differentField); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/DateTest.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/DateTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/DateTest.java deleted file mode 100644 index e7885ab..0000000 --- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/DateTest.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.data; - -import org.apache.kafka.copycat.errors.DataException; -import org.junit.Test; - -import java.util.Calendar; -import java.util.GregorianCalendar; -import java.util.TimeZone; - -import static org.junit.Assert.assertEquals; - -public class DateTest { - private static final GregorianCalendar EPOCH; - private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_DAYS; - private static final GregorianCalendar EPOCH_PLUS_TIME_COMPONENT; - static { - EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); - EPOCH.setTimeZone(TimeZone.getTimeZone("UTC")); - - EPOCH_PLUS_TIME_COMPONENT = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 1); - EPOCH_PLUS_TIME_COMPONENT.setTimeZone(TimeZone.getTimeZone("UTC")); - - EPOCH_PLUS_TEN_THOUSAND_DAYS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); - EPOCH_PLUS_TEN_THOUSAND_DAYS.setTimeZone(TimeZone.getTimeZone("UTC")); - EPOCH_PLUS_TEN_THOUSAND_DAYS.add(Calendar.DATE, 10000); - } - - @Test - public void testBuilder() { - Schema plain = Date.SCHEMA; - assertEquals(Date.LOGICAL_NAME, plain.name()); - assertEquals(1, (Object) plain.version()); - } - - @Test - public void testFromLogical() { - assertEquals(0, Date.fromLogical(Date.SCHEMA, EPOCH.getTime())); - assertEquals(10000, Date.fromLogical(Date.SCHEMA, EPOCH_PLUS_TEN_THOUSAND_DAYS.getTime())); - } - - @Test(expected = DataException.class) - public void testFromLogicalInvalidSchema() { - Date.fromLogical(Date.builder().name("invalid").build(), EPOCH.getTime()); - } - - @Test(expected = DataException.class) - public void testFromLogicalInvalidHasTimeComponents() { - Date.fromLogical(Date.SCHEMA, EPOCH_PLUS_TIME_COMPONENT.getTime()); - } - - @Test - public void testToLogical() { - assertEquals(EPOCH.getTime(), Date.toLogical(Date.SCHEMA, 0)); - assertEquals(EPOCH_PLUS_TEN_THOUSAND_DAYS.getTime(), Date.toLogical(Date.SCHEMA, 10000)); - } - - @Test(expected = DataException.class) - public void testToLogicalInvalidSchema() { - Date.toLogical(Date.builder().name("invalid").build(), 0); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/DecimalTest.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/DecimalTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/DecimalTest.java deleted file mode 100644 index ce71161..0000000 --- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/DecimalTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.data; - -import org.junit.Test; - -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.Collections; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -public class DecimalTest { - private static final int TEST_SCALE = 2; - private static final BigDecimal TEST_DECIMAL = new BigDecimal(new BigInteger("156"), TEST_SCALE); - private static final BigDecimal TEST_DECIMAL_NEGATIVE = new BigDecimal(new BigInteger("-156"), TEST_SCALE); - private static final byte[] TEST_BYTES = new byte[]{0, -100}; - private static final byte[] TEST_BYTES_NEGATIVE = new byte[]{-1, 100}; - - @Test - public void testBuilder() { - Schema plain = Decimal.builder(2).build(); - assertEquals(Decimal.LOGICAL_NAME, plain.name()); - assertEquals(Collections.singletonMap(Decimal.SCALE_FIELD, "2"), plain.parameters()); - assertEquals(1, (Object) plain.version()); - } - - @Test - public void testFromLogical() { - Schema schema = Decimal.schema(TEST_SCALE); - byte[] encoded = Decimal.fromLogical(schema, TEST_DECIMAL); - assertArrayEquals(TEST_BYTES, encoded); - - encoded = Decimal.fromLogical(schema, TEST_DECIMAL_NEGATIVE); - assertArrayEquals(TEST_BYTES_NEGATIVE, encoded); - } - - @Test - public void testToLogical() { - Schema schema = Decimal.schema(2); - BigDecimal converted = Decimal.toLogical(schema, TEST_BYTES); - assertEquals(TEST_DECIMAL, converted); - - converted = Decimal.toLogical(schema, TEST_BYTES_NEGATIVE); - assertEquals(TEST_DECIMAL_NEGATIVE, converted); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java deleted file mode 100644 index d5458bc..0000000 --- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.data; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - -public class FieldTest { - - @Test - public void testEquality() { - Field field1 = new Field("name", 0, Schema.INT8_SCHEMA); - Field field2 = new Field("name", 0, Schema.INT8_SCHEMA); - Field differentName = new Field("name2", 0, Schema.INT8_SCHEMA); - Field differentIndex = new Field("name", 1, Schema.INT8_SCHEMA); - Field differentSchema = new Field("name", 0, Schema.INT16_SCHEMA); - - assertEquals(field1, field2); - assertNotEquals(field1, differentName); - assertNotEquals(field1, differentIndex); - assertNotEquals(field1, differentSchema); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java deleted file mode 100644 index 183f5fc..0000000 --- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java +++ /dev/null @@ -1,305 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.data; - -import org.apache.kafka.copycat.errors.SchemaBuilderException; -import org.junit.Test; - -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -public class SchemaBuilderTest { - private static final String NAME = "name"; - private static final Integer VERSION = 2; - private static final String DOC = "doc"; - private static final Map<String, String> NO_PARAMS = null; - - @Test - public void testInt8Builder() { - Schema schema = SchemaBuilder.int8().build(); - assertTypeAndDefault(schema, Schema.Type.INT8, false, null); - assertNoMetadata(schema); - - schema = SchemaBuilder.int8().name(NAME).optional().defaultValue((byte) 12) - .version(VERSION).doc(DOC).build(); - assertTypeAndDefault(schema, Schema.Type.INT8, true, (byte) 12); - assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); - } - - @Test(expected = SchemaBuilderException.class) - public void testInt8BuilderInvalidDefault() { - SchemaBuilder.int8().defaultValue("invalid"); - } - - @Test - public void testInt16Builder() { - Schema schema = SchemaBuilder.int16().build(); - assertTypeAndDefault(schema, Schema.Type.INT16, false, null); - assertNoMetadata(schema); - - schema = SchemaBuilder.int16().name(NAME).optional().defaultValue((short) 12) - .version(VERSION).doc(DOC).build(); - assertTypeAndDefault(schema, Schema.Type.INT16, true, (short) 12); - assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); - } - - @Test(expected = SchemaBuilderException.class) - public void testInt16BuilderInvalidDefault() { - SchemaBuilder.int16().defaultValue("invalid"); - } - - @Test - public void testInt32Builder() { - Schema schema = SchemaBuilder.int32().build(); - assertTypeAndDefault(schema, Schema.Type.INT32, false, null); - assertNoMetadata(schema); - - schema = SchemaBuilder.int32().name(NAME).optional().defaultValue(12) - .version(VERSION).doc(DOC).build(); - assertTypeAndDefault(schema, Schema.Type.INT32, true, 12); - assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); - } - - @Test(expected = SchemaBuilderException.class) - public void testInt32BuilderInvalidDefault() { - SchemaBuilder.int32().defaultValue("invalid"); - } - - @Test - public void testInt64Builder() { - Schema schema = SchemaBuilder.int64().build(); - assertTypeAndDefault(schema, Schema.Type.INT64, false, null); - assertNoMetadata(schema); - - schema = SchemaBuilder.int64().name(NAME).optional().defaultValue((long) 12) - .version(VERSION).doc(DOC).build(); - assertTypeAndDefault(schema, Schema.Type.INT64, true, (long) 12); - assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); - } - - @Test(expected = SchemaBuilderException.class) - public void testInt64BuilderInvalidDefault() { - SchemaBuilder.int64().defaultValue("invalid"); - } - - @Test - public void testFloatBuilder() { - Schema schema = SchemaBuilder.float32().build(); - assertTypeAndDefault(schema, Schema.Type.FLOAT32, false, null); - assertNoMetadata(schema); - - schema = SchemaBuilder.float32().name(NAME).optional().defaultValue(12.f) - .version(VERSION).doc(DOC).build(); - assertTypeAndDefault(schema, Schema.Type.FLOAT32, true, 12.f); - assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); - } - - @Test(expected = SchemaBuilderException.class) - public void testFloatBuilderInvalidDefault() { - SchemaBuilder.float32().defaultValue("invalid"); - } - - @Test - public void testDoubleBuilder() { - Schema schema = SchemaBuilder.float64().build(); - assertTypeAndDefault(schema, Schema.Type.FLOAT64, false, null); - assertNoMetadata(schema); - - schema = SchemaBuilder.float64().name(NAME).optional().defaultValue(12.0) - .version(VERSION).doc(DOC).build(); - assertTypeAndDefault(schema, Schema.Type.FLOAT64, true, 12.0); - assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); - } - - @Test(expected = SchemaBuilderException.class) - public void testDoubleBuilderInvalidDefault() { - SchemaBuilder.float64().defaultValue("invalid"); - } - - @Test - public void testBooleanBuilder() { - Schema schema = SchemaBuilder.bool().build(); - assertTypeAndDefault(schema, Schema.Type.BOOLEAN, false, null); - assertNoMetadata(schema); - - schema = SchemaBuilder.bool().name(NAME).optional().defaultValue(true) - .version(VERSION).doc(DOC).build(); - assertTypeAndDefault(schema, Schema.Type.BOOLEAN, true, true); - assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); - } - - @Test(expected = SchemaBuilderException.class) - public void testBooleanBuilderInvalidDefault() { - SchemaBuilder.bool().defaultValue("invalid"); - } - - @Test - public void testStringBuilder() { - Schema schema = SchemaBuilder.string().build(); - assertTypeAndDefault(schema, Schema.Type.STRING, false, null); - assertNoMetadata(schema); - - schema = SchemaBuilder.string().name(NAME).optional().defaultValue("a default string") - .version(VERSION).doc(DOC).build(); - assertTypeAndDefault(schema, Schema.Type.STRING, true, "a default string"); - assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); - } - - @Test(expected = SchemaBuilderException.class) - public void testStringBuilderInvalidDefault() { - SchemaBuilder.string().defaultValue(true); - } - - @Test - public void testBytesBuilder() { - Schema schema = SchemaBuilder.bytes().build(); - assertTypeAndDefault(schema, Schema.Type.BYTES, false, null); - assertNoMetadata(schema); - - schema = SchemaBuilder.bytes().name(NAME).optional().defaultValue("a default byte array".getBytes()) - .version(VERSION).doc(DOC).build(); - assertTypeAndDefault(schema, Schema.Type.BYTES, true, "a default byte array".getBytes()); - assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); - } - - @Test(expected = SchemaBuilderException.class) - public void testBytesBuilderInvalidDefault() { - SchemaBuilder.bytes().defaultValue("a string, not bytes"); - } - - - @Test - public void testParameters() { - Map<String, String> expectedParameters = new HashMap<>(); - expectedParameters.put("foo", "val"); - expectedParameters.put("bar", "baz"); - - Schema schema = SchemaBuilder.string().parameter("foo", "val").parameter("bar", "baz").build(); - assertTypeAndDefault(schema, Schema.Type.STRING, false, null); - assertMetadata(schema, null, null, null, expectedParameters); - - schema = SchemaBuilder.string().parameters(expectedParameters).build(); - assertTypeAndDefault(schema, Schema.Type.STRING, false, null); - assertMetadata(schema, null, null, null, expectedParameters); - } - - - @Test - public void testStructBuilder() { - Schema schema = SchemaBuilder.struct() - .field("field1", Schema.INT8_SCHEMA) - .field("field2", Schema.INT8_SCHEMA) - .build(); - assertTypeAndDefault(schema, Schema.Type.STRUCT, false, null); - assertEquals(2, schema.fields().size()); - assertEquals("field1", schema.fields().get(0).name()); - assertEquals(0, schema.fields().get(0).index()); - assertEquals(Schema.INT8_SCHEMA, schema.fields().get(0).schema()); - assertEquals("field2", schema.fields().get(1).name()); - assertEquals(1, schema.fields().get(1).index()); - assertEquals(Schema.INT8_SCHEMA, schema.fields().get(1).schema()); - assertNoMetadata(schema); - } - - @Test(expected = SchemaBuilderException.class) - public void testNonStructCantHaveFields() { - SchemaBuilder.int8().field("field", SchemaBuilder.int8().build()); - } - - - @Test - public void testArrayBuilder() { - Schema schema = SchemaBuilder.array(Schema.INT8_SCHEMA).build(); - assertTypeAndDefault(schema, Schema.Type.ARRAY, false, null); - assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA); - assertNoMetadata(schema); - - // Default value - List<Byte> defArray = Arrays.asList((byte) 1, (byte) 2); - schema = SchemaBuilder.array(Schema.INT8_SCHEMA).defaultValue(defArray).build(); - assertTypeAndDefault(schema, Schema.Type.ARRAY, false, defArray); - assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA); - assertNoMetadata(schema); - } - - @Test(expected = SchemaBuilderException.class) - public void testArrayBuilderInvalidDefault() { - // Array, but wrong embedded type - SchemaBuilder.array(Schema.INT8_SCHEMA).defaultValue(Arrays.asList("string")).build(); - } - - @Test - public void testMapBuilder() { - Schema schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA).build(); - assertTypeAndDefault(schema, Schema.Type.MAP, false, null); - assertEquals(schema.keySchema(), Schema.INT8_SCHEMA); - assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA); - assertNoMetadata(schema); - - // Default value - Map<Byte, Byte> defMap = Collections.singletonMap((byte) 5, (byte) 10); - schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA) - .defaultValue(defMap).build(); - assertTypeAndDefault(schema, Schema.Type.MAP, false, defMap); - assertEquals(schema.keySchema(), Schema.INT8_SCHEMA); - assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA); - assertNoMetadata(schema); - } - - @Test(expected = SchemaBuilderException.class) - public void testMapBuilderInvalidDefault() { - // Map, but wrong embedded type - Map<Byte, String> defMap = Collections.singletonMap((byte) 5, "foo"); - SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA) - .defaultValue(defMap).build(); - } - - - - private void assertTypeAndDefault(Schema schema, Schema.Type type, boolean optional, Object defaultValue) { - assertEquals(type, schema.type()); - assertEquals(optional, schema.isOptional()); - if (type == Schema.Type.BYTES) { - // byte[] is not comparable, need to wrap to check correctly - if (defaultValue == null) - assertNull(schema.defaultValue()); - else - assertEquals(ByteBuffer.wrap((byte[]) defaultValue), ByteBuffer.wrap((byte[]) schema.defaultValue())); - } else { - assertEquals(defaultValue, schema.defaultValue()); - } - } - - private void assertMetadata(Schema schema, String name, Integer version, String doc, Map<String, String> parameters) { - assertEquals(name, schema.name()); - assertEquals(version, schema.version()); - assertEquals(doc, schema.doc()); - assertEquals(parameters, schema.parameters()); - } - - private void assertNoMetadata(Schema schema) { - assertMetadata(schema, null, null, null, null); - } -}