Hi

You need to think more about your design. If the data stays in the
table, consider using an idempotent consumer to filter out rows you
have already processed. Alternatively, update the table once a row has
been consumed, for example by marking the row as "seen" or by deleting it.
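
To illustrate the idea behind an idempotent filter, here is a minimal
plain-Java sketch. It is not Camel's API: the class SeenRowFilter and
its methods are hypothetical names, and it assumes each row arrives as
a Map keyed by column name with LOG_INSTANCE_ID as the unique key. It
remembers the keys it has forwarded and drops any row whose key shows
up again on a later poll.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, for illustration only (not part of Camel).
public class SeenRowFilter {
    // Keys of the rows that were already forwarded downstream.
    private final Set<Object> seenKeys = new HashSet<>();

    /** Returns only the rows whose LOG_INSTANCE_ID has not been seen before. */
    public List<Map<String, Object>> filterNew(List<Map<String, Object>> rows) {
        List<Map<String, Object>> fresh = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            // Set.add returns false when the key is already present.
            if (seenKeys.add(row.get("LOG_INSTANCE_ID"))) {
                fresh.add(row);
            }
        }
        return fresh;
    }

    /** Builds a small row map; the real rows would come from the SQL query. */
    public static Map<String, Object> row(long id, String status) {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("LOG_INSTANCE_ID", id);
        r.put("CALCULATED_STATUS", status);
        return r;
    }

    public static void main(String[] args) {
        SeenRowFilter filter = new SeenRowFilter();
        // First poll: both rows are new.
        List<Map<String, Object>> first = filter.filterNew(Arrays.asList(
                row(4029927L, "flowError"), row(4140006L, "flowCompleted")));
        // Second poll: one row repeats and is filtered out.
        List<Map<String, Object>> second = filter.filterNew(Arrays.asList(
                row(4140006L, "flowCompleted"), row(4038409L, "flowError")));
        System.out.println("first poll forwarded: " + first.size());
        System.out.println("second poll forwarded: " + second.size());
    }
}
```

An in-memory set like this forgets everything on restart, which is why
marking or deleting the rows in the table itself can be the more robust
choice.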

Alternatively, you can use a timer or scheduler to control how often
the route runs, call the SQL from a to(...) endpoint, and then send the
result to Elasticsearch.
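
A rough sketch of that timer-driven variant in the Java DSL, reusing
the query and endpoints from your mail. The class name, the timer name
"oracleImport", and the split step are my assumptions. The split is
there because a select called from to(...) returns the whole result as
one list; splitting hands Elasticsearch one row at a time. Whether a
row Map converts cleanly to an index request in your Camel version is
something you would need to verify.

```java
import org.apache.camel.builder.RouteBuilder;

// Sketch only: needs camel-core, camel-sql and camel-elasticsearch on the classpath.
public class OracleToElasticsearchRoute extends RouteBuilder {
    @Override
    public void configure() {
        // repeatCount=1 fires the timer once; use period=60000 instead to run every minute.
        from("timer://oracleImport?repeatCount=1")
            // Used as a producer, the sql endpoint runs the query once per timer event.
            .to("sql:select log_instance_id, calculated_status, begin_date, end_date, "
                + "mediation_descr_code, duration from of_log_mediation_instance"
                + "?dataSource=myDataSource")
            // Assumption: split the list of rows so each row is sent on its own.
            .split(body())
            .to("elasticsearch://boa-elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet");
    }
}
```

With repeatCount=1 the route stays started but does not query again,
so there is no need to stop the context after a Thread.sleep.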

On Mon, Jun 15, 2015 at 11:58 AM, alexis.jacquemart <aceso...@hotmail.fr> wrote:
> I'm currently trying to import some data from Oracle to ElasticSearch (in
> JSON format) using Apache Camel. I'm totally new to this framework, so I was
> thinking that you might be able to help with it!
>
> Here is my code :
>
> from("sql://select log_instance_id, calculated_status, begin_date, end_date,
> mediation_descr_code, duration from
> of_log_mediation_instance?dataSource=myDataSource")
> .to("elasticsearch://boa-elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet")
>
> The problem is that the route never stops, because I start it with a
> from("sql:select ..."). It finishes the first time, then reruns the route
> until I stop the context at the end of a Thread.sleep(10000). I've tried
> starting with other endpoints, but I often get this error:
>
> org.apache.camel.TypeConversionException: Error during type conversion from
> type: java.lang.String to the required type:
> org.elasticsearch.action.index.IndexRequest with value
> [{LOG_INSTANCE_ID=4029927, CALCULATED_STATUS=flowError,
> BEGIN_DATE=2015-05-21 16:54:31.45436, END_DATE=2015-05-21 16:54:31.495735,
> MEDIATION_DESCR_CODE=DiffuserMAJOfrCial, DURATION=0.041},
> {LOG_INSTANCE_ID=4140006, CALCULATED_STATUS=flowCompleted,
> BEGIN_DATE=2015-06-01 11:11:31.569414, END_DATE=2015-06-01 11:11:33.109604,
> MEDIATION_DESCR_CODE=RecevoirDemOpeContrat, DURATION=1.54},
> {LOG_INSTANCE_ID=4038409, CALCULATED_STATUS=flowError, BEGIN_DATE=2015-05-22
> 10:18:11.295776, END_DATE=2015-05-22 10:18:11.299125,
> MEDIATION_DESCR_CODE=DiffuserMAJOfrCial, DURATION=0.004},
> {LOG_INSTANCE_ID=4144321, CALCULATED_STATUS=flowCompleted,
> BEGIN_DATE=2015-06-01 15:33:12.37841, END_DATE=2015-06-01 15:33:12.822529,
> MEDIATION_DESCR_CODE=RecevoirDemOpeContrat, DURATION=0.444},
> {LOG_INSTANCE_ID=4039159, CALCULATED_STATUS=flowError, BEGIN_DATE=2015-05-22
> 11:23:56.462823, END_DATE=2015-05-22 11:23:58.15667,
> MEDIATION_DESCR_CODE=DiffuserMAJOfrCial, DURATION=1.694},
> {LOG_INSTANCE_ID=4045264, CALCULATED_STATUS=... [Body clipped after 1000
> chars, total length is 16546230] due java.lang.NullPointerException
>     at
> org.apache.camel.impl.converter.BaseTypeConverterRegistry.createTypeConversionException(BaseTypeConverterRegistry.java:571)[camel-core-2.15.2.jar:2.15.2]
>     at
> org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:129)[camel-core-2.15.2.jar:2.15.2]
>     at
> org.apache.camel.impl.MessageSupport.getBody(MessageSupport.java:72)[camel-core-2.15.2.jar:2.15.2]
>     at
> org.apache.camel.impl.MessageSupport.getBody(MessageSupport.java:47)[camel-core-2.15.2.jar:2.15.2]
>     at
> org.apache.camel.component.elasticsearch.ElasticsearchProducer.process(ElasticsearchProducer.java:116)[camel-elasticsearch-2.15.2.jar:2.15.2]
>     at
> org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)[camel-core-2.15.2.jar:2.15.2]
>     at
> org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)[camel-core-2.15.2.jar:2.15.2]
>     at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[camel-core-2.15.2.jar:2.15.2]
>     at
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)[camel-core-2.15.2.jar:2.15.2]
>     at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[camel-core-2.15.2.jar:2.15.2]
>     at
> org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[camel-core-2.15.2.jar:2.15.2]
>     at
> org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[camel-core-2.15.2.jar:2.15.2]
>     at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[camel-core-2.15.2.jar:2.15.2]
>     at
> org.apache.camel.processor.loadbalancer.QueueLoadBalancer.process(QueueLoadBalancer.java:44)[camel-core-2.15.2.jar:2.15.2]
>     at
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[camel-core-2.15.2.jar:2.15.2]
>     at
> org.apache.camel.processor.loadbalancer.LoadBalancerSupport.process(LoadBalancerSupport.java:87)[camel-core-2.15.2.jar:2.15.2]
>     at
> org.apache.camel.component.quartz2.CamelJob.execute(CamelJob.java:56)[camel-quartz2-2.15.2.jar:2.15.2]
>     at
> org.quartz.core.JobRunShell.run(JobRunShell.java:202)[quartz-2.2.1.jar:]
>     at
> org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)[quartz-2.2.1.jar:]
> [artzScheduler-camel-1_Worker-1] CamelJob                       ERROR Error
> processing exchange. Exchange[Message: [{LOG_INSTANCE_ID=4029927,
> CALCULATED_STATUS=flowError, BEGIN_DATE=2015-05-21 16:54:31.45436,
> END_DATE=2015-05-21 16:54:31.495735,
> MEDIATION_DESCR_CODE=DiffuserMAJOfrCial, DURATION=0.041},
> {LOG_INSTANCE_ID=4140006, CALCULATED_STATUS=flowCompleted,
> BEGIN_DATE=2015-06-01 11:11:31.569414, END_DATE=2015-06-01 11:11:33.109604,
> MEDIATION_DESCR_CODE=RecevoirDemOpeContrat, DURATION=1.54},
> {LOG_INSTANCE_ID=4038409, CALCULATED_STATUS=flowError, BEGIN_DATE=2015-05-22
> 10:18:11.295776, END_DATE=2015-05-22 10:18:11.299125,
> MEDIATION_DESCR_CODE=DiffuserMAJOfrCial, DURATION=0.004},
> {LOG_INSTANCE_ID=4144321, CALCULATED_STATUS=flowCompleted,
> BEGIN_DATE=2015-06-01 15:33:12.37841, END_DATE=2015-06-01 15:33:12.822529,
> MEDIATION_DESCR_CODE=RecevoirDemOpeContrat, DURATION=0.444},
> {LOG_INSTANCE_ID=4039159, CALCULATED_STATUS=flowError, BEGIN_DATE=2015-05-22
> 11:23:56.462823, END_DATE=2015-05-22 11:23:58.15667,
> MEDIATION_DESCR_CODE=DiffuserMAJOfrCial, DURATION=1.694},
> {LOG_INSTANCE_ID=4045264, CALCULATED_STATUS=... [Body clipped after 1000
> chars, total length is 16546230]]. Caused by:
> [org.quartz.JobExecutionException -
> org.apache.camel.TypeConversionException: Error during type conversion from
> type: java.lang.String to the required type:
> org.elasticsearch.action.index.IndexRequest with value
> [{LOG_INSTANCE_ID=4029927, CALCULATED_STATUS=flowError,
> BEGIN_DATE=2015-05-21 16:54:31.45436, END_DATE=2015-05-21 16:54:31.495735,
> MEDIATION_DESCR_CODE=DiffuserMAJOfrCial, DURATION=0.041},
> {LOG_INSTANCE_ID=4140006, CALCULATED_STATUS=flowCompleted,
> BEGIN_DATE=2015-06-01 11:11:31.569414, END_DATE=2015-06-01 11:11:33.109604,
> MEDIATION_DESCR_CODE=RecevoirDemOpeContrat, DURATION=1.54},
> {LOG_INSTANCE_ID=4038409, CALCULATED_STATUS=flowError, BEGIN_DATE=2015-05-22
> 10:18:11.295776, END_DATE=2015-05-22 10:18:11.299125,
> MEDIATION_DESCR_CODE=DiffuserMAJOfrCial, DURATION=0.004},
> {LOG_INSTANCE_ID=4144321, CALCULATED_STATUS=flowCompleted,
> BEGIN_DATE=2015-06-01 15:33:12.37841, END_DATE=2015-06-01 15:33:12.822529,
> MEDIATION_DESCR_CODE=RecevoirDemOpeContrat, DURATION=0.444},
> {LOG_INSTANCE_ID=4039159, CALCULATED_STATUS=flowError, BEGIN_DATE=2015-05-22
> 11:23:56.462823, END_DATE=2015-05-22 11:23:58.15667,
> MEDIATION_DESCR_CODE=DiffuserMAJOfrCial, DURATION=1.694},
> {LOG_INSTANCE_ID=4045264, CALCULATED_STATUS=... [Body clipped after 1000
> chars, total length is 16546230] due java.lang.NullPointerException]
>
> I've tried direct, the quartz scheduler, .end(), loop(1), ...
>
> Thank you !
>
>
>
> --
> View this message in context: 
> http://camel.465427.n5.nabble.com/Stop-a-SQL-route-in-Apache-Camel-tp5768252.html
> Sent from the Camel - Users mailing list archive at Nabble.com.



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cib...@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/
