Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 f4af15463 -> 340df43fb


http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/Types.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Types.java 
b/src/java/org/apache/cassandra/schema/Types.java
index 0a7bb4f..0d6e36d 100644
--- a/src/java/org/apache/cassandra/schema/Types.java
+++ b/src/java/org/apache/cassandra/schema/Types.java
@@ -18,37 +18,61 @@
 package org.apache.cassandra.schema;
 
 import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.Optional;
+import java.util.*;
 
 import javax.annotation.Nullable;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
 
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.jgrapht.graph.DefaultDirectedGraph;
+import org.jgrapht.graph.DefaultEdge;
+import org.jgrapht.traverse.TopologicalOrderIterator;
 
 import static com.google.common.collect.Iterables.filter;
+import static java.util.stream.Collectors.toList;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 /**
  * An immutable container for a keyspace's UDTs.
  */
 public final class Types implements Iterable<UserType>
 {
-    private final ImmutableMap<ByteBuffer, UserType> types;
+    private static final Types NONE = new Types(ImmutableMap.of());
+
+    private final Map<ByteBuffer, UserType> types;
 
     private Types(Builder builder)
     {
         types = builder.types.build();
     }
 
+    /*
+     * For use in RawBuilder::build and the NONE constant only.
+     */
+    private Types(Map<ByteBuffer, UserType> types)
+    {
+        this.types = types;
+    }
+
     public static Builder builder()
     {
         return new Builder();
     }
 
+    public static RawBuilder rawBuilder(String keyspace)
+    {
+        return new RawBuilder(keyspace);
+    }
+
     public static Types none()
     {
-        return builder().build();
+        return NONE;
     }
 
     public static Types of(UserType... types)
@@ -106,6 +130,11 @@ public final class Types implements Iterable<UserType>
         return builder().add(filter(this, t -> t != type)).build();
     }
 
