Repository: incubator-unomi
Updated Branches:
  refs/heads/master 9db8296e7 -> 5166fc23e


UNOMI-102 : Add Camel config for export features


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/b7194f47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/b7194f47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/b7194f47

Branch: refs/heads/master
Commit: b7194f470d4ec728f0205b3386571f21d15f2bda
Parents: df59f57
Author: Abdelkader Midani <amid...@apache.org>
Authored: Thu Jun 29 03:09:55 2017 +0200
Committer: Abdelkader Midani <amid...@apache.org>
Committed: Thu Jun 29 03:09:55 2017 +0200

----------------------------------------------------------------------
 .../unomi/router/api/RouterConstants.java       |   9 +-
 .../router/core/bean/CollectProfileBean.java    |  38 +++++++
 .../router/core/context/RouterCamelContext.java |  33 ++++--
 .../core/processor/ConfigUpdateProcessor.java   |   1 -
 .../ExportRouteCompletionProcessor.java         |  76 ++++++++++++++
 .../ImportRouteCompletionProcessor.java         | 105 +++++++++++++++++++
 .../core/processor/LineBuildProcessor.java      |  54 ++++++++++
 .../processor/RouteCompletionProcessor.java     | 105 -------------------
 .../core/route/ConfigUpdateRouteBuilder.java    |   1 -
 .../route/ProfileExportCollectRouteBuilder.java |  53 +++++++---
 .../ProfileExportProducerRouteBuilder.java      |  69 ++++++++++++
 .../ProfileImportAbstractRouteBuilder.java      |  83 ---------------
 .../ProfileImportFromSourceRouteBuilder.java    |  18 ++--
 .../route/ProfileImportOneShotRouteBuilder.java |  26 ++---
 .../route/ProfileImportToUnomiRouteBuilder.java |  21 ++--
 .../core/route/RouterAbstractRouteBuilder.java  |  98 +++++++++++++++++
 .../strategy/ArrayListAggregationStrategy.java  |   2 -
 .../StringLinesAggregationStrategy.java         |  41 ++++++++
 .../resources/OSGI-INF/blueprint/blueprint.xml  |  43 +++++---
 .../main/resources/org.apache.unomi.router.cfg  |  17 ++-
 20 files changed, 608 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java
