Jim,

You need to use the delay operator for iterative processing; a loop in the DAG is only valid when it goes through a delay operator. Here is an example:
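A minimal sketch of that wiring, before the full demo linked below. The operator and port names here are illustrative stand-ins for your router and transaction operators, and it assumes `DefaultDelayOperator` and `BaseOperator` from `com.datatorrent.common.util`; treat it as a shape, not a drop-in:

```java
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.DefaultDelayOperator;
import org.apache.hadoop.conf.Configuration;

public class LoopApp implements StreamingApplication {

  /** Stand-in for the router: forwards work, accepts completed ids back. */
  public static class Router extends BaseOperator {
    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
    public final transient DefaultInputPort<Long> loopbackIn = new DefaultInputPort<Long>() {
      @Override
      public void process(Long completedId) {
        // look up the next pending transaction for this id and emit it
      }
    };
  }

  /** Stand-in for a transaction operator that reports completion. */
  public static class Worker extends BaseOperator {
    public final transient DefaultOutputPort<Long> doneIds = new DefaultOutputPort<>();
    public final transient DefaultInputPort<Long> input = new DefaultInputPort<Long>() {
      @Override
      public void process(Long id) {
        doneIds.emit(id); // processing done; report back
      }
    };
  }

  @Override
  public void populateDAG(DAG dag, Configuration conf) {
    Router router = dag.addOperator("router", new Router());
    Worker worker = dag.addOperator("worker", new Worker());

    // The delay operator re-emits its input in the NEXT streaming window,
    // which is what makes the cycle legal; a direct back-edge is what
    // triggers the "Loops in graph" validation error.
    DefaultDelayOperator<Long> delay =
        dag.addOperator("delay", new DefaultDelayOperator<Long>());

    dag.addStream("work", router.output, worker.input);
    dag.addStream("done", worker.doneIds, delay.input);
    dag.addStream("loopback", delay.output, router.loopbackIn);
  }
}
```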
https://github.com/apache/apex-malhar/blob/master/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java

Thomas

On Sat, Sep 3, 2016 at 9:05 AM, Jim <[email protected]> wrote:
> Tushar,
>
> I am trying to implement what you described, and I get a validation
> error, "Loops in graph", and can't seem to find any reference to this.
>
> Below I have pasted my Application.java file; what could be causing this
> error?
>
> java.lang.RuntimeException: Error creating local cluster
>     at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:78)
>     at supplies.facility.edi.sellars.ApplicationTest.testApplication(ApplicationTest.java:30)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
>     at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
>     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
>     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
>     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
>     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
> Caused by: javax.validation.ValidationException: Loops in graph: [[
>     OperatorMeta{name=operator997, operator=FunctionalAcknowledgmentOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}},
>     OperatorMeta{name=operator856, operator=ShipNotificationOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}},
>     OperatorMeta{name=operator855, operator=POAcknowledgmentOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}},
>     OperatorMeta{name=ediRouter, operator=EDIRoutingOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=512}},
>     OperatorMeta{name=operator810, operator=InvoiceOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}}]]
>     at com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1775)
>     at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:278)
>     at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:76)
>     ... 23 more
>
> ============================================ Application.java starts here ============================================
>
> /**
>  * Put your copyright and license info here.
>  */
> package supplies.facility.edi.sellars;
>
> import com.datatorrent.api.DAG;
> import com.datatorrent.api.DAG.Locality;
> import com.datatorrent.api.StreamingApplication;
> import com.datatorrent.api.annotation.ApplicationAnnotation;
> import com.datatorrent.contrib.kinesis.AbstractKinesisInputOperator;
> import com.datatorrent.contrib.kinesis.KinesisConsumer;
> import com.datatorrent.contrib.kinesis.KinesisStringInputOperator;
> import com.datatorrent.contrib.kinesis.ShardManager;
> import com.datatorrent.lib.db.jdbc.JdbcStore;
> import com.datatorrent.netlet.util.DTThrowable;
> import org.apache.hadoop.conf.Configuration;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import supplies.facility.apex.contrib.elasticsearch.ElasticSearchMapOutputOperator;
> import supplies.facility.edi.helpers.EdiSmtpOutputOperator;
> import supplies.facility.edi.helpers.KinesisEdiTransactionSetInputOperator;
> import supplies.facility.edi.helpers.SqsTransactionSetInputOperator;
> import supplies.facility.edi.helpers.StatefulShardManager;
>
> import java.sql.PreparedStatement;
> import java.sql.ResultSet;
>
> @ApplicationAnnotation(name="FsEdiSellars")
> public class Application implements StreamingApplication {
>
>     private static final Logger LOG = LoggerFactory.getLogger(Application.class);
>
>     private final Locality locality = null;
>
>     private JdbcStore jdbcStore;
>
>     @Override
>     public void populateDAG(DAG dag, Configuration conf) {
>
>         try {
>             // Read from SQS FS_EDI_SELLARS
>             SqsTransactionSetInputOperator sqsReader =
>                 dag.addOperator("TransactionsQueue", new SqsTransactionSetInputOperator());
>
>             // Define all the operators
>             EDIRoutingOperator ediRouter = dag.addOperator("ediRouter", new EDIRoutingOperator());
>             InvoiceOperator operator810 = dag.addOperator("operator810", new InvoiceOperator());
>             POAcknowledgmentOperator operator855 = dag.addOperator("operator855", new POAcknowledgmentOperator());
>             ShipNotificationOperator operator856 = dag.addOperator("operator856", new ShipNotificationOperator());
>             FunctionalAcknowledgmentOperator operator997 = dag.addOperator("operator997", new FunctionalAcknowledgmentOperator());
>
>             // The EDIRoutingOperator takes the inbound EDI data and splits it into
>             // one outbound stream per applicable EDI transaction type:
>             //   810 = Invoice
>             //   855 = Purchase Order Acknowledgment
>             //   856 = Advance Ship Notification (ASN)
>             //   997 = Functional Acknowledgment
>             // plus one stream for an Elasticsearch logger that records all
>             // incoming transactions.
>             dag.addStream("ediInboundStream", sqsReader.output, ediRouter.inboundEdiPort).setLocality(Locality.CONTAINER_LOCAL);
>             dag.addStream("loopBack855", operator855.loopbackPort855, ediRouter.loopbackPort855).setLocality(Locality.CONTAINER_LOCAL);
>             dag.addStream("loopBack856", operator856.loopbackPort856, ediRouter.loopbackPort856).setLocality(Locality.CONTAINER_LOCAL);
>             dag.addStream("loopBack810", operator810.loopbackPort810, ediRouter.loopbackPort810).setLocality(Locality.CONTAINER_LOCAL);
>             dag.addStream("loopBack997", operator997.loopbackPort997, ediRouter.loopbackPort997).setLocality(Locality.CONTAINER_LOCAL);
>
>             // Invoice operator, fed from the EDIRouter's 810 output stream
>             dag.addStream("invoices", ediRouter.outputPort810, operator810.InputPort).setLocality(locality);
>
>             // Purchase Order Acknowledgment operator, fed from the EDIRouter's 855 output stream
>             dag.addStream("PoAcknowledgments", ediRouter.outputPort855, operator855.InputPort).setLocality(locality);
>
>             // Ship Notification operator, fed from the EDIRouter's 856 output stream
>             dag.addStream("shipNotifications", ediRouter.outputPort856, operator856.InputPort).setLocality(locality);
>
>             // Functional Acknowledgment operator, fed from the EDIRouter's 997 output stream
>             dag.addStream("functionalAcknowledgments", ediRouter.outputPort997, operator997.InputPort).setLocality(locality);
>
>             // Elasticsearch operator, fed from the EDIRouter's logging output stream
>             ElasticSearchMapOutputOperator operatorEs = dag.addOperator("operatorEs", new ElasticSearchMapOutputOperator());
>             dag.addStream("esLogger", ediRouter.outputPortEs, operatorEs.input).setLocality(locality);
>
>             // SMTP output operator for the Purchase Order Acknowledgment messages
>             EdiSmtpOutputOperator operatorSmtp855 = dag.addOperator("operatorSmtp855", new EdiSmtpOutputOperator());
>             dag.addStream("smtpOutput855", operator855.outputEmails, operatorSmtp855.input).setLocality(locality);
>
>             // SMTP output operator for the Ship Notification messages
>             EdiSmtpOutputOperator operatorSmtp856 = dag.addOperator("operatorSmtp856", new EdiSmtpOutputOperator());
>             dag.addStream("smtpOutput856", operator856.outputEmails, operatorSmtp856.input).setLocality(locality);
>
>         } catch (Exception exc) {
>             DTThrowable.rethrow(exc);
>         }
>     }
> }
>
> ============================================ here is one of the operator files, stripped of everything but the module declarations ============================================
>
> import com.datatorrent.api.DefaultOutputPort;
> import com.datatorrent.api.annotation.Stateless;
> import org.milyn.payload.StringResult;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import org.w3c.dom.Document;
> import supplies.facility.edi.helpers.AbstractEDIProcessor;
>
> /**
>  * This operator handles the processing for the EDI 810 (Invoice) transactions.
>  *
>  * As invoices arrive, they are added to the accounting invoicing system for
>  * payment, and the order status is updated to show that the item is now closed.
>  *
>  * Created by jradke on 2/7/2016.
>  */
> @Stateless
> public class InvoiceOperator extends AbstractEDIProcessor {
>
>     private static final Logger logger = LoggerFactory.getLogger(InvoiceOperator.class);
>
>     public InvoiceOperator() {
>         super("files/smooks-config810.xml");
>     }
>
>     public transient DefaultOutputPort<Long> loopbackPort810 = new DefaultOutputPort<Long>();
>
>     @Override
>     protected void processXML(Document xmlDocument) {
>         logger.trace("Entered InvoiceOperator processXML");
>     }
> }
>
> Thanks!
>
> Jim
>
> -----Original Message-----
> From: Tushar Gosavi [mailto:[email protected]]
> Sent: Thursday, September 1, 2016 1:06 PM
> To: [email protected]
> Subject: Re: HDHT question - looking for the datatorrent gurus!
>
> Hi Jim,
>
> Yes, this is what I had in mind. The managed state operator needs to have a
> separate input for each of the 5 operators. The platform does not support
> connecting multiple output ports to a single input port, but you can achieve
> a similar effect using the stream merge operator:
> https://github.com/apache/apex-malhar/blob/3ce83708f795b081d564be357a8333928154398e/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java
>
> - Tushar.
>
>
> On Thu, Sep 1, 2016 at 10:37 PM, Jim <[email protected]> wrote:
> > Tushar,
> >
> > Funny that you described it that way, as that is exactly what I was
> > thinking about this morning.
> >
> > So the flow would be:
> >
> >              Router Operator
> >                    |
> >           Managed State Operator
> >                    |
> >     +----------+----------+----------+
> >     |          |          |          |
> >  General    Detailed    Ship      Invoice
> >   Ack.        Ack.   Notification
> >     |          |          |          |
> >     +----------+----------+----------+
> >                    |
> >     (each of the 4 operators, at the end of
> >      processing, emits a record back to the
> >      Managed State Operator)
> >
> > In this scenario, would the managed state operator just have one input
> > that all the other operators emit to, or would it need to have separate
> > inputs for each of the 5 operators that would be emitting to it?
> >
> > This is what you were describing too, correct?
> >
> > Thanks,
> >
> > Jim
> >
> > -----Original Message-----
> > From: Tushar Gosavi [mailto:[email protected]]
> > Sent: Thursday, September 1, 2016 11:49 AM
> > To: [email protected]
> > Subject: Re: HDHT question - looking for the datatorrent gurus!
> >
> > Hi Jim,
> >
> > Currently HDHT is accessible to only a single operator in a DAG. A single
> > HDHT store cannot be managed by two different operators at a time, as that
> > could cause metadata corruption. Theoretically an HDHT bucket could be read
> > from multiple operators, but only one writer is allowed.
> >
> > In your case a stage in a transaction is processed completely by a
> > different operator, and only then can the next stage start. This can still
> > be achieved by using a single operator which manages the HDHT state, with a
> > loop in the DAG to send completed transaction ids back to the sequencer:
> >
> > - The sequence operator will emit a transaction to the transaction
> >   processing operator.
> > - If it receives an out-of-order transaction, it will note it down in HDHT.
> > - The processing operator will send the completed transaction id on a port
> >   which is connected back to the sequence operator.
> > - On receiving data on this loopback port, the sequence operator will
> >   update HDHT and search for the next transaction in order, which could be
> >   stored in HDHT, and will emit it to the next processing operator.
> >
> > - Tushar.
> >
> >
> > On Sat, Aug 27, 2016 at 1:31 AM, Jim <[email protected]> wrote:
> >> Good afternoon,
> >>
> >> I have an Apex application where I may receive EDI transactions, but
> >> sometimes they arrive out of order, and I want to hold any out-of-sequence
> >> transactions until the correct time in the flow to process them.
> >>
> >> For example, for a standard order we will receive from the remote vendor:
> >>
> >> 1.) General Acknowledgement
> >> 2.) Detailed Acknowledgement
> >> 3.) Ship Notification
> >> 4.) Invoice
> >>
> >> They are supposed to be sent and received in that order.
> >>
> >> However, sometimes vendors' systems have problems, etc., so they send all
> >> of these at the same time, and then we can receive them out of sequence.
> >> Data packets for these are very small, say from 1 to 512 bytes, and the
> >> only time they will be out of sequence, we will receive them very close
> >> together.
> >>
> >> I am trying to think of the best way to do this with my DataTorrent /
> >> Hadoop / YARN facilities, instead of creating a table in PostgreSQL and
> >> using that.
> >>
> >> Can I create a flow that works like this? (I am not sure if this makes
> >> sense, or is the best way to solve my problem, while keeping state, etc.
> >> maintained for all the operators.)
> >>
> >> 1.) In the inbound transaction router, check the HDHT store for the order
> >> number. If it doesn't exist, this means it is a new order: if the
> >> transaction trying to process is the general acknowledgment, emit the data
> >> to the general acknowledgement operator; if it is not, store the
> >> transaction data into the correct bucket, identifying which transaction it
> >> is for, and record in HDHT by order number that the next step is the
> >> general acknowledgement.
> >>
> >> 2.) Say the next transaction is the ship notification. In the router, we
> >> would check the HDHT store, see this is not the next expected transaction
> >> (say it is supposed to be the detailed acknowledgement), so we would just
> >> post the data for the ship notification into the HDHT store and say we
> >> are done.
> >>
> >> 3.) Say we now receive the detailed acknowledgement for an order whose
> >> next step IS the detailed acknowledgement. We would see this is the
> >> correct next transaction, emit it to the detailed acknowledgement
> >> operator, and update the HDHT store to show that the next transaction
> >> should be the ship notification. NOTE: we can't emit the ship notification
> >> yet, until we have confirmed that the detailed acknowledgment has been
> >> completed.
> >>
> >> 4.) In each of the 4 transaction operators, at the end of the processing
> >> we would update the HDHT store to show the next expected step, and if we
> >> already received data for the next expected step, pull it from the HDHT
> >> store and write the transaction into our SQS queue (which is the input
> >> into the inbound transaction router at the beginning of the application),
> >> so it processes through the system.
> >>
> >> I believe HDHT can be used to pass data throughout an entire application,
> >> and is not limited to just a per-operator basis, correct?
> >>
> >> Any comments / feedback?
> >>
> >> Thanks,
> >>
> >> Jim
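The sequencing scheme Tushar outlines above (emit a stage when it is the expected one, park it when it arrives early, and release the parked stage when its predecessor completes via the loopback port) can be sketched independently of Apex and HDHT as a small state machine. Class, method, and stage names here are illustrative; in the real DAG the two maps would live in HDHT and `completed()` would be driven by the loopback stream:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Per-order stage sequencer: stages must be released downstream in a fixed
 * order (general ack, detailed ack, ship notification, invoice). A stage
 * that arrives ahead of its turn is parked; completing the expected stage
 * releases the parked successor.
 */
public class OrderSequencer {
    public static final List<String> SEQUENCE =
        Arrays.asList("855-general", "855-detailed", "856", "810");

    // orderId -> index into SEQUENCE of the next stage we expect
    private final Map<String, Integer> nextExpected = new HashMap<>();
    // orderId -> stages that arrived ahead of their turn
    private final Map<String, Set<String>> parked = new HashMap<>();
    // stages released downstream, in emission order (stands in for output ports)
    public final List<String> emitted = new ArrayList<>();

    /** A transaction arrives from the router. */
    public void onArrival(String orderId, String stage) {
        int expect = nextExpected.getOrDefault(orderId, 0);
        if (SEQUENCE.indexOf(stage) == expect) {
            emitted.add(orderId + ":" + stage);   // in order: emit now
        } else {
            parked.computeIfAbsent(orderId, k -> new HashSet<>()).add(stage);
        }
    }

    /** A processing operator reports a stage done (the loopback port). */
    public void completed(String orderId, String stage) {
        int next = SEQUENCE.indexOf(stage) + 1;
        nextExpected.put(orderId, next);
        if (next < SEQUENCE.size()) {
            String nextStage = SEQUENCE.get(next);
            Set<String> waiting = parked.get(orderId);
            if (waiting != null && waiting.remove(nextStage)) {
                emitted.add(orderId + ":" + nextStage); // release the parked stage
            }
        }
    }

    public static void main(String[] args) {
        OrderSequencer s = new OrderSequencer();
        s.onArrival("PO1", "856");          // ship notification arrives early: parked
        s.onArrival("PO1", "855-general");  // expected first stage: emitted immediately
        s.completed("PO1", "855-general");
        s.onArrival("PO1", "855-detailed");
        s.completed("PO1", "855-detailed"); // completing stage 2 releases the parked 856
        System.out.println(s.emitted);
        // prints [PO1:855-general, PO1:855-detailed, PO1:856]
    }
}
```

Note that release happens on `completed()`, not on emission, which matches the NOTE in step 3 above: the ship notification is held until the detailed acknowledgment is confirmed done.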
