
Thank you very much for all the support.
I was able to convert the XML to JSON using a custom Flume source.

Now I need the ConvertJSONToSQL processor to insert the data into SQL.
I am trying to get hands-on with this processor and will update you on it.
Meanwhile, if you could share an example of using this processor with
sample JSON data, that would be great.


1) I tried the ConvertJSONToSQL processor with the following sample JSON file:

    {"firstname":"John", "lastname":"Doe"},
    {"firstname":"Anna", "lastname":"Smith"}

2) I created the table details in PostgreSQL:

    select * from details;
     firstname | lastname
    (0 rows)

3) The ConvertJSONToSQL processor properties are as follows:
Property                    Value
JDBC Connection Pool        DBCPConnectionPool
Statement Type              INSERT
Table Name                  details
Catalog Name                No value set
Translate Field Names       false
Unmatched Field Behavior    Ignore Unmatched Fields
Update Keys                 No value set
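
To make the expectation behind this configuration concrete, here is a minimal
Python sketch of the mapping I expect for one flat record. It is not NiFi code:
the build_insert helper is hypothetical, and the only assumption is that each
JSON field whose name matches a table column becomes one parameter of an
INSERT statement.

```python
import json


def build_insert(table, columns, record):
    """Hypothetical helper: build a parameterized INSERT from a flat record.

    Only fields whose names match a column are used; anything else is ignored.
    """
    matched = [c for c in columns if c in record]
    if not matched:
        raise ValueError(f"none of the fields map to the columns of {table}")
    placeholders = ", ".join("?" for _ in matched)
    sql = f"INSERT INTO {table} ({', '.join(matched)}) VALUES ({placeholders})"
    return sql, [record[c] for c in matched]


# One flat record from the sample in step 1, and the columns from step 2.
record = json.loads('{"firstname":"John", "lastname":"Doe"}')
sql, params = build_insert("details", ["firstname", "lastname"], record)
print(sql)     # INSERT INTO details (firstname, lastname) VALUES (?, ?)
print(params)  # ['John', 'Doe']
```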

But I am getting the following error in the ConvertJSONToSQL processor:
2015-10-12 05:15:19,584 ERROR [Timer-Driven Process Thread-1]
ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to convert
[resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
section=1], offset=115045, length=104],offset=0,name=json,size=104] to a
SQL INSERT statement due to
org.apache.nifi.processor.exception.ProcessException: None of the fields in
the JSON map to the columns defined by the details table; routing to
failure: org.apache.nifi.processor.exception.ProcessException: None of the
fields in the JSON map to the columns defined by the details table
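
One thing that may be worth checking, offered as an assumption rather than a
confirmed cause: the sample in step 1 is two objects separated by a comma,
which is not a single JSON document on its own. The Python sketch below (the
parses helper is hypothetical) only checks that point with the standard json
module. Whether the processor in this NiFi version accepts an array is
something I have not verified.

```python
import json

# The sample from step 1, exactly as two comma-separated objects.
sample = (
    '{"firstname":"John", "lastname":"Doe"},\n'
    '{"firstname":"Anna", "lastname":"Smith"}'
)


def parses(text):
    """Hypothetical helper: return True if text is one valid JSON document."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


print(parses(sample))              # False: extra data after the first object
print(parses("[" + sample + "]"))  # True: wrapped in an array, it is valid

# Wrapped in an array, the sample yields two flat records.
records = json.loads("[" + sample + "]")
print(sorted(records[0].keys()))   # ['firstname', 'lastname']
```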

Thanks and Regards,

On Sat, Oct 10, 2015 at 9:45 PM, Joey Echeverria <joe...@gmail.com> wrote:

> I've done something like this by wrapping the command in a shell script:
> http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/
> My use case was slightly different, but I'm pretty sure you can adapt the
> same idea.
> -Joey
> On Oct 10, 2015, at 03:52, Parul Agrawal <parulagrawa...@gmail.com> wrote:
> Hi,
> I actually need to get the data from a pipe.
> So the actual command I need is: mkfifo /tmp/packet; tshark -i ens160
> -T pdml > /tmp/packet
> Is it possible to use ExecuteProcess for multiple commands?
> On Sat, Oct 10, 2015 at 1:04 PM, Parul Agrawal <parulagrawa...@gmail.com>
> wrote:
>> Hi,
>> I added a custom Flume source, and when the Flume source sends data to the
>> Flume sink, the error below is thrown at the Flume sink:
>>  Administratively Yielded for 1 sec due to processing failure
>> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9]
>> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught
>> Exception: java.lang.IllegalStateException: close() called when transaction
>> is OPEN - you must either commit or rollback first
>> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9]
>> o.a.n.c.t.ContinuallyRunProcessorTask
>> java.lang.IllegalStateException: close() called when transaction is OPEN
>> - you must either commit or rollback first
>>         at
>> com.google.common.base.Preconditions.checkState(Preconditions.java:172)
>> ~[guava-r05.jar:na]
>>         at
>> org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>         at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105)
>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>         at
>> org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139)
>> ~[na:na]
>>         at
>> org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148)
>> ~[na:na]
>>         at
>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077)
>> ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>         at
>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127)
>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>         at
>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49)
>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>         at
>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119)
>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>         at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>> [na:1.7.0_85]
>>         at
>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>> [na:1.7.0_85]
>>         at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>> [na:1.7.0_85]
>>         at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> [na:1.7.0_85]
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> [na:1.7.0_85]
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> [na:1.7.0_85]
>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
>> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9]
>> o.a.n.processors.flume.ExecuteFlumeSink
>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf]
>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process
>> due to org.apache.nifi.processor.exception.FlowFileHandlingException:
>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>> section=7], offset=180436,
>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>> this session (StandardProcessSession[id=218318]); rolling back session:
>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>> section=7], offset=180436,
>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>> this session (StandardProcessSession[id=218318])
>> Any idea what could be wrong here?
>> Thanks and Regards,
>> Parul
>> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bbe...@gmail.com> wrote:
>>> Hi Parul,
>>> I think it would be good to keep the convo going on the users list since
>>> there are more people who can offer help there, and also helps everyone
>>> learn new solutions.
>>> The quick answer though is that NiFi has an ExecuteProcess processor
>>> which could execute "tshark -i eth0 -T pdml".
>>> There is not currently an XmlToJson processor, so this could be a place
>>> where you need a custom processor. For simple cases you can use an
>>> EvaluateXPath processor to extract values from the XML, and then a
>>> ReplaceText processor to build a new json document from those extracted
>>> values.
>>> -Bryan
>>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <parulagrawa...@gmail.com>
>>> wrote:
>>>> Hi,
>>>> A little more to add:
>>>> I need to keep reading the flow file until END_TAG is received, i.e. we
>>>> may need to concatenate the flow file data until END_TAG,
>>>> and then convert it to JSON and call the PutFile processor.
>>>> Thanks and Regards,
>>>> Parul
>>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <
>>>> parulagrawa...@gmail.com> wrote:
>>>>> Hi,
>>>>> Thank you very much again for the guidance provided.
>>>>> Basically, I need a processor that converts an XML file to
>>>>> JSON.
>>>>> Currently I have a Flume source of type "exec", and the
>>>>> command used is "tshark -i eth0 -T pdml".
>>>>> The Flume source keeps sending data to the Flume sink. This flow file
>>>>> is in PDML format.
>>>>> Now I need a processor that does the following:
>>>>> 1) Form a complete XML file based on the START TAG (<packet>)
>>>>> and END TAG (</packet>).
>>>>> 2) Once the XML message is formed, convert it to JSON.
>>>>> 3) Place the JSON file in a local directory using the PutFile processor.
>>>>> I am not sure whether I have explained the processor requirement clearly.
>>>>> It would be really great if you could help me with this.
>>>>> Thanks and Regards,
>>>>> Parul
>>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <joe...@gmail.com>
>>>>> wrote:
>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>> > investing in converting your custom Flume components to NiFi processors.
>>>>>> > We can help you get started if you need any guidance going that route.
>>>>>> +1. Running Flume sources/sinks is meant as a transition step. It's
>>>>>> really useful if you have a complex Flume flow and want to migrate
>>>>>> only parts of it over to NiFi at a time. I would port any custom
>>>>>> sources and sinks to NiFi once you knew that it would meet your needs
>>>>>> well. NiFi has a lot of documentation on writing processors and the
>>>>>> concepts map pretty well if you're already familiar with Flume's
>>>>>> execution model.
>>>>>> -Joey
>>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bbe...@gmail.com> wrote:
>>>>>> >
>>>>>> > Hi Parul,
>>>>>> >
>>>>>> > It is possible to deploy a custom Flume source/sink to NiFi, but
>>>>>> > due to the way the Flume processors load the classes for the sources and
>>>>>> > sinks, the jar you deploy to the lib directory also needs to include the
>>>>>> > other dependencies your source/sink needs (or they each need to
>>>>>> > individually be in lib/ directly).
>>>>>> >
>>>>>> > So here is a sample project I created that makes a shaded jar:
>>>>>> > https://github.com/bbende/my-flume-source
>>>>>> >
>>>>>> > It will contain the custom source and the following dependencies all in
>>>>>> > one jar:
>>>>>> >
>>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>>> >   \- com.google.guava:guava:jar:11.0.2:compile
>>>>>> >      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>>> >
>>>>>> > I copied that to the NiFi lib directory, restarted, and created an
>>>>>> > ExecuteFlumeSource processor with the following config:
>>>>>> >
>>>>>> > Source Type = org.apache.flume.MySource
>>>>>> > Agent Name = a1
>>>>>> > Source Name = r1
>>>>>> > Flume Configuration = a1.sources = r1
>>>>>> >
>>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>>>> >
>>>>>> > Keep in mind that this could become risky because any classes found
>>>>>> > in the lib directory would be accessible to all NARs in NiFi and would be
>>>>>> > found before classes within a NAR because the parent is checked first
>>>>>> > during class loading. This example isn't too risky because we are only
>>>>>> > bringing in Flume jars and one Guava jar, but for example if another NAR
>>>>>> > uses a different version of Guava this is going to cause a problem.
>>>>>> >
>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>> > investing in converting your custom Flume components to NiFi processors.
>>>>>> > We can help you get started if you need any guidance going that route.
>>>>>> >
>>>>>> > -Bryan
>>>>>> >
>>>>>> >
>>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <
>>>>>> > parulagrawa...@gmail.com> wrote:
>>>>>> >>
>>>>>> >> Hello Bryan,
>>>>>> >>
>>>>>> >> Thank you very much for your response.
>>>>>> >>
>>>>>> >> Is it possible to have a customized Flume source and sink in NiFi?
>>>>>> >> I have my own customized source and sink. I followed the steps below
>>>>>> >> to add my own customized source, but it did not work.
>>>>>> >>
>>>>>> >> 1) Created a Maven project and added the customized source (flume.jar
>>>>>> >> was created after this step).
>>>>>> >> 2) Added the flume.jar file to the nifi-0.3.0/lib folder.
>>>>>> >> 3) Added a Flume source processor with the configuration below:
>>>>>> >>
>>>>>> >> Property        Value
>>>>>> >> Source Type     com.flume.source.Source
>>>>>> >> Agent Name      a1
>>>>>> >> Source Name     k1
>>>>>> >>
>>>>>> >> But I am getting the error below in the Flume source processor:
>>>>>> >> "Failed to run validation due to java.lang.NoClassDefFoundError :
>>>>>> >> /org/apache/flume/PollableSource."
>>>>>> >>
>>>>>> >> Can you please help me in this regard? Is there any step or
>>>>>> >> configuration I missed?
>>>>>> >>
>>>>>> >> Thanks and Regards,
>>>>>> >> Parul
>>>>>> >>
>>>>>> >>
>>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bbe...@gmail.com>
>>>>>> >> wrote:
>>>>>> >>>
>>>>>> >>> Hello,
>>>>>> >>>
>>>>>> >>> The NiFi Flume processors are for running Flume sources and sinks
>>>>>> >>> within NiFi. They don't communicate with an external Flume process.
>>>>>> >>>
>>>>>> >>> In your example you would need an ExecuteFlumeSource configured
>>>>>> >>> to run the netcat source, connected to an ExecuteFlumeSink configured
>>>>>> >>> with the logger.
>>>>>> >>>
>>>>>> >>> -Bryan
>>>>>> >>>
>>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal <
>>>>>> >>> parulagrawa...@gmail.com> wrote:
>>>>>> >>>>
>>>>>> >>>> Hi,
>>>>>> >>>>
>>>>>> >>>> I was trying to run the NiFi Flume processor with the details
>>>>>> >>>> below, but could not bring it up.
>>>>>> >>>>
>>>>>> >>>> I already started Flume with the sample configuration file:
>>>>>> >>>> =============================================
>>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>>> >>>>
>>>>>> >>>> # Name the components on this agent
>>>>>> >>>> a1.sources = r1
>>>>>> >>>> a1.sinks = k1
>>>>>> >>>> a1.channels = c1
>>>>>> >>>>
>>>>>> >>>> # Describe/configure the source
>>>>>> >>>> a1.sources.r1.type = netcat
>>>>>> >>>> a1.sources.r1.bind = localhost
>>>>>> >>>> a1.sources.r1.port = 44444
>>>>>> >>>>
>>>>>> >>>> # Describe the sink
>>>>>> >>>> a1.sinks.k1.type = logger
>>>>>> >>>>
>>>>>> >>>> # Use a channel which buffers events in memory
>>>>>> >>>> a1.channels.c1.type = memory
>>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>>> >>>>
>>>>>> >>>> # Bind the source and sink to the channel
>>>>>> >>>> a1.sources.r1.channels = c1
>>>>>> >>>> a1.sinks.k1.channel = c1
>>>>>> >>>> =============================================
>>>>>> >>>>
>>>>>> >>>> Command used to start Flume: $ bin/flume-ng agent --conf conf
>>>>>> >>>> --conf-file example.conf --name a1
>>>>>> >>>> -Dflume.root.logger=INFO,console
>>>>>> >>>>
>>>>>> >>>> In the NiFi UI, the ExecuteFlumeSink processor was configured as
>>>>>> >>>> follows:
>>>>>> >>>> Property        Value
>>>>>> >>>> Sink Type       logger
>>>>>> >>>> Agent Name      a1
>>>>>> >>>> Sink Name       k1
>>>>>> >>>>
>>>>>> >>>> An event is sent to Flume using:
>>>>>> >>>> $ telnet localhost 44444
>>>>>> >>>> Trying
>>>>>> >>>> Connected to localhost.localdomain (
>>>>>> >>>> Escape character is '^]'.
>>>>>> >>>> Hello world! <ENTER>
>>>>>> >>>> OK
>>>>>> >>>>
>>>>>> >>>> But I could not get any data in the NiFi Flume processor.
>>>>>> >>>> I request your help with this.
>>>>>> >>>> Do I need to change Flume's example.conf file so that the NiFi
>>>>>> >>>> Flume sink gets the data?
>>>>>> >>>>
>>>>>> >>>> Thanks and Regards,
>>>>>> >>>> Parul
>>>>>> >>>
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> --
>>>>>> >>> Sent from Gmail Mobile
>>>>>> >>
>>>>>> >>
>>>>>> >