----------------------------------------------------------------------
diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java
index 45de3d6..7f45228 100644
--- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java
+++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java
@@ -32,13 +32,20 @@ public interface RouterConstants {
     String IMPORT_EXPORT_CONFIG_TYPE_RECURRENT = "recurrent";
     String IMPORT_EXPORT_CONFIG_TYPE_ONESHOT = "oneshot";
 
-    String DIRECT_DEPOSIT_BUFFER = "direct:depositBuffer";
+    String DIRECT_IMPORT_DEPOSIT_BUFFER = "direct:depositImportBuffer";
+    String DIRECT_EXPORT_DEPOSIT_BUFFER = "direct:depositExportBuffer";
 
     String DIRECTION_FROM = "from";
     String DIRECTION_TO = "to";
 
     String HEADER_CONFIG_TYPE = "configType";
 
+    String HEADER_EXPORT_CONFIG = "exportConfig";
     String HEADER_FAILED_MESSAGE = "failedMessage";
     String HEADER_IMPORT_CONFIG_ONESHOT = "importConfigOneShot";
+
+    String IMPORT_ONESHOT_ROUTE_ID = "ONE_SHOT_ROUTE";
+    String DEFAULT_FILE_COLUMN_SEPARATOR = ",";
+
+    String DEFAULT_FILE_LINE_SEPARATOR = "\n";
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java
----------------------------------------------------------------------
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java
new file mode 100644
index 0000000..4525019
--- /dev/null
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.bean;
+
+import org.apache.unomi.api.Profile;
+import org.apache.unomi.persistence.spi.PersistenceService;
+
+import java.util.List;
+
+/**
+ * Created by amidani on 28/06/2017.
+ */
+public class CollectProfileBean {
+
+    private PersistenceService persistenceService;
+
+    public List<Profile> extractProfileBySegment(String segment) {
+        return persistenceService.query("segments", segment,null, Profile.class);
+    }
+
+    public void setPersistenceService(PersistenceService persistenceService) {
+        this.persistenceService = persistenceService;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
----------------------------------------------------------------------
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
index 32ceba8..d6ca24b 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
@@ -27,13 +27,11 @@ import org.apache.unomi.router.api.ExportConfiguration;
 import org.apache.unomi.router.api.ImportConfiguration;
 import org.apache.unomi.router.api.RouterConstants;
 import org.apache.unomi.router.api.services.ImportExportConfigurationService;
+import org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor;
 import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor;
-import org.apache.unomi.router.core.processor.RouteCompletionProcessor;
+import org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor;
 import org.apache.unomi.router.core.processor.UnomiStorageProcessor;
-import org.apache.unomi.router.core.route.ProfileExportCollectRouteBuilder;
-import org.apache.unomi.router.core.route.ProfileImportFromSourceRouteBuilder;
-import org.apache.unomi.router.core.route.ProfileImportOneShotRouteBuilder;
-import org.apache.unomi.router.core.route.ProfileImportToUnomiRouteBuilder;
+import org.apache.unomi.router.core.route.*;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.BundleEvent;
@@ -53,7 +51,8 @@ public class RouterCamelContext implements 
SynchronousBundleListener {
     private Logger logger = 
LoggerFactory.getLogger(RouterCamelContext.class.getName());
     private CamelContext camelContext;
     private UnomiStorageProcessor unomiStorageProcessor;
-    private RouteCompletionProcessor routeCompletionProcessor;
+    private ImportRouteCompletionProcessor importRouteCompletionProcessor;
+    private ExportRouteCompletionProcessor exportRouteCompletionProcessor;
     private ImportConfigByFileNameProcessor importConfigByFileNameProcessor;
     private ImportExportConfigurationService<ImportConfiguration> 
importConfigurationService;
     private ImportExportConfigurationService<ExportConfiguration> 
exportConfigurationService;
@@ -102,19 +101,26 @@ public class RouterCamelContext implements 
SynchronousBundleListener {
         //Unomi sink route
         ProfileImportToUnomiRouteBuilder builderProcessor = new 
ProfileImportToUnomiRouteBuilder(kafkaProps, configType);
         builderProcessor.setUnomiStorageProcessor(unomiStorageProcessor);
-        builderProcessor.setRouteCompletionProcessor(routeCompletionProcessor);
+        
builderProcessor.setImportRouteCompletionProcessor(importRouteCompletionProcessor);
         builderProcessor.setJacksonDataFormat(jacksonDataFormat);
         builderProcessor.setContext(camelContext);
         camelContext.addRoutes(builderProcessor);
 
         //--EXPORT ROUTES
-        ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = 
new ProfileExportCollectRouteBuilder();
+        ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = 
new ProfileExportCollectRouteBuilder(kafkaProps, configType);
         
profileExportCollectRouteBuilder.setExportConfigurationService(exportConfigurationService);
         
profileExportCollectRouteBuilder.setPersistenceService(persistenceService);
         profileExportCollectRouteBuilder.setAllowedEndpoints(allowedEndpoints);
+        
profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
         profileExportCollectRouteBuilder.setContext(camelContext);
         camelContext.addRoutes(profileExportCollectRouteBuilder);
 
+        ProfileExportProducerRouteBuilder profileExportProducerRouteBuilder = 
new ProfileExportProducerRouteBuilder(kafkaProps, configType);
+        
profileExportProducerRouteBuilder.setExportRouteCompletionProcessor(exportRouteCompletionProcessor);
+        
profileExportProducerRouteBuilder.setAllowedEndpoints(allowedEndpoints);
+        
profileExportProducerRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
+        profileExportProducerRouteBuilder.setContext(camelContext);
+        camelContext.addRoutes(profileExportProducerRouteBuilder);
 
         camelContext.start();
 
@@ -174,10 +180,11 @@ public class RouterCamelContext implements 
SynchronousBundleListener {
         killExistingRoute(exportConfiguration.getItemId());
         //Handle transforming an import config oneshot <--> recurrent
         if 
(RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(exportConfiguration.getConfigType()))
 {
-            ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder 
= new ProfileExportCollectRouteBuilder();
+            ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder 
= new ProfileExportCollectRouteBuilder(kafkaProps, configType);
             
profileExportCollectRouteBuilder.setExportConfigurationService(exportConfigurationService);
             
profileExportCollectRouteBuilder.setPersistenceService(persistenceService);
             
profileExportCollectRouteBuilder.setAllowedEndpoints(allowedEndpoints);
+            
profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
             profileExportCollectRouteBuilder.setContext(camelContext);
             camelContext.addRoutes(profileExportCollectRouteBuilder);
         }
@@ -191,8 +198,12 @@ public class RouterCamelContext implements 
SynchronousBundleListener {
         this.unomiStorageProcessor = unomiStorageProcessor;
     }
 
-    public void setRouteCompletionProcessor(RouteCompletionProcessor 
routeCompletionProcessor) {
-        this.routeCompletionProcessor = routeCompletionProcessor;
+    public void 
setImportRouteCompletionProcessor(ImportRouteCompletionProcessor 
importRouteCompletionProcessor) {
+        this.importRouteCompletionProcessor = importRouteCompletionProcessor;
+    }
+
+    public void 
setExportRouteCompletionProcessor(ExportRouteCompletionProcessor 
exportRouteCompletionProcessor) {
+        this.exportRouteCompletionProcessor = exportRouteCompletionProcessor;
     }
 
     public void 
setImportConfigByFileNameProcessor(ImportConfigByFileNameProcessor 
importConfigByFileNameProcessor) {

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java
index 8e6ab36..76bd8a6 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java
@@ -19,7 +19,6 @@ package org.apache.unomi.router.core.processor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.unomi.router.api.ImportConfiguration;
 import org.apache.unomi.router.core.context.RouterCamelContext;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
new file mode 100644
index 0000000..1b4d1da
--- /dev/null
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.unomi.router.api.ExportConfiguration;
+import org.apache.unomi.router.api.RouterConstants;
+import org.apache.unomi.router.api.services.ImportExportConfigurationService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Created by amidani on 29/06/2017.
+ */
+public class ExportRouteCompletionProcessor implements Processor {
+
+    private static final Logger logger = LoggerFactory.getLogger(ExportRouteCompletionProcessor.class.getName());
+    private ImportExportConfigurationService<ExportConfiguration> exportConfigurationService;
+    private int executionsHistorySize;
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        String importConfigId = null;
+        ExportConfiguration exportConfig = (ExportConfiguration) exchange.getIn().getHeader(RouterConstants.HEADER_EXPORT_CONFIG);
+
+        Map execution = new HashMap();
+        execution.put("date", ((Date) exchange.getProperty("CamelCreatedTimestamp")).getTime());
+        execution.put("extractedProfiles", exchange.getProperty("CamelSplitSize"));
+
+        ExportConfiguration exportConfiguration = exportConfigurationService.load(exportConfig.getItemId());
+
+        if (exportConfiguration.getExecutions().size() >= executionsHistorySize) {
+            int oldestExecIndex = 0;
+            long oldestExecDate = (Long) exportConfiguration.getExecutions().get(0).get("date");
+            for (int i = 1; i < exportConfiguration.getExecutions().size(); i++) {
+                if ((Long) exportConfiguration.getExecutions().get(i).get("date") < oldestExecDate) {
+                    oldestExecDate = (Long) exportConfiguration.getExecutions().get(i).get("date");
+                    oldestExecIndex = i;
+                }
+            }
+            exportConfiguration.getExecutions().remove(oldestExecIndex);
+        }
+
+        exportConfiguration.getExecutions().add(execution);
+        exportConfigurationService.save(exportConfiguration);
+
+        logger.info("Processing route {} completed.", exchange.getFromRouteId());
+    }
+
+    public void setExportConfigurationService(ImportExportConfigurationService<ExportConfiguration> exportConfigurationService) {
+        this.exportConfigurationService = exportConfigurationService;
+    }
+
+    public void setExecutionsHistorySize(int executionsHistorySize) {
+        this.executionsHistorySize = executionsHistorySize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java
new file mode 100644
index 0000000..edb7391
--- /dev/null
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.unomi.router.api.ImportConfiguration;
+import org.apache.unomi.router.api.ImportLineError;
+import org.apache.unomi.router.api.ProfileToImport;
+import org.apache.unomi.router.api.RouterConstants;
+import org.apache.unomi.router.api.services.ImportExportConfigurationService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Created by amidani on 14/06/2017.
+ */
+public class ImportRouteCompletionProcessor implements Processor {
+
+    private static final Logger logger = LoggerFactory.getLogger(ImportRouteCompletionProcessor.class.getName());
+    private ImportExportConfigurationService<ImportConfiguration> importConfigurationService;
+    private int executionsHistorySize;
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        String importConfigId = null;
+        ImportConfiguration importConfigOneShot = (ImportConfiguration) exchange.getIn().getHeader(RouterConstants.HEADER_IMPORT_CONFIG_ONESHOT);
+        if (importConfigOneShot != null) {
+            importConfigId = importConfigOneShot.getItemId();
+        } else {
+            importConfigId = exchange.getFromRouteId();
+        }
+        ImportConfiguration importConfiguration = importConfigurationService.load(importConfigId);
+        long successCount = 0;
+        long failureCount = 0;
+        long ignoreCount = 0;
+        List<ImportLineError> errors = new ArrayList<ImportLineError>();
+
+        for (Object line : exchange.getIn().getBody(ArrayList.class)) {
+            if (line instanceof ProfileToImport) {
+                successCount++;
+            } else if (line instanceof ImportLineError) {
+                failureCount++;
+                errors.add(((ImportLineError) line));
+            } else {
+                ignoreCount++;
+            }
+        }
+
+        Map execution = new HashMap();
+        execution.put("date", ((Date) exchange.getProperty("CamelCreatedTimestamp")).getTime());
+        execution.put("totalLinesNb", exchange.getProperty("CamelSplitSize"));
+        execution.put("successCount", successCount);
+        execution.put("failureCount", failureCount);
+        execution.put("errors", errors);
+
+        if (importConfiguration.getExecutions().size() >= executionsHistorySize) {
+            int oldestExecIndex = 0;
+            long oldestExecDate = (Long) importConfiguration.getExecutions().get(0).get("date");
+            for (int i = 1; i < importConfiguration.getExecutions().size(); i++) {
+                if ((Long) importConfiguration.getExecutions().get(i).get("date") < oldestExecDate) {
+                    oldestExecDate = (Long) importConfiguration.getExecutions().get(i).get("date");
+                    oldestExecIndex = i;
+                }
+            }
+            importConfiguration.getExecutions().remove(oldestExecIndex);
+        }
+
+        importConfiguration.getExecutions().add(execution);
+        //Set running to false, route is complete
+        if (failureCount > 0 && successCount > 0) {
+            importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_COMPLETE_WITH_ERRORS);
+        } else if (failureCount > 0 && successCount == 0) {
+            importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_COMPLETE_ERRORS);
+        } else if (failureCount == 0 && successCount > 0) {
+            importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_COMPLETE_SUCCESS);
+        }
+        importConfigurationService.save(importConfiguration);
+        logger.info("Processing route {} completed.", exchange.getFromRouteId());
+    }
+
+    public void setImportConfigurationService(ImportExportConfigurationService<ImportConfiguration> importConfigurationService) {
+        this.importConfigurationService = importConfigurationService;
+    }
+
+    public void setExecutionsHistorySize(int executionsHistorySize) {
+        this.executionsHistorySize = executionsHistorySize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java
new file mode 100644
index 0000000..6f83741
--- /dev/null
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.unomi.api.Profile;
+import org.apache.unomi.router.api.ExportConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Created by amidani on 28/06/2017.
+ */
+public class LineBuildProcessor implements Processor {
+
+    private static final Logger logger = LoggerFactory.getLogger(LineBuildProcessor.class);
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        ExportConfiguration exportConfiguration = (ExportConfiguration) exchange.getIn().getHeader("exportConfig");
+        exchange.getIn().setHeader("destination", exportConfiguration.getProperty("destination"));
+        Profile profile = exchange.getIn().getBody(Profile.class);
+
+        Map<String, String> mapping = (Map<String, String>) exportConfiguration.getProperty("mapping");
+        String lineToWrite = "";
+        for (int i = 0; i < mapping.size(); i++) {
+            String propertyName = mapping.get(String.valueOf(i));
+            lineToWrite += profile.getProperty(propertyName) != null ? profile.getProperty(propertyName) : "";
+            if (i + 1 < mapping.size()) {
+                lineToWrite += exportConfiguration.getColumnSeparator();
+            }
+        }
+
+        exchange.getIn().setBody(lineToWrite, String.class);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java
deleted file mode 100644
index b522426..0000000
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.unomi.router.core.processor;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.unomi.router.api.ImportConfiguration;
-import org.apache.unomi.router.api.ImportLineError;
-import org.apache.unomi.router.api.ProfileToImport;
-import org.apache.unomi.router.api.services.ImportExportConfigurationService;
-import org.apache.unomi.router.api.RouterConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * Created by amidani on 14/06/2017.
- */
-public class RouteCompletionProcessor implements Processor {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(RouteCompletionProcessor.class.getName());
-    private ImportExportConfigurationService<ImportConfiguration> 
importConfigurationService;
-    private int executionsHistorySize;
-
-    @Override
-    public void process(Exchange exchange) throws Exception {
-        String importConfigId = null;
-        ImportConfiguration importConfigOneShot = (ImportConfiguration) 
exchange.getIn().getHeader(RouterConstants.HEADER_IMPORT_CONFIG_ONESHOT);
-        if (importConfigOneShot != null) {
-            importConfigId = importConfigOneShot.getItemId();
-        } else {
-            importConfigId = exchange.getFromRouteId();
-        }
-        ImportConfiguration importConfiguration = 
importConfigurationService.load(importConfigId);
-        long successCount = 0;
-        long failureCount = 0;
-        long ignoreCount = 0;
-        List<ImportLineError> errors = new ArrayList<ImportLineError>();
-
-        for (Object line : exchange.getIn().getBody(ArrayList.class)) {
-            if (line instanceof ProfileToImport) {
-                successCount++;
-            } else if (line instanceof ImportLineError) {
-                failureCount++;
-                errors.add(((ImportLineError) line));
-            } else {
-                ignoreCount++;
-            }
-        }
-
-        Map execution = new HashMap();
-        execution.put("date", ((Date) 
exchange.getProperty("CamelCreatedTimestamp")).getTime());
-        execution.put("totalLinesNb", exchange.getProperty("CamelSplitSize"));
-        execution.put("successCount", successCount);
-        execution.put("failureCount", failureCount);
-        execution.put("errors", errors);
-
-        if (importConfiguration.getExecutions().size() >= 
executionsHistorySize) {
-            int oldestExecIndex = 0;
-            long oldestExecDate = (Long) 
importConfiguration.getExecutions().get(0).get("date");
-            for (int i = 1; i < importConfiguration.getExecutions().size(); 
i++) {
-                if ((Long) 
importConfiguration.getExecutions().get(i).get("date") < oldestExecDate) {
-                    oldestExecDate = (Long) 
importConfiguration.getExecutions().get(i).get("date");
-                    oldestExecIndex = i;
-                }
-            }
-            importConfiguration.getExecutions().remove(oldestExecIndex);
-        }
-
-        importConfiguration.getExecutions().add(execution);
-        //Set running to false, route is complete
-        if (failureCount > 0 && successCount > 0) {
-            
importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_COMPLETE_WITH_ERRORS);
-        } else if (failureCount > 0 && successCount == 0) {
-            
importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_COMPLETE_ERRORS);
-        } else if (failureCount == 0 && successCount > 0) {
-            
importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_COMPLETE_SUCCESS);
-        }
-        importConfigurationService.save(importConfiguration);
-        logger.info("Processing route {} completed.", 
exchange.getFromRouteId());
-    }
-
-    public void 
setImportConfigurationService(ImportExportConfigurationService<ImportConfiguration>
 importConfigurationService) {
-        this.importConfigurationService = importConfigurationService;
-    }
-
-    public void setExecutionsHistorySize(int executionsHistorySize) {
-        this.executionsHistorySize = executionsHistorySize;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java
----------------------------------------------------------------------
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java
index dd70033..885713a 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java
@@ -18,7 +18,6 @@ package org.apache.unomi.router.core.route;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.model.rest.RestBindingMode;
-import org.apache.unomi.api.services.ConfigSharingService;
 import org.apache.unomi.router.api.ExportConfiguration;
 import org.apache.unomi.router.api.ImportConfiguration;
 import org.apache.unomi.router.core.context.RouterCamelContext;

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
index 5c3015e..b67859a 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
@@ -16,21 +16,25 @@
  */
 package org.apache.unomi.router.core.route;
 
-import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.camel.model.ProcessorDefinition;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.unomi.api.Profile;
 import org.apache.unomi.persistence.spi.PersistenceService;
 import org.apache.unomi.router.api.ExportConfiguration;
+import org.apache.unomi.router.api.RouterConstants;
 import org.apache.unomi.router.api.services.ImportExportConfigurationService;
+import org.apache.unomi.router.core.bean.CollectProfileBean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * Created by amidani on 27/06/2017.
  */
-public class ProfileExportCollectRouteBuilder extends RouteBuilder {
+public class ProfileExportCollectRouteBuilder extends 
RouterAbstractRouteBuilder {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ProfileExportCollectRouteBuilder.class);
 
@@ -38,7 +42,9 @@ public class ProfileExportCollectRouteBuilder extends 
RouteBuilder {
     private ImportExportConfigurationService<ExportConfiguration> 
exportConfigurationService;
     private PersistenceService persistenceService;
 
-    private String allowedEndpoints;
+    public ProfileExportCollectRouteBuilder(Map<String, String> kafkaProps, 
String configType) {
+        super(kafkaProps, configType);
+    }
 
     @Override
     public void configure() throws Exception {
@@ -48,16 +54,37 @@ public class ProfileExportCollectRouteBuilder extends 
RouteBuilder {
             exportConfigurationList = exportConfigurationService.getAll();
         }
 
+        CollectProfileBean collectProfileBean = new CollectProfileBean();
+        collectProfileBean.setPersistenceService(persistenceService);
+
+
         //Loop on multiple export configuration
         for (final ExportConfiguration exportConfiguration : 
exportConfigurationList) {
-            String endpoint = (String) 
exportConfiguration.getProperties().get("destination");
-
-            if (StringUtils.isNotBlank(endpoint) && 
allowedEndpoints.contains(endpoint.substring(0, endpoint.indexOf(':')))) {
-                List<Profile> profilesCollected = 
persistenceService.query("segments", (String) 
exportConfiguration.getProperties().get("segments"),
-                         null, Profile.class);
-                logger.info("Collected +++{}+++ profiles.", 
profilesCollected.size());
+            if (exportConfiguration.getProperties() != null && 
exportConfiguration.getProperties().size() > 0) {
+                if ((Map<String, String>) 
exportConfiguration.getProperties().get("mapping") != null) {
+                    String destinationEndpoint = (String) 
exportConfiguration.getProperties().get("destination");
+                    if (StringUtils.isNotBlank(destinationEndpoint) && 
allowedEndpoints.contains(destinationEndpoint.substring(0, 
destinationEndpoint.indexOf(':')))) {
+                        ProcessorDefinition prDef = 
from("timer://collectProfile?fixedRate=true&period=" + (String) 
exportConfiguration.getProperties().get("period"))
+                                .autoStartup(exportConfiguration.isActive())
+                                .bean(collectProfileBean, 
"extractProfileBySegment(" + exportConfiguration.getProperties().get("segment") 
+ ")")
+                                .split(body())
+                                .marshal(jacksonDataFormat)
+                                .convertBodyTo(String.class)
+                                
.setHeader(RouterConstants.HEADER_EXPORT_CONFIG, constant(exportConfiguration))
+                                .log(LoggingLevel.DEBUG, "BODY : ${body}");
+                        if 
(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
+                            prDef.to((KafkaEndpoint) 
getEndpointURI(RouterConstants.DIRECTION_FROM, 
RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER));
+                        } else {
+                            prDef.to((String) 
getEndpointURI(RouterConstants.DIRECTION_FROM, 
RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER));
+                        }
+                    } else {
+                        logger.error("Endpoint scheme {} is not allowed, route 
{} will be skipped.", destinationEndpoint.substring(0, 
destinationEndpoint.indexOf(':')), exportConfiguration.getItemId());
+                    }
+                } else {
+                    logger.warn("Mapping is null in export configuration, 
route {} will be skipped!", exportConfiguration.getItemId());
+                }
             } else {
-                logger.error("Endpoint scheme {} is not allowed, route {} will 
be skipped.", endpoint.substring(0, endpoint.indexOf(':')), 
exportConfiguration.getItemId());
+                logger.warn("Export configuration incomplete, route {} will be 
skipped!", exportConfiguration.getItemId());
             }
         }
     }
@@ -74,8 +101,4 @@ public class ProfileExportCollectRouteBuilder extends 
RouteBuilder {
         this.persistenceService = persistenceService;
     }
 
-    public void setAllowedEndpoints(String allowedEndpoints) {
-        this.allowedEndpoints = allowedEndpoints;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java
new file mode 100644
index 0000000..0b0b60a
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.route;
+
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.unomi.router.api.RouterConstants;
+import org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor;
+import org.apache.unomi.router.core.processor.LineBuildProcessor;
+import org.apache.unomi.router.core.strategy.StringLinesAggregationStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Route builder for the export "data producer" route.
+ * <p>
+ * The route consumes from the export deposit buffer (a Kafka endpoint when the
+ * configuration type is Kafka, otherwise the endpoint URI string returned by
+ * {@code getEndpointURI}), unmarshals each message with the configured Jackson
+ * data format, passes it through a {@link LineBuildProcessor}, aggregates the
+ * results with a {@link StringLinesAggregationStrategy}, runs the
+ * {@link ExportRouteCompletionProcessor} on the aggregate and finally sends it
+ * to the destination resolved from the export configuration header.
+ * <p>
+ * Created by amidani on 28/06/2017.
+ */
+public class ProfileExportProducerRouteBuilder extends 
RouterAbstractRouteBuilder {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ProfileExportProducerRouteBuilder.class);
+
+    // Processor invoked once the aggregation completes; injected through the setter below.
+    private ExportRouteCompletionProcessor exportRouteCompletionProcessor;
+
+    /**
+     * Creates the builder, delegating the Kafka properties and configuration type to the parent class.
+     *
+     * @param kafkaProps Kafka settings forwarded to {@code RouterAbstractRouteBuilder}
+     * @param configType configuration type, compared against {@code RouterConstants.CONFIG_TYPE_KAFKA}
+     */
+    public ProfileExportProducerRouteBuilder(Map<String, String> kafkaProps, 
String configType) {
+        super(kafkaProps, configType);
+    }
+
+    /**
+     * Declares the export producer route on this builder.
+     *
+     * @throws Exception declared by the overridden {@code configure()} contract
+     */
+    @Override
+    public void configure() throws Exception {
+
+        logger.info("Configure Recurrent Route 'Export :: Data Producer'");
+
+        // getEndpointURI returns Object: a KafkaEndpoint in Kafka mode, a URI String otherwise,
+        // hence the explicit cast in each branch to select the matching from(...) overload.
+        RouteDefinition rtDef;
+        if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
+            rtDef = from((KafkaEndpoint) 
getEndpointURI(RouterConstants.DIRECTION_TO, 
RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER));
+        } else {
+            rtDef = from((String) getEndpointURI(RouterConstants.DIRECTION_TO, 
RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER));
+        }
+
+        rtDef.unmarshal(jacksonDataFormat)
+                .process(new LineBuildProcessor())
+                // Constant correlation key: every exchange is aggregated into the same group.
+                .aggregate(constant(true), new 
StringLinesAggregationStrategy())
+                // Complete the aggregate when the CamelSplitSize and CamelAggregatedSize exchange properties are equal.
+                // NOTE(review): presumably CamelSplitSize is set by the upstream split in the collect route;
+                // confirm it is still present on exchanges received through Kafka.
+                
.completionPredicate(exchangeProperty("CamelSplitSize").isEqualTo(exchangeProperty("CamelAggregatedSize")))
+                .eagerCheckCompletion()
+                .process(exportRouteCompletionProcessor)
+                // Dynamic endpoint: the target URI is evaluated per exchange from the "exportConfig" header.
+                // NOTE(review): assumes RouterConstants.HEADER_EXPORT_CONFIG equals "exportConfig" - verify.
+                .toD("${in.header.exportConfig.getProperty('destination')}");
+
+    }
+
+    /**
+     * Sets the processor that is run on the aggregated exchange before it is sent to the destination.
+     *
+     * @param exportRouteCompletionProcessor the completion processor used by {@link #configure()}
+     */
+    public void 
setExportRouteCompletionProcessor(ExportRouteCompletionProcessor 
exportRouteCompletionProcessor) {
+        this.exportRouteCompletionProcessor = exportRouteCompletionProcessor;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java
deleted file mode 100644
index bacc38e..0000000
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.unomi.router.core.route;
-
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.jackson.JacksonDataFormat;
-import org.apache.camel.component.kafka.KafkaComponent;
-import org.apache.camel.component.kafka.KafkaConfiguration;
-import org.apache.camel.component.kafka.KafkaEndpoint;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.unomi.router.api.RouterConstants;
-
-import java.util.Map;
-
-/**
- * Created by amidani on 13/06/2017.
- */
-public abstract class ProfileImportAbstractRouteBuilder extends RouteBuilder {
-
-    protected JacksonDataFormat jacksonDataFormat;
-
-    protected String kafkaHost;
-    protected String kafkaPort;
-    protected String kafkaImportTopic;
-    protected String kafkaImportGroupId;
-    protected String kafkaImportConsumerCount;
-    protected String kafkaImportAutoCommit;
-
-    protected String configType;
-
-    public ProfileImportAbstractRouteBuilder(Map<String, String> kafkaProps, 
String configType) {
-        this.kafkaHost = kafkaProps.get("kafkaHost");
-        this.kafkaPort = kafkaProps.get("kafkaPort");
-        this.kafkaImportTopic = kafkaProps.get("kafkaImportTopic");
-        this.kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId");
-        this.kafkaImportConsumerCount = 
kafkaProps.get("kafkaImportConsumerCount");
-        this.kafkaImportAutoCommit = kafkaProps.get("kafkaImportAutoCommit");
-        this.configType = configType;
-    }
-
-    public Object getEndpointURI(String direction) {
-        Object endpoint;
-        if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
-            //Prepare Kafka Deposit
-            StringBuilder kafkaUri = new StringBuilder("kafka:");
-            
kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic);
-            if (StringUtils.isNotBlank(kafkaImportGroupId)) {
-                kafkaUri.append("&groupId=" + kafkaImportGroupId);
-            }
-            if (RouterConstants.DIRECTION_TO.equals(direction)) {
-                kafkaUri.append("&autoCommitEnable=" + kafkaImportAutoCommit + 
"&consumersCount=" + kafkaImportConsumerCount);
-            }
-            KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
-            kafkaConfiguration.setBrokers(kafkaHost + ":" + kafkaPort);
-            kafkaConfiguration.setTopic(kafkaImportTopic);
-            kafkaConfiguration.setGroupId(kafkaImportGroupId);
-            endpoint = new KafkaEndpoint(kafkaUri.toString(), new 
KafkaComponent(this.getContext()));
-            ((KafkaEndpoint) endpoint).setConfiguration(kafkaConfiguration);
-        } else {
-            endpoint = RouterConstants.DIRECT_DEPOSIT_BUFFER;
-        }
-
-        return endpoint;
-    }
-
-    public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
-        this.jacksonDataFormat = jacksonDataFormat;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
index 7f54884..2dc87f3 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
@@ -23,8 +23,8 @@ import org.apache.camel.component.kafka.KafkaEndpoint;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.unomi.router.api.ImportConfiguration;
-import org.apache.unomi.router.api.services.ImportExportConfigurationService;
 import org.apache.unomi.router.api.RouterConstants;
+import org.apache.unomi.router.api.services.ImportExportConfigurationService;
 import org.apache.unomi.router.core.exception.BadProfileDataFormatException;
 import org.apache.unomi.router.core.processor.LineSplitFailureHandler;
 import org.apache.unomi.router.core.processor.LineSplitProcessor;
@@ -39,15 +39,13 @@ import java.util.Map;
  * Created by amidani on 26/04/2017.
  */
 
-public class ProfileImportFromSourceRouteBuilder extends 
ProfileImportAbstractRouteBuilder {
+public class ProfileImportFromSourceRouteBuilder extends 
RouterAbstractRouteBuilder {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ProfileImportFromSourceRouteBuilder.class.getName());
 
     private List<ImportConfiguration> importConfigurationList;
     private ImportExportConfigurationService<ImportConfiguration> 
importConfigurationService;
 
-    private String allowedEndpoints;
-
     public ProfileImportFromSourceRouteBuilder(Map<String, String> kafkaProps, 
String configType) {
         super(kafkaProps, configType);
     }
@@ -67,9 +65,9 @@ public class ProfileImportFromSourceRouteBuilder extends 
ProfileImportAbstractRo
                 .process(new LineSplitFailureHandler());
 
         if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
-            prDefErr.to((KafkaEndpoint) 
getEndpointURI(RouterConstants.DIRECTION_FROM));
+            prDefErr.to((KafkaEndpoint) 
getEndpointURI(RouterConstants.DIRECTION_FROM, 
RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
         } else {
-            prDefErr.to((String) 
getEndpointURI(RouterConstants.DIRECTION_FROM));
+            prDefErr.to((String) 
getEndpointURI(RouterConstants.DIRECTION_FROM, 
RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
         }
 
         //Loop on multiple import configuration
@@ -113,9 +111,9 @@ public class ProfileImportFromSourceRouteBuilder extends 
ProfileImportAbstractRo
                             .convertBodyTo(String.class);
 
                     if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
-                        prDef.to((KafkaEndpoint) 
getEndpointURI(RouterConstants.DIRECTION_FROM));
+                        prDef.to((KafkaEndpoint) 
getEndpointURI(RouterConstants.DIRECTION_FROM, 
RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
                     } else {
-                        prDef.to((String) 
getEndpointURI(RouterConstants.DIRECTION_FROM));
+                        prDef.to((String) 
getEndpointURI(RouterConstants.DIRECTION_FROM, 
RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
                     }
                 } else {
                     logger.error("Endpoint scheme {} is not allowed, route {} 
will be skipped.", endpoint.substring(0, endpoint.indexOf(':')), 
importConfiguration.getItemId());
@@ -132,8 +130,4 @@ public class ProfileImportFromSourceRouteBuilder extends 
ProfileImportAbstractRo
         this.importConfigurationService = importConfigurationService;
     }
 
-    public void setAllowedEndpoints(String allowedEndpoints) {
-        this.allowedEndpoints = allowedEndpoints;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
index a94b5ed..0913876 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
@@ -17,7 +17,6 @@
 package org.apache.unomi.router.core.route;
 
 import org.apache.camel.LoggingLevel;
-import org.apache.camel.component.jackson.JacksonDataFormat;
 import org.apache.camel.component.kafka.KafkaEndpoint;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.unomi.router.api.RouterConstants;
@@ -33,15 +32,12 @@ import java.util.Map;
 /**
  * Created by amidani on 22/05/2017.
  */
-public class ProfileImportOneShotRouteBuilder extends 
ProfileImportAbstractRouteBuilder {
+public class ProfileImportOneShotRouteBuilder extends 
RouterAbstractRouteBuilder {
 
     private Logger logger = 
LoggerFactory.getLogger(ProfileImportOneShotRouteBuilder.class.getName());
-
     private ImportConfigByFileNameProcessor importConfigByFileNameProcessor;
     private String uploadDir;
 
-    private final String IMPORT_ONESHOT_ROUTE_ID = "ONE_SHOT_ROUTE";
-
     public ProfileImportOneShotRouteBuilder(Map<String, String> kafkaProps, 
String configType) {
         super(kafkaProps, configType);
     }
@@ -57,27 +53,27 @@ public class ProfileImportOneShotRouteBuilder extends 
ProfileImportAbstractRoute
                 .process(new LineSplitFailureHandler());
 
         if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
-            prDefErr.to((KafkaEndpoint) 
getEndpointURI(RouterConstants.DIRECTION_FROM));
+            prDefErr.to((KafkaEndpoint) 
getEndpointURI(RouterConstants.DIRECTION_FROM, 
RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
         } else {
-            prDefErr.to((String) 
getEndpointURI(RouterConstants.DIRECTION_FROM));
+            prDefErr.to((String) 
getEndpointURI(RouterConstants.DIRECTION_FROM, 
RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
         }
 
         LineSplitProcessor lineSplitProcessor = new LineSplitProcessor();
 
-        ProcessorDefinition prDef = 
from("file://"+uploadDir+"?include=.*.csv&consumer.delay=1m")
-                .routeId(IMPORT_ONESHOT_ROUTE_ID)
+        ProcessorDefinition prDef = from("file://" + uploadDir + 
"?include=.*.csv&consumer.delay=1m")
+                .routeId(RouterConstants.IMPORT_ONESHOT_ROUTE_ID)
                 .autoStartup(true)
                 .process(importConfigByFileNameProcessor)
                 
.split(bodyAs(String.class).tokenize("${in.header.importConfigOneShot.getLineSeparator}"))
-                .setHeader("configType", constant(configType))
+                .setHeader(RouterConstants.HEADER_CONFIG_TYPE, 
constant(configType))
                 .process(lineSplitProcessor)
                 .to("log:org.apache.unomi.router?level=INFO")
                 .marshal(jacksonDataFormat)
                 .convertBodyTo(String.class);
-        if(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)){
-            prDef.to((KafkaEndpoint) 
getEndpointURI(RouterConstants.DIRECTION_FROM));
+        if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
+            prDef.to((KafkaEndpoint) 
getEndpointURI(RouterConstants.DIRECTION_FROM, 
RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
         } else {
-            prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM));
+            prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, 
RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
         }
     }
 
@@ -88,8 +84,4 @@ public class ProfileImportOneShotRouteBuilder extends 
ProfileImportAbstractRoute
     public void setUploadDir(String uploadDir) {
         this.uploadDir = uploadDir;
     }
-
-    public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
-        this.jacksonDataFormat = jacksonDataFormat;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
index d75977b..759dde4 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
@@ -17,11 +17,10 @@
 package org.apache.unomi.router.core.route;
 
 import org.apache.camel.LoggingLevel;
-import org.apache.camel.component.jackson.JacksonDataFormat;
 import org.apache.camel.component.kafka.KafkaEndpoint;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.unomi.router.api.RouterConstants;
-import org.apache.unomi.router.core.processor.RouteCompletionProcessor;
+import org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor;
 import org.apache.unomi.router.core.processor.UnomiStorageProcessor;
 import org.apache.unomi.router.core.strategy.ArrayListAggregationStrategy;
 import org.slf4j.Logger;
@@ -32,12 +31,12 @@ import java.util.Map;
 /**
  * Created by amidani on 26/04/2017.
  */
-public class ProfileImportToUnomiRouteBuilder extends 
ProfileImportAbstractRouteBuilder {
+public class ProfileImportToUnomiRouteBuilder extends 
RouterAbstractRouteBuilder {
 
     private Logger logger = 
LoggerFactory.getLogger(ProfileImportToUnomiRouteBuilder.class.getName());
 
     private UnomiStorageProcessor unomiStorageProcessor;
-    private RouteCompletionProcessor routeCompletionProcessor;
+    private ImportRouteCompletionProcessor importRouteCompletionProcessor;
 
     public ProfileImportToUnomiRouteBuilder(Map<String, String> kafkaProps, 
String configType) {
         super(kafkaProps, configType);
@@ -50,9 +49,9 @@ public class ProfileImportToUnomiRouteBuilder extends 
ProfileImportAbstractRoute
 
         RouteDefinition rtDef;
         if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
-            rtDef = from((KafkaEndpoint) 
getEndpointURI(RouterConstants.DIRECTION_TO));
+            rtDef = from((KafkaEndpoint) 
getEndpointURI(RouterConstants.DIRECTION_TO, 
RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
         } else {
-            rtDef = from((String) 
getEndpointURI(RouterConstants.DIRECTION_TO));
+            rtDef = from((String) getEndpointURI(RouterConstants.DIRECTION_TO, 
RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
         }
         rtDef.choice()
                 .when(header(RouterConstants.HEADER_FAILED_MESSAGE).isNull())
@@ -64,7 +63,7 @@ public class ProfileImportToUnomiRouteBuilder extends 
ProfileImportAbstractRoute
                 .aggregate(constant(true), new ArrayListAggregationStrategy())
                 
.completionPredicate(exchangeProperty("CamelSplitComplete").isEqualTo("true"))
                 .eagerCheckCompletion()
-                .process(routeCompletionProcessor)
+                .process(importRouteCompletionProcessor)
                 .to("log:org.apache.unomi.router?level=INFO");
     }
 
@@ -72,11 +71,7 @@ public class ProfileImportToUnomiRouteBuilder extends 
ProfileImportAbstractRoute
         this.unomiStorageProcessor = unomiStorageProcessor;
     }
 
-    public void setRouteCompletionProcessor(RouteCompletionProcessor 
routeCompletionProcessor) {
-        this.routeCompletionProcessor = routeCompletionProcessor;
-    }
-
-    public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
-        this.jacksonDataFormat = jacksonDataFormat;
+    public void 
setImportRouteCompletionProcessor(ImportRouteCompletionProcessor 
importRouteCompletionProcessor) {
+        this.importRouteCompletionProcessor = importRouteCompletionProcessor;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java
new file mode 100644
index 0000000..5db9917
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.route;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jackson.JacksonDataFormat;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.unomi.router.api.RouterConstants;
+
+import java.util.Map;
+
+/**
+ * Common base class for the router's Camel route builders.
+ * <p>
+ * It holds the Kafka settings read from a properties map, the configuration
+ * type, the Jackson data format and the allowed endpoint list, and provides
+ * {@link #getEndpointURI(String, String)} to obtain the deposit buffer endpoint
+ * for either the import or the export flow.
+ * <p>
+ * Created by amidani on 13/06/2017.
+ */
+public abstract class RouterAbstractRouteBuilder extends RouteBuilder {
+
+    // Data format used by subclasses to marshal/unmarshal message bodies; set via setter.
+    protected JacksonDataFormat jacksonDataFormat;
+
+    // Kafka settings, each read from the kafkaProps map under the key of the same name.
+    protected String kafkaHost;
+    protected String kafkaPort;
+    protected String kafkaImportTopic;
+    protected String kafkaExportTopic;
+    protected String kafkaImportGroupId;
+    protected String kafkaExportGroupId;
+    protected String kafkaConsumerCount;
+    protected String kafkaAutoCommit;
+
+    // Compared against RouterConstants.CONFIG_TYPE_KAFKA to choose between Kafka and the plain endpoint URI.
+    protected String configType;
+    // Used by subclasses to check the scheme of a configured endpoint; set via setter.
+    protected String allowedEndpoints;
+
+    /**
+     * Copies the Kafka settings out of the given map and stores the configuration type.
+     *
+     * @param kafkaProps map read with the keys kafkaHost, kafkaPort, kafkaImportTopic, kafkaExportTopic,
+     *                   kafkaImportGroupId, kafkaExportGroupId, kafkaConsumerCount and kafkaAutoCommit;
+     *                   must not be null because it is dereferenced unconditionally
+     * @param configType configuration type later compared with {@code RouterConstants.CONFIG_TYPE_KAFKA}
+     */
+    public RouterAbstractRouteBuilder(Map<String, String> kafkaProps, String 
configType) {
+        this.kafkaHost = kafkaProps.get("kafkaHost");
+        this.kafkaPort = kafkaProps.get("kafkaPort");
+        this.kafkaImportTopic = kafkaProps.get("kafkaImportTopic");
+        this.kafkaExportTopic = kafkaProps.get("kafkaExportTopic");
+        this.kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId");
+        this.kafkaExportGroupId = kafkaProps.get("kafkaExportGroupId");
+        this.kafkaConsumerCount = kafkaProps.get("kafkaConsumerCount");
+        this.kafkaAutoCommit = kafkaProps.get("kafkaAutoCommit");
+        this.configType = configType;
+    }
+
+    /**
+     * Returns the deposit buffer endpoint for the requested operation.
+     * <p>
+     * When the configuration type is Kafka the result is a {@link KafkaEndpoint}; otherwise it is the
+     * {@code operationDepositBuffer} string itself. Callers must cast the result accordingly.
+     *
+     * @param direction              when equal to {@code RouterConstants.DIRECTION_TO}, the autoCommitEnable and
+     *                               consumersCount options are appended to the Kafka URI
+     * @param operationDepositBuffer deposit buffer endpoint URI; when equal to
+     *                               {@code RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER} the export topic and
+     *                               group id are used, otherwise the import ones
+     * @return a {@link KafkaEndpoint} in Kafka mode, otherwise {@code operationDepositBuffer}
+     */
+    public Object getEndpointURI(String direction, String 
operationDepositBuffer) {
+        Object endpoint;
+        if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
+            // Import topic/group are the default; they are swapped for the export ones below.
+            String kafkaTopic = kafkaImportTopic;
+            String kafkaGroupId = kafkaImportGroupId;
+            if 
(RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER.equals(operationDepositBuffer)) {
+                kafkaTopic = kafkaExportTopic;
+                kafkaGroupId = kafkaExportGroupId;
+            }
+            // Build the Kafka endpoint URI: kafka:<host>:<port>?topic=<topic>[&groupId=<groupId>]
+            StringBuilder kafkaUri = new StringBuilder("kafka:");
+            
kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaTopic);
+            if (StringUtils.isNotBlank(kafkaGroupId)) {
+                kafkaUri.append("&groupId=" + kafkaGroupId);
+            }
+            if (RouterConstants.DIRECTION_TO.equals(direction)) {
+                kafkaUri.append("&autoCommitEnable=" + kafkaAutoCommit + 
"&consumersCount=" + kafkaConsumerCount);
+            }
+            // The explicit configuration carries only brokers, topic and group id.
+            // NOTE(review): autoCommitEnable/consumersCount appear only in the URI string; whether the
+            // KafkaEndpoint constructor applies URI query options is not visible here - confirm.
+            KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
+            kafkaConfiguration.setBrokers(kafkaHost + ":" + kafkaPort);
+            kafkaConfiguration.setTopic(kafkaTopic);
+            kafkaConfiguration.setGroupId(kafkaGroupId);
+            endpoint = new KafkaEndpoint(kafkaUri.toString(), new 
KafkaComponent(this.getContext()));
+            ((KafkaEndpoint) endpoint).setConfiguration(kafkaConfiguration);
+        } else {
+            // Non-Kafka mode: the deposit buffer URI is used as-is.
+            endpoint = operationDepositBuffer;
+        }
+
+        return endpoint;
+    }
+
+    /**
+     * Sets the Jackson data format used by the routes for marshalling and unmarshalling.
+     *
+     * @param jacksonDataFormat the data format to use
+     */
+    public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
+        this.jacksonDataFormat = jacksonDataFormat;
+    }
+
+    /**
+     * Sets the allowed endpoints value that subclasses check endpoint schemes against.
+     *
+     * @param allowedEndpoints the allowed endpoints string
+     */
+    public void setAllowedEndpoints(String allowedEndpoints) {
+        this.allowedEndpoints = allowedEndpoints;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
index a53e34b..ca87ad3 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
@@ -18,8 +18,6 @@ package org.apache.unomi.router.core.strategy;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy;
-import org.apache.unomi.router.core.processor.RouteCompletionProcessor;
 
 import java.util.ArrayList;
 

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java
new file mode 100644
index 0000000..5a69001
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.strategy;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.unomi.router.api.ExportConfiguration;
+
+/**
+ * Aggregation strategy that joins String bodies: each new body is appended to the previous exchange's body, separated by the line separator read from the "exportConfig" header. Created by amidani on 29/06/2017.
+ */
+public class StringLinesAggregationStrategy implements AggregationStrategy {
+
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { // returns oldExchange with the extended body, or newExchange when oldExchange is null
+        Object newBody = newExchange.getIn().getBody(String.class); // incoming body, requested as a String
+        String lineSeparator = newExchange.getIn().getHeader("exportConfig", 
ExportConfiguration.class).getLineSeparator(); // NOTE(review): presumably getHeader yields null when the header is missing, which would throw a NullPointerException here — confirm every route sets "exportConfig"
+        if (oldExchange != null) {
+            String fileContent = oldExchange.getIn().getBody(String.class); // content accumulated so far
+
+            fileContent += lineSeparator + newBody; // append the separator first, then the new line
+            oldExchange.getIn().setBody(fileContent);
+            return oldExchange;
+        } else {
+            return newExchange; // no previous exchange supplied: the incoming exchange is returned unchanged
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
 
b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 5ae1e9c..3b155b2 100644
--- 
a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ 
b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -27,15 +27,16 @@
         <cm:default-properties>
             <cm:property name="config.type" value="nobroker"/>
             <cm:property name="config.allowedEndpoints" value="file,ftp"/>
-            <cm:property name="config.internalPort" value="8233"/>
             <cm:property name="kafka.host" value="localhost"/>
             <cm:property name="kafka.port" value="9092"/>
-            <cm:property name="kafka.import.topic" value="camel-deposit"/>
+            <cm:property name="kafka.import.topic" value="import-deposit"/>
+            <cm:property name="kafka.export.topic" value="export-deposit"/>
             <cm:property name="kafka.import.groupId" 
value="unomi-import-group"/>
-            <cm:property name="kafka.import.consumerCount" value="10"/>
-            <cm:property name="kafka.import.autoCommit" value="true"/>
+            <cm:property name="kafka.export.groupId" 
value="unomi-export-group"/>
+            <cm:property name="kafka.consumerCount" value="10"/>
+            <cm:property name="kafka.autoCommit" value="true"/>
             <cm:property name="import.oneshot.uploadDir" 
value="/tmp/oneshot_import_configs/"/>
-            <cm:property name="import.executionsHistory.size" value="5"/>
+            <cm:property name="executionsHistory.size" value="5"/>
         </cm:default-properties>
     </cm:property-placeholder>
 
@@ -43,9 +44,14 @@
         <property name="profileImportService" ref="profileImportService"/>
     </bean>
 
-    <bean id="routeCompletionProcessor" 
class="org.apache.unomi.router.core.processor.RouteCompletionProcessor">
+    <bean id="importRouteCompletionProcessor" 
class="org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor">
         <property name="importConfigurationService" 
ref="importConfigurationService"/>
-        <property name="executionsHistorySize" 
value="${import.executionsHistory.size}"/>
+        <property name="executionsHistorySize" 
value="${executionsHistory.size}"/>
+    </bean>
+
+    <bean id="exportRouteCompletionProcessor" 
class="org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor">
+        <property name="exportConfigurationService" 
ref="exportConfigurationService"/>
+        <property name="executionsHistorySize" 
value="${executionsHistory.size}"/>
     </bean>
 
     <bean id="importConfigByFileNameProcessor" 
class="org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor">
@@ -86,26 +92,29 @@
           init-method="initCamelContext" destroy-method="preDestroy">
         <property name="configType" value="${config.type}"/>
         <property name="allowedEndpoints" value="${config.allowedEndpoints}"/>
+        <property name="uploadDir" value="${import.oneshot.uploadDir}"/>
+        <property name="bundleContext" ref="blueprintBundleContext"/>
+        <property name="jacksonDataFormat" ref="jacksonDataFormat"/>
         <property name="kafkaProps">
             <map>
                 <entry key="kafkaHost" value="${kafka.host}"/>
                 <entry key="kafkaPort" value="${kafka.port}"/>
                 <entry key="kafkaImportTopic" value="${kafka.import.topic}"/>
+                <entry key="kafkaExportTopic" value="${kafka.export.topic}"/>
                 <entry key="kafkaImportGroupId" 
value="${kafka.import.groupId}"/>
-                <entry key="kafkaImportConsumerCount" 
value="${kafka.import.consumerCount}"/>
-                <entry key="kafkaImportAutoCommit" 
value="${kafka.import.autoCommit}"/>
+                <entry key="kafkaExportGroupId" 
value="${kafka.export.groupId}"/>
+                <entry key="kafkaConsumerCount" 
value="${kafka.consumerCount}"/>
+                <entry key="kafkaAutoCommit" value="${kafka.autoCommit}"/>
             </map>
         </property>
-        <property name="uploadDir" value="${import.oneshot.uploadDir}"/>
         <property name="unomiStorageProcessor" ref="unomiStorageProcessor"/>
-        <property name="routeCompletionProcessor" 
ref="routeCompletionProcessor"/>
+        <property name="importRouteCompletionProcessor" 
ref="importRouteCompletionProcessor"/>
+        <property name="exportRouteCompletionProcessor" 
ref="exportRouteCompletionProcessor"/>
         <property name="importConfigByFileNameProcessor" 
ref="importConfigByFileNameProcessor"/>
-        <property name="importConfigurationService" 
ref="importConfigurationService"/>
+        <property name="configSharingService" ref="configSharingService" />
         <property name="exportConfigurationService" 
ref="exportConfigurationService"/>
+        <property name="importConfigurationService" 
ref="importConfigurationService"/>
         <property name="persistenceService" ref="persistenceService"/>
-        <property name="jacksonDataFormat" ref="jacksonDataFormat"/>
-        <property name="bundleContext" ref="blueprintBundleContext"/>
-        <property name="configSharingService" ref="configSharingService" />
     </bean>
 
     <camel:camelContext id="httpEndpoint" 
xmlns="http://camel.apache.org/schema/blueprint">
@@ -116,6 +125,10 @@
         <property name="routerCamelContext" ref="camelContext"/>
     </bean>
 
+    <bean id="collectProfileBean" 
class="org.apache.unomi.router.core.bean.CollectProfileBean">
+        <property name="persistenceService" ref="persistenceService"/>
+    </bean>
+
     <reference id="configSharingService" 
interface="org.apache.unomi.api.services.ConfigSharingService" />
     <reference id="httpService" interface="org.osgi.service.http.HttpService"/>
     <reference id="profileImportService" 
interface="org.apache.unomi.router.api.services.ProfileImportService"/>

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg 
b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
index 2aa385f..8f29d65 100644
--- 
a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
+++ 
b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
@@ -23,19 +23,18 @@ import.config.type=nobroker
 #Kafka
 #kafka.host=localhost
 #kafka.port=9092
-#kafka.import.topic=camel-deposit
+#kafka.import.topic=import-deposit
+#kafka.export.topic=export-deposit
 #kafka.import.groupId=unomi-import-group
-#kafka.import.consumerCount=10
-#kafka.import.autoCommit=true
+#kafka.export.groupId=unomi-export-group
+#kafka.consumerCount=10
+#kafka.autoCommit=true
 
 #Import One Shot upload directory
 import.oneshot.uploadDir=${karaf.data}/tmp/unomi_oneshot_import_configs/
 
-#Import executions history size
-import.executionsHistory.size=5
+#Import/Export executions history size
+executionsHistory.size=5
 
 #Allowed source endpoints
-config.allowedEndpoints=file,ftp
-
-#Internal Camel REST services port (Not public - DO NOT OPEN TO PUBLIC)
-config.internalPort=8233
\ No newline at end of file
+config.allowedEndpoints=file,ftp
\ No newline at end of file

Reply via email to