Hi community,

Let me repost this here because I haven't received any replies on the GitHub issue: https://github.com/apache/camel-kafka-connector/issues/1568

I'm testing camel-azure-storage-blob-sink-kafka-connector on k8s. So far, I've confirmed that the CamelHeader option works <https://github.com/apache/camel-kafka-connector/issues/1530> and that the log is uploaded to an arbitrary path.

Now I'm testing the camel.sink.marshal option to reduce storage usage on Azure Blob Storage, but I haven't been successful yet. I tried setting camel.sink.marshal to zipfile, but got the following error.

2023-09-27 03:14:35,322 ERROR [blob-connector|task-0] WorkerSinkTask{id=blob-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-blob-connector-0]
org.apache.kafka.connect.errors.ConnectException: Failed to create and start Camel context
    at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:159)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:315)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.camel.RuntimeCamelException: org.apache.camel.VetoCamelContextStartException: Failure creating route from template: ckcMarshal
    at org.apache.camel.RuntimeCamelException.wrapRuntimeException(RuntimeCamelException.java:66)
    at org.apache.camel.support.service.BaseService.doFail(BaseService.java:413)
    at org.apache.camel.impl.engine.AbstractCamelContext.doFail(AbstractCamelContext.java:3550)
    at org.apache.camel.support.service.BaseService.fail(BaseService.java:342)
    at org.apache.camel.impl.engine.AbstractCamelContext.failOnStartup(AbstractCamelContext.java:5204)
    at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2642)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:111)
    at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2649)
    at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:262)
    at org.apache.camel.main.SimpleMain.doStart(SimpleMain.java:43)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:119)
    at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:152)
    ... 9 more
Caused by: org.apache.camel.VetoCamelContextStartException: Failure creating route from template: ckcMarshal
    at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:433)
    at org.apache.camel.impl.engine.AbstractCamelContext.doInit(AbstractCamelContext.java:2956)
    at org.apache.camel.support.service.BaseService.init(BaseService.java:83)
    at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2630)
    ... 15 more
Caused by: org.apache.camel.component.kamelet.KameletNotFoundException: Kamelet with id ckcMarshal not found in locations: classpath:/kamelets
    at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:421)
    at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:430)
    ... 18 more
Caused by: org.apache.camel.FailedToCreateRouteException: Failed to create route ckcMarshal-4 at: >>> Marshal[CustomDataFormat[{{marshal}}]] <<< in route: Route(ckcMarshal-4)[From[kamelet://source?routeId=ckcMarshal... because of Cannot find data format in registry with ref: zipfile
    at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:241)
    at org.apache.camel.reifier.RouteReifier.createRoute(RouteReifier.java:75)
    at org.apache.camel.impl.DefaultModelReifierFactory.createRoute(DefaultModelReifierFactory.java:49)
    at org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:862)
    at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:416)
    ... 19 more
Caused by: java.lang.IllegalArgumentException: Cannot find data format in registry with ref: zipfile
    at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:142)
    at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:115)
    at org.apache.camel.reifier.dataformat.CustomDataFormatReifier.doCreateDataFormat(CustomDataFormatReifier.java:35)
    at org.apache.camel.reifier.dataformat.DataFormatReifier.createDataFormat(DataFormatReifier.java:266)
    at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:151)
    at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:111)
    at org.apache.camel.reifier.MarshalReifier.createProcessor(MarshalReifier.java:35)
    at org.apache.camel.reifier.ProcessorReifier.makeProcessor(ProcessorReifier.java:847)
    at org.apache.camel.reifier.ProcessorReifier.addRoutes(ProcessorReifier.java:588)
    at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:237)
    ... 23 more
2023-09-27 03:14:35,323 INFO [blob-connector|task-0] Stopping CamelSinkTask connector task (org.apache.camel.kafkaconnector.CamelSinkTask) [task-thread-blob-connector-0]

I've also tested with org.apache.camel.model.dataformat.ZipFileDataFormat as the value, and then got the following error.

2023-09-27 03:13:40,704 ERROR [blob-connector|task-0] WorkerSinkTask{id=blob-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-blob-connector-0]
org.apache.kafka.connect.errors.ConnectException: Failed to create and start Camel context
    at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:159)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:315)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.camel.RuntimeCamelException: org.apache.camel.VetoCamelContextStartException: Failure creating route from template: ckcMarshal
    at org.apache.camel.RuntimeCamelException.wrapRuntimeException(RuntimeCamelException.java:66)
    at org.apache.camel.support.service.BaseService.doFail(BaseService.java:413)
    at org.apache.camel.impl.engine.AbstractCamelContext.doFail(AbstractCamelContext.java:3550)
    at org.apache.camel.support.service.BaseService.fail(BaseService.java:342)
    at org.apache.camel.impl.engine.AbstractCamelContext.failOnStartup(AbstractCamelContext.java:5204)
    at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2642)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:111)
    at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2649)
    at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:262)
    at org.apache.camel.main.SimpleMain.doStart(SimpleMain.java:43)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:119)
    at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:152)
    ... 9 more