+    MapDifference<ByteBuffer, UserType> diff(Types other)
+    {
+        return Maps.difference(types, other.types);
+    }
+
     @Override
     public boolean equals(Object o)
     {
@@ -126,7 +155,7 @@ public final class Types implements Iterable<UserType>
 
     public static final class Builder
     {
-        final ImmutableMap.Builder<ByteBuffer, UserType> types = new 
ImmutableMap.Builder<>();
+        final ImmutableMap.Builder<ByteBuffer, UserType> types = 
ImmutableMap.builder();
 
         private Builder()
         {
@@ -156,4 +185,100 @@ public final class Types implements Iterable<UserType>
             return this;
         }
     }
+
+    public static final class RawBuilder
+    {
+        final String keyspace;
+        final List<RawUDT> definitions;
+
+        private RawBuilder(String keyspace)
+        {
+            this.keyspace = keyspace;
+            this.definitions = new ArrayList<>();
+        }
+
+        /**
+         * Build a Types instance from Raw definitions.
+         *
+         * Constructs a DAG of UDT dependencies and resolves them one by one in 
topological order.
+         */
+        public Types build()
+        {
+            if (definitions.isEmpty())
+                return Types.none();
+
+            /*
+             * build a DAG of UDT dependencies
+             */
+            DefaultDirectedGraph<RawUDT, DefaultEdge> graph = new 
DefaultDirectedGraph<>(DefaultEdge.class);
+
+            definitions.forEach(graph::addVertex);
+
+            for (RawUDT udt1: definitions)
+                for (RawUDT udt2 : definitions)
+                    if (udt1 != udt2 && udt1.referencesUserType(udt2.name))
+                        graph.addEdge(udt2, udt1);
+
+            /*
+             * iterate in topological order, preparing each UDT against the types resolved so far
+             */
+            Types types = new Types(new HashMap<>());
+
+            TopologicalOrderIterator<RawUDT, DefaultEdge> iterator = new 
TopologicalOrderIterator<>(graph);
+            while (iterator.hasNext())
+            {
+                UserType udt = iterator.next().prepare(keyspace, types); // 
will throw InvalidRequestException if it meets an unknown type
+                types.types.put(udt.name, udt);
+            }
+
+            /*
+             * return an immutable copy
+             */
+            return Types.builder().add(types).build();
+        }
+
+        void add(String name, List<String> fieldNames, List<String> fieldTypes)
+        {
+            List<CQL3Type.Raw> rawFieldTypes =
+                fieldTypes.stream()
+                          .map(CQLTypeParser::parseRaw)
+                          .collect(toList());
+
+            definitions.add(new RawUDT(name, fieldNames, rawFieldTypes));
+        }
+
+        private static final class RawUDT
+        {
+            final String name;
+            final List<String> fieldNames;
+            final List<CQL3Type.Raw> fieldTypes;
+
+            RawUDT(String name, List<String> fieldNames, List<CQL3Type.Raw> 
fieldTypes)
+            {
+                this.name = name;
+                this.fieldNames = fieldNames;
+                this.fieldTypes = fieldTypes;
+            }
+
+            boolean referencesUserType(String typeName)
+            {
+                return fieldTypes.stream().anyMatch(t -> 
t.referencesUserType(typeName));
+            }
+
+            UserType prepare(String keyspace, Types types)
+            {
+                List<ByteBuffer> preparedFieldNames =
+                    fieldNames.stream()
+                              .map(ByteBufferUtil::bytes)
+                              .collect(toList());
+
+                List<AbstractType<?>> preparedFieldTypes =
+                    fieldTypes.stream()
+                              .map(t -> t.prepare(keyspace, types).getType())
+                              .collect(toList());
+
+                return new UserType(keyspace, bytes(name), preparedFieldNames, 
preparedFieldTypes);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/Views.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Views.java 
b/src/java/org/apache/cassandra/schema/Views.java
index 5888b9d..b8fdd4b 100644
--- a/src/java/org/apache/cassandra/schema/Views.java
+++ b/src/java/org/apache/cassandra/schema/Views.java
@@ -26,6 +26,8 @@ import javax.annotation.Nullable;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ViewDefinition;
@@ -124,6 +126,11 @@ public final class Views implements 
Iterable<ViewDefinition>
         return without(view.viewName).with(view);
     }
 
+    MapDifference<String, ViewDefinition> diff(Views other)
+    {
+        return Maps.difference(views, other.views);
+    }
+
     @Override
     public boolean equals(Object o)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java 
b/src/java/org/apache/cassandra/service/MigrationManager.java
index de6c7f7..6a21f91 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -468,20 +468,9 @@ public class MigrationManager
     private static void announce(Mutation schema, boolean announceLocally)
     {
         if (announceLocally)
-        {
-            try
-            {
-                SchemaKeyspace.mergeSchema(Collections.singletonList(schema));
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
+            SchemaKeyspace.mergeSchema(Collections.singletonList(schema));
         else
-        {
             
FBUtilities.waitOnFuture(announce(Collections.singletonList(schema)));
-        }
     }
 
     private static void pushSchemaMutation(InetAddress endpoint, 
Collection<Mutation> schema)
@@ -497,7 +486,7 @@ public class MigrationManager
     {
         Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new 
WrappedRunnable()
         {
-            protected void runMayThrow() throws IOException, 
ConfigurationException
+            protected void runMayThrow() throws ConfigurationException
             {
                 SchemaKeyspace.mergeSchemaAndAnnounceVersion(schema);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java 
b/src/java/org/apache/cassandra/service/MigrationTask.java
index 4e3fac3..8a1b858 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -74,10 +74,6 @@ class MigrationTask extends WrappedRunnable
                 {
                     
SchemaKeyspace.mergeSchemaAndAnnounceVersion(message.payload);
                 }
-                catch (IOException e)
-                {
-                    logger.error("IOException merging remote schema", e);
-                }
                 catch (ConfigurationException e)
                 {
                     logger.error("Configuration exception merging remote 
schema", e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java 
b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
index 3deb5fa..9d91df3 100644
--- a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
+++ b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
@@ -21,18 +21,23 @@ package org.apache.cassandra.config;
 import java.util.*;
 
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.SchemaKeyspace;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.schema.TableParams;
+import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.thrift.CfDef;
 import org.apache.cassandra.thrift.ColumnDef;
 import org.apache.cassandra.thrift.IndexType;
@@ -154,10 +159,18 @@ public class CFMetaDataTest
         PartitionUpdate cfU = 
rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, 
SchemaKeyspace.TABLES));
         PartitionUpdate cdU = 
rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, 
SchemaKeyspace.COLUMNS));
 
-        CFMetaData newCfm = 
SchemaKeyspace.createTableFromTablePartitionAndColumnsPartition(
-                UnfilteredRowIterators.filter(cfU.unfilteredIterator(), 
FBUtilities.nowInSeconds()),
-                UnfilteredRowIterators.filter(cdU.unfilteredIterator(), 
FBUtilities.nowInSeconds())
-        );
-        assert cfm.equals(newCfm) : String.format("%n%s%n!=%n%s", cfm, newCfm);
+        UntypedResultSet.Row tableRow = 
QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", 
SchemaKeyspace.NAME, SchemaKeyspace.TABLES),
+                                                                 
UnfilteredRowIterators.filter(cfU.unfilteredIterator(), 
FBUtilities.nowInSeconds()))
+                                                      .one();
+        TableParams params = SchemaKeyspace.createTableParamsFromRow(tableRow);
+
+        UntypedResultSet columnsRows = 
QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", 
SchemaKeyspace.NAME, SchemaKeyspace.COLUMNS),
+                                                                
UnfilteredRowIterators.filter(cdU.unfilteredIterator(), 
FBUtilities.nowInSeconds()));
+        Set<ColumnDefinition> columns = new HashSet<>();
+        for (UntypedResultSet.Row row : columnsRows)
+            columns.add(SchemaKeyspace.createColumnFromRow(row, Types.none()));
+
+        assertEquals(cfm.params, params);
+        assertEquals(new HashSet<>(cfm.allColumns()), columns);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java 
b/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
index 0e7084f..9e3c51d 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
@@ -110,7 +110,7 @@ public class TupleTypeTest extends CQLTester
             row(0, 4, tuple(null, "1"))
         );
 
-        assertInvalidMessage("Invalid tuple literal: too many elements. Type 
tuple<int, text> expects 2 but got 3",
+        assertInvalidMessage("Invalid tuple literal: too many elements. Type 
frozen<tuple<int, text>> expects 2 but got 3",
                              "INSERT INTO %s(k, t) VALUES (1,'1:2:3')");
     }
 
@@ -121,7 +121,7 @@ public class TupleTypeTest extends CQLTester
 
         assertInvalidSyntax("INSERT INTO %s (k, t) VALUES (0, ())");
 
-        assertInvalidMessage("Invalid tuple literal for t: too many elements. 
Type tuple<int, text, double> expects 3 but got 4",
+        assertInvalidMessage("Invalid tuple literal for t: too many elements. 
Type frozen<tuple<int, text, double>> expects 3 but got 4",
                              "INSERT INTO %s (k, t) VALUES (0, (2, 'foo', 3.1, 
'bar'))");
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java 
b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
index bf3d33f..78e85dc 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
@@ -797,7 +797,7 @@ public class UFTest extends CQLTester
     @Test
     public void testFunctionNonExistingKeyspace() throws Throwable
     {
-        assertInvalidMessage("to non existing keyspace",
+        assertInvalidMessage("Keyspace this_ks_does_not_exist doesn't exist",
                              "CREATE OR REPLACE FUNCTION 
this_ks_does_not_exist.jnft(val double) " +
                              "RETURNS NULL ON NULL INPUT " +
                              "RETURNS double " +
@@ -810,7 +810,7 @@ public class UFTest extends CQLTester
     {
         dropPerTestKeyspace();
 
-        assertInvalidMessage("to non existing keyspace",
+        assertInvalidMessage("Keyspace " + KEYSPACE_PER_TEST + " doesn't 
exist",
                              "CREATE OR REPLACE FUNCTION " + KEYSPACE_PER_TEST 
+ ".jnft(val double) " +
                              "RETURNS NULL ON NULL INPUT " +
                              "RETURNS double " +

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java 
b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
index d069d56..bc382fb 100644
--- a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
+++ b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
@@ -63,8 +63,7 @@ public class LegacySchemaMigratorTest
     {
         CQLTester.cleanupAndLeaveDirs();
 
-        List<KeyspaceMetadata> expected = keyspaceToMigrate();
-        expected.sort((k1, k2) -> k1.name.compareTo(k2.name));
+        Keyspaces expected = keyspacesToMigrate();
 
         // write the keyspaces into the legacy tables
         expected.forEach(LegacySchemaMigratorTest::legacySerializeKeyspace);
@@ -73,8 +72,7 @@ public class LegacySchemaMigratorTest
         LegacySchemaMigrator.migrate();
 
         // read back all the metadata from the new schema tables
-        List<KeyspaceMetadata> actual = 
SchemaKeyspace.readSchemaFromSystemTables();
-        actual.sort((k1, k2) -> k1.name.compareTo(k2.name));
+        Keyspaces actual = SchemaKeyspace.fetchNonSystemKeyspaces();
 
         // need to load back CFMetaData of those tables (CFS instances will 
still be loaded)
         loadLegacySchemaTables();
@@ -104,9 +102,9 @@ public class LegacySchemaMigratorTest
         
Schema.instance.setKeyspaceMetadata(systemKeyspace.withSwapped(systemTables));
     }
 
-    private static List<KeyspaceMetadata> keyspaceToMigrate()
+    private static Keyspaces keyspacesToMigrate()
     {
-        List<KeyspaceMetadata> keyspaces = new ArrayList<>();
+        Keyspaces.Builder keyspaces = Keyspaces.builder();
 
         // A whole bucket of shorthand
         String ks1 = KEYSPACE_PREFIX + "Keyspace1";
@@ -255,7 +253,7 @@ public class LegacySchemaMigratorTest
         keyspaces.add(keyspaceWithUDAs());
         keyspaces.add(keyspaceWithUDAsAndUDTs());
 
-        return keyspaces;
+        return keyspaces.build();
     }
 
     private static KeyspaceMetadata keyspaceWithDroppedCollections()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java 
b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
index 3eb4faf..a1b7ad3 100644
--- a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import com.google.common.collect.ImmutableMap;
 
@@ -32,11 +33,13 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -158,14 +161,14 @@ public class SchemaKeyspaceTest
         assertEquals(extensions, metadata.params.extensions);
     }
 
-    private static void updateTable(String keyspace, CFMetaData oldTable, 
CFMetaData newTable) throws IOException
+    private static void updateTable(String keyspace, CFMetaData oldTable, 
CFMetaData newTable)
     {
         KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceInstance(keyspace).getMetadata();
         Mutation mutation = SchemaKeyspace.makeUpdateTableMutation(ksm, 
oldTable, newTable, FBUtilities.timestampMicros(), false);
         SchemaKeyspace.mergeSchema(Collections.singleton(mutation));
     }
 
-    private static void createTable(String keyspace, String cql) throws 
IOException
+    private static void createTable(String keyspace, String cql)
     {
         CFMetaData table = CFMetaData.compile(cql, keyspace);
 
@@ -185,10 +188,21 @@ public class SchemaKeyspaceTest
 
         // Test schema conversion
         Mutation rm = SchemaKeyspace.makeCreateTableMutation(keyspace, cfm, 
FBUtilities.timestampMicros());
-        PartitionUpdate serializedCf = 
rm.getPartitionUpdate(Schema.instance.getId(SystemKeyspace.NAME, 
SchemaKeyspace.TABLES));
-        PartitionUpdate serializedCD = 
rm.getPartitionUpdate(Schema.instance.getId(SystemKeyspace.NAME, 
SchemaKeyspace.COLUMNS));
-        CFMetaData newCfm = 
SchemaKeyspace.createTableFromTablePartitionAndColumnsPartition(UnfilteredRowIterators.filter(serializedCf.unfilteredIterator(),
 FBUtilities.nowInSeconds()),
-                                                                               
             UnfilteredRowIterators.filter(serializedCD.unfilteredIterator(), 
FBUtilities.nowInSeconds()));
-        assert cfm.equals(newCfm) : String.format("%n%s%n!=%n%s", cfm, newCfm);
+        PartitionUpdate serializedCf = 
rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, 
SchemaKeyspace.TABLES));
+        PartitionUpdate serializedCD = 
rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, 
SchemaKeyspace.COLUMNS));
+
+        UntypedResultSet.Row tableRow = 
QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", 
SchemaKeyspace.NAME, SchemaKeyspace.TABLES),
+                                                                 
UnfilteredRowIterators.filter(serializedCf.unfilteredIterator(), 
FBUtilities.nowInSeconds()))
+                                                      .one();
+        TableParams params = SchemaKeyspace.createTableParamsFromRow(tableRow);
+
+        UntypedResultSet columnsRows = 
QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", 
SchemaKeyspace.NAME, SchemaKeyspace.COLUMNS),
+                                                                
UnfilteredRowIterators.filter(serializedCD.unfilteredIterator(), 
FBUtilities.nowInSeconds()));
+        Set<ColumnDefinition> columns = new HashSet<>();
+        for (UntypedResultSet.Row row : columnsRows)
+            columns.add(SchemaKeyspace.createColumnFromRow(row, Types.none()));
+
+        assertEquals(cfm.params, params);
+        assertEquals(new HashSet<>(cfm.allColumns()), columns);
     }
 }

Reply via email to