Hi Thomas, I saw that example, and wasn’t quite following it.
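[Editor's note: for future readers of the archive, the delay semantics being asked about can be sketched in plain Java with no Apex dependencies. This is only an illustration — the class and method names below are invented for the sketch. The real operator (DefaultDelayOperator in Apex) buffers the tuples it receives during one streaming window and re-emits them in the following window; that one-window shift is what lets the engine accept a loop in the DAG.]

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: mimics the pass-through-with-one-window-delay
// contract of Apex's DefaultDelayOperator, without any Apex dependencies.
// Tuples received in window N become visible downstream only in window N+1.
public class DelaySketch {
    private final List<String> held = new ArrayList<>();    // buffered in the current window
    private final List<String> emitted = new ArrayList<>(); // what downstream has seen

    public void input(String tuple) {
        held.add(tuple); // pass-through, but not yet visible downstream
    }

    public void endWindow() {
        emitted.addAll(held); // release last window's tuples one window late
        held.clear();
    }

    public List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        DelaySketch d = new DelaySketch();
        d.input("ack-855");
        System.out.println(d.emitted()); // [] -- nothing visible until the next window
        d.endWindow();
        System.out.println(d.emitted()); // [ack-855]
    }
}
```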
Is the DelayOperator just a generic pass-through that we send our output to, and then read from in the iterative operator we want to loop back to, so that it handles the correct windowing, etc.?

Jim

From: Thomas Weise [mailto:[email protected]]
Sent: Saturday, September 3, 2016 11:47 AM
To: [email protected]
Subject: Re: HDHT question - looking for the datatorrent gurus!

Jim,

You need to use the delay operator for iterative processing. Here is an example:

https://github.com/apache/apex-malhar/blob/master/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java

Thomas

On Sat, Sep 3, 2016 at 9:05 AM, Jim <[email protected]> wrote:

Tushar,

I am trying to implement what you described, and I get a validation error ("Loops in graph") that I can't find any reference to. I have pasted my Application.java file below; what could be causing this error?

java.lang.RuntimeException: Error creating local cluster
    at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:78)
    at supplies.facility.edi.sellars.ApplicationTest.testApplication(ApplicationTest.java:30)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
Caused by: javax.validation.ValidationException: Loops in graph: [[
    OperatorMeta{name=operator997, operator=FunctionalAcknowledgmentOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}},
    OperatorMeta{name=operator856, operator=ShipNotificationOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}},
    OperatorMeta{name=operator855, operator=POAcknowledgmentOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}},
    OperatorMeta{name=ediRouter, operator=EDIRoutingOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=512}},
    OperatorMeta{name=operator810, operator=InvoiceOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}}]]
    at com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1775)
    at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:278)
    at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:76)
    ... 23 more

============================================ Application.java starts here ============================================

/**
 * Put your copyright and license info here.
 */
package supplies.facility.edi.sellars;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.kinesis.AbstractKinesisInputOperator;
import com.datatorrent.contrib.kinesis.KinesisConsumer;
import com.datatorrent.contrib.kinesis.KinesisStringInputOperator;
import com.datatorrent.contrib.kinesis.ShardManager;
import com.datatorrent.lib.db.jdbc.JdbcStore;
import com.datatorrent.netlet.util.DTThrowable;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import supplies.facility.apex.contrib.elasticsearch.ElasticSearchMapOutputOperator;
import supplies.facility.edi.helpers.EdiSmtpOutputOperator;
import supplies.facility.edi.helpers.KinesisEdiTransactionSetInputOperator;
import supplies.facility.edi.helpers.SqsTransactionSetInputOperator;
import supplies.facility.edi.helpers.StatefulShardManager;

import java.sql.PreparedStatement;
import java.sql.ResultSet;

@ApplicationAnnotation(name="FsEdiSellars")
public class Application implements StreamingApplication
{
  private final Locality locality = null;
  private static final Logger LOG = LoggerFactory.getLogger(Application.class);
  private JdbcStore jdbcStore;

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    try {
      // Read from SQS FS_EDI_SELLARS
      // SQSConnectionFactory
      SqsTransactionSetInputOperator sqsReader = dag.addOperator("TransactionsQueue", new SqsTransactionSetInputOperator());

      // Define all the operators
      EDIRoutingOperator ediRouter = dag.addOperator("ediRouter", new EDIRoutingOperator());
      InvoiceOperator operator810 = dag.addOperator("operator810", new InvoiceOperator());
      POAcknowledgmentOperator operator855 = dag.addOperator("operator855", new POAcknowledgmentOperator());
      ShipNotificationOperator operator856 = dag.addOperator("operator856", new ShipNotificationOperator());
      FunctionalAcknowledgmentOperator operator997 = dag.addOperator("operator997", new FunctionalAcknowledgmentOperator());

      // Set up the EDIRoutingOperator that takes the inbound EDI data and splits it into
      // outbound streams for each of the applicable EDI transactions:
      //   810 = Invoice
      //   855 = Purchase Order Acknowledgment
      //   856 = Advance Ship Notification (ASN)
      //   997 = Functional Acknowledgment
      // plus one for an elasticsearch logger that logs all incoming transactions.
      dag.addStream("ediInboundStream", sqsReader.output, ediRouter.inboundEdiPort).setLocality(Locality.CONTAINER_LOCAL);
      dag.addStream("loopBack855", operator855.loopbackPort855, ediRouter.loopbackPort855).setLocality(Locality.CONTAINER_LOCAL);
      dag.addStream("loopBack856", operator856.loopbackPort856, ediRouter.loopbackPort856).setLocality(Locality.CONTAINER_LOCAL);
      dag.addStream("loopBack810", operator810.loopbackPort810, ediRouter.loopbackPort810).setLocality(Locality.CONTAINER_LOCAL);
      dag.addStream("loopBack997", operator997.loopbackPort997, ediRouter.loopbackPort997).setLocality(Locality.CONTAINER_LOCAL);

      // Set up the Invoice operator and tie it to the output stream created in the EDIRouter
      dag.addStream("invoices", ediRouter.outputPort810, operator810.InputPort).setLocality(locality);

      // Set up the Purchase Order Acknowledgment operator and tie it to the output stream created in the EDIRouter
      dag.addStream("PoAcknowledgments", ediRouter.outputPort855, operator855.InputPort).setLocality(locality);

      // Set up the Ship Notification operator and tie it to the output stream created in the EDIRouter
      dag.addStream("shipNotifications", ediRouter.outputPort856, operator856.InputPort).setLocality(locality);

      // Set up the Functional Acknowledgment operator and tie it to the output stream created in the EDIRouter
      dag.addStream("functionalAcknowledgments", ediRouter.outputPort997, operator997.InputPort).setLocality(locality);

      // Set up the elasticsearch operator and tie it to the output stream created in the EDIRouter
      ElasticSearchMapOutputOperator operatorEs = dag.addOperator("operatorEs", new ElasticSearchMapOutputOperator());
      dag.addStream("esLogger", ediRouter.outputPortEs, operatorEs.input).setLocality(locality);

      // Set up the smtp output operator to use for the Purchase Order Acknowledgment messages
      EdiSmtpOutputOperator operatorSmtp855 = dag.addOperator("operatorSmtp855", new EdiSmtpOutputOperator());
      dag.addStream("smtpOutput855", operator855.outputEmails, operatorSmtp855.input).setLocality(locality);

      // Set up the smtp output operator to use for the Ship Notification messages
      EdiSmtpOutputOperator operatorSmtp856 = dag.addOperator("operatorSmtp856", new EdiSmtpOutputOperator());
      dag.addStream("smtpOutput856", operator856.outputEmails, operatorSmtp856.input).setLocality(locality);
    } catch (Exception exc) {
      DTThrowable.rethrow(exc);
    }
  }
}

============================================ here is one of the operator files, stripped of everything but the module declarations ============================================

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.Stateless;
import org.milyn.payload.StringResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import supplies.facility.edi.helpers.AbstractEDIProcessor;

/**
 * This Operator handles the processing for the EDI 810, Invoice transactions.
 *
 * As invoices arrive, they are added to the accounting invoicing system for payment,
 * and the order status is updated to show that the item is now closed.
 *
 * Created by jradke on 2/7/2016.
 */
@Stateless
public class InvoiceOperator extends AbstractEDIProcessor
{
  private static final Logger logger = LoggerFactory.getLogger(InvoiceOperator.class);

  public InvoiceOperator()
  {
    super("files/smooks-config810.xml");
  }

  public transient DefaultOutputPort<Long> loopbackPort810 = new DefaultOutputPort<Long>();

  @Override
  protected void processXML(Document xmlDocument)
  {
    logger.trace("Entered InvoiceOperator processXML");
  }
}

Thanks!

Jim

-----Original Message-----
From: Tushar Gosavi [mailto:[email protected]]
Sent: Thursday, September 1, 2016 1:06 PM
To: [email protected]
Subject: Re: HDHT question - looking for the datatorrent gurus!

Hi Jim,

Yes, this is what I had in mind. The managed state operator needs to have a separate input for each of the 5 operators. The platform does not support connecting multiple output ports to a single input port, but you can achieve a similar effect using the stream merge operator (https://github.com/apache/apex-malhar/blob/3ce83708f795b081d564be357a8333928154398e/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java).

- Tushar.

On Thu, Sep 1, 2016 at 10:37 PM, Jim <[email protected]> wrote:
> Tushar,
>
> Funny that you described it that way, as that is exactly what I was thinking
> about this morning.
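[Editor's note: the StreamMerger suggestion above can be pictured with a dependency-free sketch. The names below are invented for the illustration; the real Malhar StreamMerger is an operator with two input ports that both forward every tuple to a single output port, which is how two upstream operators can legally feed one downstream input.]

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: the shape of a stream merger -- two input
// ports forwarding to one output, so two upstream operators can
// feed a single downstream input port.
public class MergerSketch {
    private final List<String> out = new ArrayList<>();

    public void port1(String tuple) { out.add(tuple); } // first upstream stream
    public void port2(String tuple) { out.add(tuple); } // second upstream stream

    public List<String> output() { return out; }

    public static void main(String[] args) {
        MergerSketch m = new MergerSketch();
        m.port1("from-855");
        m.port2("from-856");
        System.out.println(m.output()); // [from-855, from-856]
    }
}
```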
>
> So the flow would be:
>
>                      Router Operator
>                            |
>                   Managed State Operator
>                            |
>     +--------------+--------------+--------------+
>     |              |              |              |
> General Ack   Detailed Ack   Ship Notification  Invoice
>     |              |              |              |
>     +--------------+--------------+--------------+
>                            |
>   (each of the 4 operators, at the end of processing, emits a
>    record back to the Managed State Operator)
>
> In this scenario, would the managed state operator just have 1 input
> that all the other operators emit to, or would it need to have separate
> inputs for each of the 5 operators that would be emitting to it?
>
> This is what you were describing too, correct?
>
> Thanks,
>
> Jim
>
> -----Original Message-----
> From: Tushar Gosavi [mailto:[email protected]]
> Sent: Thursday, September 1, 2016 11:49 AM
> To: [email protected]
> Subject: Re: HDHT question - looking for the datatorrent gurus!
>
> Hi Jim,
>
> Currently, HDHT is accessible to only a single operator in a DAG. A single
> HDHT store cannot be managed by two different operators at a time, as that
> could cause metadata corruption. Theoretically, an HDHT bucket could be read
> from multiple operators, but only one writer is allowed.
>
> In your case, each stage of a transaction is processed completely by a
> different operator, and only then can the next stage start.
> It could still be achieved by using a single operator which manages the HDHT
> state, and having a loop in the DAG to send completed transaction ids back to
> the sequencer:
>
> - The sequence operator will emit a transaction to the transaction processing
>   operator.
> - If it receives an out-of-order transaction, it will note it down in HDHT.
> - The processing operator will send the completed transaction id on a port
>   which is connected back to the sequence operator.
> - On receiving data on this loopback port, the sequence operator will update
>   HDHT and search for the next transaction in order, which may be stored in
>   HDHT, and will emit it to the next processing operator.
>
> - Tushar.
>
>
> On Sat, Aug 27, 2016 at 1:31 AM, Jim <[email protected]> wrote:
>> Good afternoon,
>>
>> I have an apex application where I may receive EDI transactions, but
>> sometimes they arrive out of order, and I want to hold any out-of-sequence
>> transactions until the correct time in the flow to process them.
>>
>> For example, for a standard order we will receive from the remote vendor:
>>
>> 1.) General Acknowledgement
>> 2.) Detailed Acknowledgement
>> 3.) Ship Notification
>> 4.) Invoice
>>
>> They are supposed to be sent and received in that order.
>>
>> However, sometimes vendors' systems have problems, so they send all of
>> these at the same time, and then we can receive them out of sequence.
>> Data packets for these are very small, say from 1 to 512 bytes, and the
>> only time they will be out of sequence, we will receive them very close
>> together.
>>
>> I am trying to think of the best way to do this with my datatorrent /
>> Hadoop / yarn facilities, instead of creating a data table in PostgreSQL
>> and using that.
>>
>> Can I create a flow that works like this (I am not sure if this makes
>> sense, or is the best way to solve my problem, while keeping state,
>> etc. maintained for all the operators):
>>
>> 1.)
In the inbound transaction router, check the HDHT store for the order
>> number. If it doesn't exist, this is a new order: if the transaction
>> being processed is the general acknowledgment, emit the data to the
>> general acknowledgement operator; if it is not, store the transaction
>> data into the correct bucket, identifying which transaction it is for,
>> and record the general acknowledgement as the next expected step in
>> HDHT by order number.
>>
>> 2.) Say the next transaction is the ship notification. In the router, we
>> would check the HDHT store, see that this is not the next expected
>> transaction (say it is supposed to be the detailed acknowledgement), so
>> we would just post the data for the ship notification into the HDHT
>> store and be done.
>>
>> 3.) Say we now receive the detailed acknowledgement for an order whose
>> next step IS the detailed acknowledgement. We would see that this is the
>> correct next transaction, emit it to the detailed acknowledgement
>> operator, and update the HDHT store to show that the next transaction
>> should be the ship notification. NOTE: we can't emit the ship
>> notification yet, until we have confirmed that the detailed
>> acknowledgment has been completed.
>>
>> 4.) In each of the 4 transaction operators, at the end of processing,
>> we would update the HDHT store to show the next expected step, and if
>> we have already received data for the next expected step, pull it from
>> the HDHT store and write the transaction into our SQS queue, which is
>> the input into the inbound transaction router at the beginning of the
>> application, so it processes through the system.
>>
>> I believe HDHT can be used to pass data throughout an entire
>> application, and is not limited to a per-operator basis, correct?
>>
>> Any comments / feedback?
>>
>> Thanks,
>>
>> Jim
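[Editor's note: the hold-until-in-order logic described in steps 1.) through 4.) can be sketched without any Apex or HDHT dependencies. This is a plain-Java illustration under stated assumptions: the in-memory maps stand in for HDHT / managed state, the transaction-set order 997, 855, 856, 810 is taken from the mapping in the Application.java earlier in the thread, and all class and method names are invented for the sketch.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a sequencer that releases EDI transaction sets in
// their required order per order number, parking out-of-order arrivals
// until their turn. In-memory maps stand in for HDHT / managed state.
public class SequencerSketch {
    // Required order: 997 (general ack), 855 (detailed ack), 856 (ASN), 810 (invoice)
    private static final List<String> ORDER = List.of("997", "855", "856", "810");

    private final Map<String, Integer> nextStep = new HashMap<>();           // order -> index of expected step
    private final Map<String, Map<String, String>> parked = new HashMap<>(); // out-of-order payloads
    private final List<String> released = new ArrayList<>();                 // what gets emitted downstream

    public void receive(String orderNo, String txnSet, String payload) {
        int expected = nextStep.getOrDefault(orderNo, 0);
        if (ORDER.get(expected).equals(txnSet)) {
            release(orderNo, txnSet, payload);
        } else {
            // wrong step: park it until its turn comes (step 2 above)
            parked.computeIfAbsent(orderNo, k -> new HashMap<>()).put(txnSet, payload);
        }
    }

    private void release(String orderNo, String txnSet, String payload) {
        released.add(orderNo + "/" + txnSet + "/" + payload);
        int next = nextStep.getOrDefault(orderNo, 0) + 1;
        nextStep.put(orderNo, next); // advance the expected step (step 3 above)
        // completion callback (the loopback in the DAG, step 4 above): check
        // whether the next expected step already arrived and was parked
        if (next < ORDER.size()) {
            Map<String, String> held = parked.get(orderNo);
            String waiting = (held == null) ? null : held.remove(ORDER.get(next));
            if (waiting != null) {
                release(orderNo, ORDER.get(next), waiting);
            }
        }
    }

    public List<String> released() { return released; }

    public static void main(String[] args) {
        SequencerSketch s = new SequencerSketch();
        s.receive("PO1", "856", "ship");   // out of order: parked
        s.receive("PO1", "997", "genAck"); // expected step: released
        s.receive("PO1", "855", "detAck"); // released, which also unparks the 856
        System.out.println(s.released()); // [PO1/997/genAck, PO1/855/detAck, PO1/856/ship]
    }
}
```

In the real application the loopback release would go back through the SQS queue (or a delay operator) rather than a recursive call, but the bookkeeping is the same.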