Caused by: org.apache.camel.VetoCamelContextStartException: Failure creating route from template: ckcMarshal
    at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:433)
    at org.apache.camel.impl.engine.AbstractCamelContext.doInit(AbstractCamelContext.java:2956)
    at org.apache.camel.support.service.BaseService.init(BaseService.java:83)
    at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2630)
    ... 15 more
Caused by: org.apache.camel.component.kamelet.KameletNotFoundException: Kamelet with id ckcMarshal not found in locations: classpath:/kamelets
    at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:421)
    at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:430)
    ... 18 more
Caused by: org.apache.camel.FailedToCreateRouteException: Failed to create route ckcMarshal-1 at: >>> Marshal[CustomDataFormat[{{marshal}}]] <<< in route: Route(ckcMarshal-1)[From[kamelet://source?routeId=ckcMarshal... because of Resolving dataformat: org.apache.camel.model.dataformat.ZipFileDataFormat detected type conflict: Not a DataFormat implementation. Found: org.apache.camel.model.dataformat.ZipFileDataFormat
    at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:241)
    at org.apache.camel.reifier.RouteReifier.createRoute(RouteReifier.java:75)
    at org.apache.camel.impl.DefaultModelReifierFactory.createRoute(DefaultModelReifierFactory.java:49)
    at org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:862)
    at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:416)
    ... 19 more
Caused by: java.lang.IllegalArgumentException: Resolving dataformat: org.apache.camel.model.dataformat.ZipFileDataFormat detected type conflict: Not a DataFormat implementation. Found: org.apache.camel.model.dataformat.ZipFileDataFormat
    at org.apache.camel.impl.engine.DefaultDataFormatResolver.createDataFormatFromResource(DefaultDataFormatResolver.java:76)
    at org.apache.camel.impl.engine.DefaultDataFormatResolver.createDataFormat(DefaultDataFormatResolver.java:47)
    at org.apache.camel.impl.engine.AbstractCamelContext.lambda$resolveDataFormat$3(AbstractCamelContext.java:4473)
    at java.base/java.util.Optional.orElseGet(Optional.java:364)
    at org.apache.camel.impl.engine.AbstractCamelContext.lambda$resolveDataFormat$4(AbstractCamelContext.java:4473)
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
    at org.apache.camel.impl.engine.AbstractCamelContext.resolveDataFormat(AbstractCamelContext.java:4464)
    at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:140)
    at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:115)
    at org.apache.camel.reifier.dataformat.CustomDataFormatReifier.doCreateDataFormat(CustomDataFormatReifier.java:35)
    at org.apache.camel.reifier.dataformat.DataFormatReifier.createDataFormat(DataFormatReifier.java:266)
    at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:151)
    at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:111)
    at org.apache.camel.reifier.MarshalReifier.createProcessor(MarshalReifier.java:35)
    at org.apache.camel.reifier.ProcessorReifier.makeProcessor(ProcessorReifier.java:847)
    at org.apache.camel.reifier.ProcessorReifier.addRoutes(ProcessorReifier.java:588)
    at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:237)
    ... 23 more

Could someone give a more detailed explanation of how to specify this option? My configuration so far looks like this:

spec:
  class: org.apache.camel.kafkaconnector.azurestorageblobsink.CamelAzurestorageblobsinkSinkConnector
  config:
    camel.kamelet.azure-storage-blob-sink.accountName: "my_account"
    camel.kamelet.azure-storage-blob-sink.accessKey: "xxx"
    camel.kamelet.azure-storage-blob-sink.containerName: "my_container"
    camel.beans.aggregate: "#class:org.apache.camel.kafkaconnector.aggregator.StringAggregator"
    camel.aggregation.size: 10
    camel.aggregation.timeout: 500000
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    topics: my-logs
    camel.sink.marshal: zipfile

I'm using camel-azure-storage-blob-sink-kafka-connector-3.18.2, with `uploadBlockBlob` set for `CamelHeader.CamelAzureStorageBlobOperation`.

Thanks in advance,
Satoshi