I think ConvertJSONToSQL expects a flat document of key/value pairs, or an
array of flat documents. So I think your JSON would be:

[
    {"firstname":"John", "lastname":"Doe"},
    {"firstname":"Anna", "lastname":"Smith"}
]

The table name will come from the Table Name property.

Let us know if this doesn't work.

-Bryan


On Mon, Oct 12, 2015 at 12:19 PM, Parul Agrawal <parulagrawa...@gmail.com>
wrote:

> Hi,
>
> Thank you very much for all the support.
> I could able to convert XML format to json  using custom flume source.
>
> Now I would need ConvertJSONToSQL processor to insert data into SQL.
> I am trying to get hands-on on this processor. Will update you on this.
> Meanwhile if any example you could share to use this processor for a sample
> json data, then it would be great.
>
> ===============
>
> 1) I tried using ConvertJSONToSQL processor with the below sample json
> file:
>
> "details":[
>     {"firstname":"John", "lastname":"Doe"},
>     {"firstname":"Anna", "lastname":"Smith"}
> ]
>
> 2) I created table *details *in the postgreSQL
> * select * from details ;*
> * firstname | lastname*
> *-----------+----------*
> *(0 rows)*
>
> 3) ConvertJSONToSQL Processor property details are as below:
> *Property  *                                               *Value*
> JDBC Connection PoolInfo            DBCPConnectionPool
> Statement TypeInfo                      INSERT
> Table NameInfo                            details
> Catalog NameInfo                         No value set
> Translate Field NamesInfo             false
> Unmatched Field BehaviorInfo       Ignore Unmatched Fields
> Update KeysInfo                           No value set
>
> But I am getting the below mentioned error in ConvertJSONToSQL Processor.
> 2015-10-12 05:15:19,584 ERROR [Timer-Driven Process Thread-1]
> o.a.n.p.standard.ConvertJSONToSQL
> ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to convert
> StandardFlowFileRecord[uuid=3a58716b-1474-4d75-91c1-e2fc3b9175ba,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
> section=1], offset=115045, length=104],offset=0,name=json,size=104] to a
> SQL INSERT statement due to
> org.apache.nifi.processor.exception.ProcessException: None of the fields in
> the JSON map to the columns defined by the details table; routing to
> failure: org.apache.nifi.processor.exception.ProcessException: None of the
> fields in the JSON map to the columns defined by the details table
>
> Thanks and Regards,
> Parul
>
> On Sat, Oct 10, 2015 at 9:45 PM, Joey Echeverria <joe...@gmail.com> wrote:
>
>> I've done something like this by wrapping the command in a shell script:
>>
>> http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/
>>
>> My use case was slightly different, but I'm pretty sure you can adapt the
>> same idea.
>>
>> -Joey
>>
>> On Oct 10, 2015, at 03:52, Parul Agrawal <parulagrawa...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> I actually need to get the data from pipe.
>> So the actual command I would need is mkfifo /tmp/packet;tshark -i ens160
>> -T pdml >/tmp/packet.
>> Is it possible to use ExecuteProcessor for multiple commands ?
>>
>> On Sat, Oct 10, 2015 at 1:04 PM, Parul Agrawal <parulagrawa...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I added custom flume source and when flume source is sending the data to
>>> flume sink, below mentioned error is thrown at flume sink.
>>>
>>>  Administratively Yielded for 1 sec due to processing failure
>>> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9]
>>> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught
>>> Exception: java.lang.IllegalStateException: close() called when transaction
>>> is OPEN - you must either commit or rollback first
>>> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9]
>>> o.a.n.c.t.ContinuallyRunProcessorTask
>>> java.lang.IllegalStateException: close() called when transaction is OPEN
>>> - you must either commit or rollback first
>>>         at
>>> com.google.common.base.Preconditions.checkState(Preconditions.java:172)
>>> ~[guava-r05.jar:na]
>>>         at
>>> org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>         at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105)
>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>         at
>>> org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139)
>>> ~[na:na]
>>>         at
>>> org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148)
>>> ~[na:na]
>>>         at
>>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077)
>>> ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>>         at
>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127)
>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>         at
>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49)
>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>         at
>>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119)
>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>         at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>> [na:1.7.0_85]
>>>         at
>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>> [na:1.7.0_85]
>>>         at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>> [na:1.7.0_85]
>>>         at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>> [na:1.7.0_85]
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> [na:1.7.0_85]
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> [na:1.7.0_85]
>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
>>> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9]
>>> o.a.n.processors.flume.ExecuteFlumeSink
>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf]
>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process
>>> due to org.apache.nifi.processor.exception.FlowFileHandlingException:
>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>> section=7], offset=180436,
>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>> this session (StandardProcessSession[id=218318]); rolling back session:
>>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>> section=7], offset=180436,
>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>> this session (StandardProcessSession[id=218318])
>>>
>>> Any idea what could be wrong in this.
>>>
>>> Thanks and Regards,
>>> Parul
>>>
>>>
>>> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bbe...@gmail.com> wrote:
>>>
>>>> Hi Parul,
>>>>
>>>> I think it would be good to keep the convo going on the users list
>>>> since there are more people who can offer help there, and also helps
>>>> everyone learn new solutions.
>>>>
>>>> The quick answer though is that NiFi has an ExecuteProcess processor
>>>> which could execute "tshark -i eth0 -T pdml".
>>>>
>>>> There is not currently an XmlToJson processor, so this could be a place
>>>> where you need a custom processor. For simple cases you can use an
>>>> EvaluateXPath processor to extract values from the XML, and then a
>>>> ReplaceText processor to build a new json document from those extracted
>>>> values.
>>>>
>>>> -Bryan
>>>>
>>>>
>>>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <parulagrawa...@gmail.com
>>>> > wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Little more to add.....
>>>>>  I need to keep reading the flowfile till END_TAG is received. i.e. we
>>>>> may need to concatenate the flowfile data till END_TAG.
>>>>> and then convert it to json and call PutFile() processor.
>>>>>
>>>>> Thanks and Regards,
>>>>> Parul
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <
>>>>> parulagrawa...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thank you very much again for the guidance provided.
>>>>>> Basically I would need a processor which would convert XML file to
>>>>>> Json.
>>>>>>
>>>>>> Currently I have a flume source which is of type "exec" and the
>>>>>> command used is "tshark -i eth0 -T pdml".
>>>>>>
>>>>>> Here Flume source keeps sending data to flume sink. This flow file
>>>>>> would be of PDML format.
>>>>>>
>>>>>> Now I need a processor which would do the following
>>>>>>
>>>>>> 1) Form a complete XML file based on START TAG (<packet>)
>>>>>> and END TAG (</packet>)
>>>>>> 2) Once the XML message is formed convert it to json.
>>>>>> 3) Place a json file to local directory using PutFile() processor.
>>>>>>
>>>>>> I am not sure if I could able to explain the processor requirement.
>>>>>> Would be really great if you could help me in this.
>>>>>>
>>>>>> Thanks and Regards,
>>>>>> Parul
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <joe...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>> investing in converting your custom Flume components to NiFi 
>>>>>>> processors. We
>>>>>>> can help you get started if you need any guidance going that route.
>>>>>>>
>>>>>>> +1. Running Flume sources/sinks is meant as a transition step. It's
>>>>>>> really useful if you have a complex Flume flow and want to migrate
>>>>>>> only parts of it over to NiFi at a time. I would port any custom
>>>>>>> sources and sinks to NiFi once you knew that it would meet your needs
>>>>>>> well. NiFi has a lot of documentation on writing processors and the
>>>>>>> concepts map pretty well if you're already familiar with Flume's
>>>>>>> execution model.
>>>>>>>
>>>>>>> -Joey
>>>>>>>
>>>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bbe...@gmail.com>
>>>>>>> wrote:
>>>>>>> >
>>>>>>> > Hi Parul,
>>>>>>> >
>>>>>>> > It is possible to deploy a custom Flume source/sink to NiFi, but
>>>>>>> due to the way the Flume processors load the classes for the sources and
>>>>>>> sinks, the jar you deploy to the lib directory also needs to include the
>>>>>>> other dependencies your source/sink needs (or they each need to
>>>>>>> individually be in lib/ directly).
>>>>>>> >
>>>>>>> > So here is a sample project I created that makes a shaded jar:
>>>>>>> > https://github.com/bbende/my-flume-source
>>>>>>> >
>>>>>>> > It will contain the custom source and following dependencies all
>>>>>>> in one jar:
>>>>>>> >
>>>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>>>> >   \- com.google.guava:guava:jar:11.0.2:compile
>>>>>>> >      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>>>> >
>>>>>>> > I copied that to NiFi lib, restarted, created an
>>>>>>> ExecuteFlumeSource processor with the following config:
>>>>>>> >
>>>>>>> > Source Type = org.apache.flume.MySource
>>>>>>> > Agent Name = a1
>>>>>>> > Source Name = r1
>>>>>>> > Flume Configuration = a1.sources = r1
>>>>>>> >
>>>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>>>>> >
>>>>>>> > Keep in mind that this could become risky because any classes
>>>>>>> found in the lib directory would be accessible to all NARs in NiFi and
>>>>>>> would be found before classes within a NAR because the parent is checked
>>>>>>> first during class loading. This example isn't too risky because we are
>>>>>>> only bringing in flume jars and one guava jar, but for example if 
>>>>>>> another
>>>>>>> nar uses a different version of guava this is going to cause a problem.
>>>>>>> >
>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>> investing in converting your custom Flume components to NiFi 
>>>>>>> processors. We
>>>>>>> can help you get started if you need any guidance going that route.
>>>>>>> >
>>>>>>> > -Bryan
>>>>>>> >
>>>>>>> >
>>>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <
>>>>>>> parulagrawa...@gmail.com> wrote:
>>>>>>> >>
>>>>>>> >> Hello Bryan,
>>>>>>> >>
>>>>>>> >> Thank you very much for your response.
>>>>>>> >>
>>>>>>> >> Is it possible to have customized flume source and sink in Nifi?
>>>>>>> >> I have my own customized source and sink? I followed below steps
>>>>>>> to add my own customized source but it did not work.
>>>>>>> >>
>>>>>>> >> 1) Created Maven project and added customized source. (flume.jar
>>>>>>> was created after this step)
>>>>>>> >> 2) Added flume.jar file to nifi-0.3.0/lib folder.
>>>>>>> >> 3) Added flume source processor with the below configuration
>>>>>>> >>
>>>>>>> >> Property           Value
>>>>>>> >> Source Type         com.flume.source.Source
>>>>>>> >> Agent Name      a1
>>>>>>> >> Source Name         k1.
>>>>>>> >>
>>>>>>> >> But I am getting the below error in Flume Source Processor.
>>>>>>> >> "Failed to run validation due to java.lang.NoClassDefFoundError :
>>>>>>> /org/apache/flume/PollableSource."
>>>>>>> >>
>>>>>>> >> Can you please help me in this regard. Any step/configuration I
>>>>>>> missed.
>>>>>>> >>
>>>>>>> >> Thanks and Regards,
>>>>>>> >> Parul
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bbe...@gmail.com>
>>>>>>> wrote:
>>>>>>> >>>
>>>>>>> >>> Hello,
>>>>>>> >>>
>>>>>>> >>> The NiFi Flume processors are for running Flume sources and
>>>>>>> sinks with in NiFi. They don't communicate with an external Flume 
>>>>>>> process.
>>>>>>> >>>
>>>>>>> >>> In your example you would need an ExecuteFlumeSource configured
>>>>>>> to run the netcat source, connected to a ExecuteFlumeSink configured 
>>>>>>> with
>>>>>>> the logger.
>>>>>>> >>>
>>>>>>> >>> -Bryan
>>>>>>> >>>
>>>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal <
>>>>>>> parulagrawa...@gmail.com> wrote:
>>>>>>> >>>>
>>>>>>> >>>> Hi,
>>>>>>> >>>>
>>>>>>> >>>> I was trying to run Nifi Flume processor with the below
>>>>>>> mentioned
>>>>>>> >>>> details but not could bring it up.
>>>>>>> >>>>
>>>>>>> >>>> I already started flume with the sample configuration file
>>>>>>> >>>> =============================================
>>>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>>>> >>>>
>>>>>>> >>>> # Name the components on this agent
>>>>>>> >>>> a1.sources = r1
>>>>>>> >>>> a1.sinks = k1
>>>>>>> >>>> a1.channels = c1
>>>>>>> >>>>
>>>>>>> >>>> # Describe/configure the source
>>>>>>> >>>> a1.sources.r1.type = netcat
>>>>>>> >>>> a1.sources.r1.bind = localhost
>>>>>>> >>>> a1.sources.r1.port = 44444
>>>>>>> >>>>
>>>>>>> >>>> # Describe the sink
>>>>>>> >>>> a1.sinks.k1.type = logger
>>>>>>> >>>>
>>>>>>> >>>> # Use a channel which buffers events in memory
>>>>>>> >>>> a1.channels.c1.type = memory
>>>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>>>> >>>>
>>>>>>> >>>> # Bind the source and sink to the channel
>>>>>>> >>>> a1.sources.r1.channels = c1
>>>>>>> >>>> a1.sinks.k1.channel = c1
>>>>>>> >>>> =============================================
>>>>>>> >>>>
>>>>>>> >>>> Command used to start flume : $ bin/flume-ng agent --conf conf
>>>>>>> >>>> --conf-file example.conf --name a1
>>>>>>> -Dflume.root.logger=INFO,console
>>>>>>> >>>>
>>>>>>> >>>> In the Nifi browser of ExecuteFlumeSink following configuration
>>>>>>> was done:
>>>>>>> >>>> Property           Value
>>>>>>> >>>> Sink Type         logger
>>>>>>> >>>> Agent Name      a1
>>>>>>> >>>> Sink Name         k1.
>>>>>>> >>>>
>>>>>>> >>>> Event is sent to the flume using:
>>>>>>> >>>> $ telnet localhost 44444
>>>>>>> >>>> Trying 127.0.0.1...
>>>>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>>>>> >>>> Escape character is '^]'.
>>>>>>> >>>> Hello world! <ENTER>
>>>>>>> >>>> OK
>>>>>>> >>>>
>>>>>>> >>>> But I could not get any data in the nifi flume processor.
>>>>>>> Request your
>>>>>>> >>>> help in this.
>>>>>>> >>>> Do i need to change the example.conf file of flume so that Nifi
>>>>>>> Flume
>>>>>>> >>>> Sink should get the data.
>>>>>>> >>>>
>>>>>>> >>>> Thanks and Regards,
>>>>>>> >>>> Parul
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> --
>>>>>>> >>> Sent from Gmail Mobile
>>>>>>> >>
>>>>>>> >>
>>>>>>> >
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to