Hi Thomas,

I saw that example, but wasn’t quite following it.

Is the DelayOperator just a generic pass-through that we send our output to, 
and then read from in the iterative operator we want to loop back to, so that 
it handles the correct windowing, etc.?

Jim

From: Thomas Weise [mailto:[email protected]]
Sent: Saturday, September 3, 2016 11:47 AM
To: [email protected]
Subject: Re: HDHT question - looking for the datatorrent gurus!

Jim,

You need to use the delay operator for iterative processing. Here is an example:

https://github.com/apache/apex-malhar/blob/master/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java

Thomas
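
To make the delay operator's role concrete: as far as I understand Apex's Operator.DelayOperator contract, it is essentially a pass-through whose output is shifted into the next streaming window, and that shift is what lets the platform accept a loop. The sketch below is a toy model of that one-window shift in plain Java. It does not use the Apex API, and the names (WindowDelaySketch, beginWindow, process, endWindow) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: NOT the Apex DelayOperator API. */
final class WindowDelaySketch<T> {
    private List<T> receivedThisWindow = new ArrayList<>();
    private List<T> heldFromPreviousWindow = new ArrayList<>();

    /** Start of a window: hand out what was received during the previous window. */
    List<T> beginWindow() {
        List<T> released = heldFromPreviousWindow;
        heldFromPreviousWindow = new ArrayList<>();
        return released;
    }

    /** A tuple arriving on the loopback stream is held, not forwarded immediately. */
    void process(T tuple) {
        receivedThisWindow.add(tuple);
    }

    /** End of a window: everything received becomes eligible for the next window. */
    void endWindow() {
        heldFromPreviousWindow.addAll(receivedThisWindow);
        receivedThisWindow = new ArrayList<>();
    }

    public static void main(String[] args) {
        WindowDelaySketch<Long> delay = new WindowDelaySketch<>();
        delay.beginWindow();                     // window 1: nothing to release yet
        delay.process(810L);                     // a completed transaction id sent back
        delay.endWindow();
        System.out.println(delay.beginWindow()); // window 2 releases [810]
    }
}
```

In a real application the delay operator would sit on the loopback stream, between the processing operator and the operator being looped back to, as in the linked iteration demo.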

On Sat, Sep 3, 2016 at 9:05 AM, Jim <[email protected]> wrote:
Tushar,

I am trying to implement what you described, and I get a validation error, 
"Loops in graph", and can't seem to find any reference to it.

Below I have pasted my Application.java file. What could be causing this error?

java.lang.RuntimeException: Error creating local cluster

        at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:78)
        at supplies.facility.edi.sellars.ApplicationTest.testApplication(ApplicationTest.java:30)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
        at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
Caused by: javax.validation.ValidationException: Loops in graph: 
[[OperatorMeta{name=operator997, 
operator=FunctionalAcknowledgmentOperator{name=null}, 
attributes={Attribute{defaultValue=1024, 
name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}},
 OperatorMeta{name=operator856, operator=ShipNotificationOperator{name=null}, 
attributes={Attribute{defaultValue=1024, 
name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}},
 OperatorMeta{name=operator855, operator=POAcknowledgmentOperator{name=null}, 
attributes={Attribute{defaultValue=1024, 
name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}},
 OperatorMeta{name=ediRouter, operator=EDIRoutingOperator{name=null}, 
attributes={Attribute{defaultValue=1024, 
name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=512}},
 OperatorMeta{name=operator810, operator=InvoiceOperator{name=null}, 
attributes={Attribute{defaultValue=1024, 
name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}}]]
        at com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1775)
        at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:278)
        at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:76)
        ... 23 more
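
The five operators named in the exception are exactly the ones joined by the loopback streams in the pasted Application.java: each processing operator receives from ediRouter and also sends back to it. The sketch below is a minimal plain-Java illustration of how such a cycle can be detected with a depth-first search. It is not Apex's LogicalPlan.validate implementation, and LoopCheckSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustration only: a tiny directed graph of operator names with a cycle check. */
final class LoopCheckSketch {
    private final Map<String, List<String>> streams = new HashMap<>();

    /** Record a stream from one operator to another. */
    void addStream(String from, String to) {
        streams.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
    }

    /** True if following streams downstream can lead back to an operator already on the path. */
    boolean hasLoop() {
        Set<String> visited = new HashSet<>();
        Set<String> onPath = new HashSet<>();
        for (String op : streams.keySet()) {
            if (visit(op, visited, onPath)) {
                return true;
            }
        }
        return false;
    }

    private boolean visit(String op, Set<String> visited, Set<String> onPath) {
        if (onPath.contains(op)) {
            return true;               // reached an operator we are still exploring: a loop
        }
        if (!visited.add(op)) {
            return false;              // already fully explored, no loop through here
        }
        onPath.add(op);
        for (String next : streams.getOrDefault(op, new ArrayList<>())) {
            if (visit(next, visited, onPath)) {
                return true;
            }
        }
        onPath.remove(op);
        return false;
    }

    public static void main(String[] args) {
        LoopCheckSketch graph = new LoopCheckSketch();
        graph.addStream("TransactionsQueue", "ediRouter");
        graph.addStream("ediRouter", "operator810");
        System.out.println(graph.hasLoop());   // false: forward streams only
        graph.addStream("operator810", "ediRouter");
        System.out.println(graph.hasLoop());   // true: the loopback stream closes a cycle
    }
}
```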

============ Application.java starts here ============

/**
 * Put your copyright and license info here.
 */
package supplies.facility.edi.sellars;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.kinesis.AbstractKinesisInputOperator;
import com.datatorrent.contrib.kinesis.KinesisConsumer;
import com.datatorrent.contrib.kinesis.KinesisStringInputOperator;
import com.datatorrent.contrib.kinesis.ShardManager;
import com.datatorrent.lib.db.jdbc.JdbcStore;
import com.datatorrent.netlet.util.DTThrowable;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import supplies.facility.apex.contrib.elasticsearch.ElasticSearchMapOutputOperator;
import supplies.facility.edi.helpers.EdiSmtpOutputOperator;
import supplies.facility.edi.helpers.KinesisEdiTransactionSetInputOperator;
import supplies.facility.edi.helpers.SqsTransactionSetInputOperator;
import supplies.facility.edi.helpers.StatefulShardManager;

import java.sql.PreparedStatement;
import java.sql.ResultSet;

@ApplicationAnnotation(name="FsEdiSellars")
public class Application implements StreamingApplication {
  private final Locality locality = null;

  private static final Logger LOG = LoggerFactory.getLogger(Application.class);

  private JdbcStore jdbcStore;

  @Override
  public void populateDAG(DAG dag, Configuration conf) {

    try {
      //Read from SQS FS_EDI_SELLARS
      //SQSConnectionFactory
      SqsTransactionSetInputOperator sqsReader =
          dag.addOperator("TransactionsQueue", new SqsTransactionSetInputOperator());

      // Define all the operators
      EDIRoutingOperator ediRouter = dag.addOperator("ediRouter", new EDIRoutingOperator());
      InvoiceOperator operator810 = dag.addOperator("operator810", new InvoiceOperator());
      POAcknowledgmentOperator operator855 =
          dag.addOperator("operator855", new POAcknowledgmentOperator());
      ShipNotificationOperator operator856 =
          dag.addOperator("operator856", new ShipNotificationOperator());
      FunctionalAcknowledgmentOperator operator997 =
          dag.addOperator("operator997", new FunctionalAcknowledgmentOperator());

      // Set up the EDIRoutingOperator that takes the inbound EDI data and puts it
      // into multiple outbound streams, one for each of the applicable EDI transactions:
      //       810 = Invoice
      //       855 = Purchase Order Acknowledgment
      //       856 = Advance Ship Notice (ASN)
      //       997 = Functional Acknowledgment
      // as well as one for an elasticsearch logger handler to log all incoming transactions.
      dag.addStream("ediInboundStream", sqsReader.output, ediRouter.inboundEdiPort)
          .setLocality(Locality.CONTAINER_LOCAL);
      dag.addStream("loopBack855", operator855.loopbackPort855, ediRouter.loopbackPort855)
          .setLocality(Locality.CONTAINER_LOCAL);
      dag.addStream("loopBack856", operator856.loopbackPort856, ediRouter.loopbackPort856)
          .setLocality(Locality.CONTAINER_LOCAL);
      dag.addStream("loopBack810", operator810.loopbackPort810, ediRouter.loopbackPort810)
          .setLocality(Locality.CONTAINER_LOCAL);
      dag.addStream("loopBack997", operator997.loopbackPort997, ediRouter.loopbackPort997)
          .setLocality(Locality.CONTAINER_LOCAL);

      // Set up the Invoice operator and tie it to the output stream created in the EDIRouter
      dag.addStream("invoices", ediRouter.outputPort810, operator810.InputPort)
          .setLocality(locality);

      // Set up the Purchase Order Acknowledgment operator and tie it to the output
      // stream created in the EDIRouter
      dag.addStream("PoAcknowledgments", ediRouter.outputPort855, operator855.InputPort)
          .setLocality(locality);

      // Set up the Ship Notification operator and tie it to the output stream
      // created in the EDIRouter
      dag.addStream("shipNotifications", ediRouter.outputPort856, operator856.InputPort)
          .setLocality(locality);

      // Set up the Functional Acknowledgment operator and tie it to the output
      // stream created in the EDIRouter
      dag.addStream("functionalAcknowledgments", ediRouter.outputPort997, operator997.InputPort)
          .setLocality(locality);

      // Set up the elasticsearch operator and tie it to the output stream created in the EDIRouter
      ElasticSearchMapOutputOperator operatorEs =
          dag.addOperator("operatorEs", new ElasticSearchMapOutputOperator());
      dag.addStream("esLogger", ediRouter.outputPortEs, operatorEs.input)
          .setLocality(locality);

      // Set up the SMTP output operator to use for the Purchase Order Acknowledgment messages
      EdiSmtpOutputOperator operatorSmtp855 =
          dag.addOperator("operatorSmtp855", new EdiSmtpOutputOperator());
      dag.addStream("smtpOutput855", operator855.outputEmails, operatorSmtp855.input)
          .setLocality(locality);

      // Set up the SMTP output operator to use for the Ship Notification messages
      EdiSmtpOutputOperator operatorSmtp856 =
          dag.addOperator("operatorSmtp856", new EdiSmtpOutputOperator());
      dag.addStream("smtpOutput856", operator856.outputEmails, operatorSmtp856.input)
          .setLocality(locality);

    } catch (Exception exc) {
      DTThrowable.rethrow(exc);
    }
  }
}


============ Here is one of the operator files, stripped of everything but the module declarations ============

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.Stateless;
import org.milyn.payload.StringResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import supplies.facility.edi.helpers.AbstractEDIProcessor;

/**
 * This Operator handles the processing for the EDI 810, Invoice transactions.
 *
 * As invoices arrive, they are added to the accounting invoicing system for
 * payment,
 * and the order status is updated to show that the item is now closed.
 *
 * Created by jradke on 2/7/2016.
 */
@Stateless
public class InvoiceOperator extends AbstractEDIProcessor {

    private static final Logger logger = LoggerFactory.getLogger(InvoiceOperator.class);

    public InvoiceOperator() {
        super("files/smooks-config810.xml");
    }

    public transient DefaultOutputPort<Long> loopbackPort810 = new DefaultOutputPort<Long>();

    @Override
    protected void processXML(Document xmlDocument) {
        logger.trace("Entered InvoiceOperator processXML");
    }
}

Thanks!

Jim

-----Original Message-----
From: Tushar Gosavi [mailto:[email protected]]
Sent: Thursday, September 1, 2016 1:06 PM
To: [email protected]
Subject: Re: HDHT question - looking for the datatorrent gurus!

Hi Jim,
Yes, this is what I had in mind. The managed state operator needs to have a 
separate input for each of the 5 operators. The platform does not support 
connecting multiple output ports to a single input port, but you could achieve 
a similar effect using the stream merge operator
(https://github.com/apache/apex-malhar/blob/3ce83708f795b081d564be357a8333928154398e/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java)

- Tushar.


On Thu, Sep 1, 2016 at 10:37 PM, Jim <[email protected]> wrote:
> Tushar,
>
> Funny that you described it that way, as that is exactly what I was thinking 
> about this morning.
>
>
> So the flow would be:
>
>                          Router Operator
>                                 |
>                      Managed State Operator
>                                 |
>        -----------------------------------------------------
>        |                 |                 |               |
>     General           Detailed            Ship          Invoice
>  Acknowledgement   Acknowledgement    Notification
>        |                 |                 |               |
>        -----------------------------------------------------
>                                 |
>   /  each of the 4 operators at the end of processing emits  /
>   /  a record back to the Managed State Operator             /
>
>
> In this scenario, would the managed state operator just have 1 input,
> that all the other operators emit to, or would it need to have separate 
> inputs for each of the 5 operators that would be emitting to it?
>
> This is what you were describing too, correct?
>
> Thanks,
>
> Jim
>
> -----Original Message-----
> From: Tushar Gosavi [mailto:[email protected]]
> Sent: Thursday, September 1, 2016 11:49 AM
> To: [email protected]
> Subject: Re: HDHT question - looking for the datatorrent gurus!
>
> Hi Jim,
>
> Currently HDHT is accessible only to a single operator in a DAG. A single HDHT 
> store cannot be managed by two different operators at the same time, as that 
> could cause metadata corruption. Theoretically an HDHT bucket could be read by 
> multiple operators, but only one writer is allowed.
>
> In your case each stage of a transaction is processed completely by a different 
> operator, and only then can the next stage start. This could still be achieved 
> by using a single operator that manages the HDHT state, and having a loop in 
> the DAG to send completed transaction ids back to the sequencer.
>
> - The sequence operator will emit a transaction to the transaction processing 
>   operator.
> - If it receives an out-of-order transaction, it will note it down in HDHT.
> - The processing operator will send the completed transaction id on a port that 
>   is connected back to the sequence operator.
> - On receiving data on this loopback port, the sequence operator will update 
>   HDHT and search for the next transaction in order, which could be stored in 
>   HDHT, and will emit it to the next processing operator.
>
> - Tushar.
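
The sequencing steps listed above can be sketched in plain Java roughly as follows. This is only an illustration under stated assumptions: a HashMap stands in for the HDHT store, a list stands in for the output port, the step order is taken from the original question, and every name (SequencerSketch, onTransaction, onCompleted, emitted) is hypothetical rather than part of Apex or HDHT.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustration only: holds out-of-order transactions until their turn comes. */
final class SequencerSketch {
    enum Step { GENERAL_ACK, DETAILED_ACK, SHIP_NOTIFICATION, INVOICE }

    private static final class OrderState {
        Step expected = Step.GENERAL_ACK;  // next step allowed to run; null once the order is done
        boolean inFlight = false;          // true while a processing operator is working on a step
        final Map<Step, String> held = new EnumMap<>(Step.class);
    }

    private final Map<String, OrderState> store = new HashMap<>(); // assumption: stands in for HDHT
    private final List<String> emitted = new ArrayList<>();        // assumption: stands in for an output port

    /** A transaction arrives from the inbound queue, possibly out of order. */
    void onTransaction(String orderNo, Step step, String payload) {
        OrderState state = store.computeIfAbsent(orderNo, k -> new OrderState());
        state.held.put(step, payload);
        releaseIfReady(orderNo, state);
    }

    /** A processing operator reports completion on the loopback port. */
    void onCompleted(String orderNo, Step step) {
        OrderState state = store.get(orderNo);
        if (state == null || !state.inFlight || state.expected != step) {
            return; // not the step we are waiting on; ignore in this sketch
        }
        state.inFlight = false;
        int next = step.ordinal() + 1;
        state.expected = next < Step.values().length ? Step.values()[next] : null;
        releaseIfReady(orderNo, state);
    }

    private void releaseIfReady(String orderNo, OrderState state) {
        if (state.inFlight || state.expected == null) {
            return;
        }
        String payload = state.held.remove(state.expected);
        if (payload != null) {
            state.inFlight = true;
            emitted.add(orderNo + ":" + state.expected);
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        SequencerSketch seq = new SequencerSketch();
        seq.onTransaction("PO1", Step.DETAILED_ACK, "855 data"); // early, so it is held
        seq.onTransaction("PO1", Step.GENERAL_ACK, "997 data");  // expected, so it is emitted
        seq.onCompleted("PO1", Step.GENERAL_ACK);                // releases the held 855
        System.out.println(seq.emitted());
    }
}
```

The inFlight flag reflects the constraint from the original question that the next step must not be emitted until the previous one has been confirmed complete.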
>
>
> On Sat, Aug 27, 2016 at 1:31 AM, Jim <[email protected]> wrote:
>> Good afternoon,
>>
>>
>>
>> I have an apex application where I may receive edi transactions, but
>> sometimes they arrive out of order and I want to hold any out of
>> sequence transactions till the correct time in the flow to process them.
>>
>>
>>
>> For example for a standard order, we will receive from the remote vendor:
>>
>>
>>
>> 1.)    General Acknowledgement
>>
>> 2.)    Detailed Acknowledgement
>>
>> 3.)    Ship Notification
>>
>> 4.)    Invoice
>>
>>
>>
>> They are supposed to be sent and received in that order.
>>
>>
>>
>> However, sometimes vendors' systems have problems, etc., so they send
>> all of these at the same time, and then we can receive them out of 
>> sequence.
>> Data packets for these are very small, say from 1 to 512 bytes, and
>> the only time they will be out of sequence is when we receive them very
>> closely together.
>>
>>
>>
>> I am trying to think of the best way to do this in my datatorrent /
>> Hadoop / YARN facilities, instead of creating a data table in
>> PostgreSQL and using that.
>>
>>
>>
>> Can I create a flow that works like this (I am not sure if this makes
>> sense, or is the best way to solve my problem, while keeping state,
>> etc. maintained for all the operators):
>>
>>
>>
>> 1.)    In the inbound transaction router, check the HDHT store for the order
>> number. If it doesn’t exist, this is a new order. If the
>> transaction being processed is the general acknowledgement, emit the
>> data to the general acknowledgement operator; if it is not, store
>> the transaction data in the correct bucket in HDHT by order number,
>> identifying which transaction it is for, and record that the next
>> expected step is the general acknowledgement.
>>
>> 2.)    Say the next transaction is the ship notification. In the router, we
>> would check the HDHT store, see this is not the next expected
>> transaction (say it is supposed to be the detailed acknowledgement), so
>> we would just post the data for the ship notification into the HDHT store 
>> and say we are done.
>>
>> 3.)    Say we now receive the detailed acknowledgement for an order whose
>> next step IS the detailed acknowledgement. We would see this is the
>> correct next transaction, emit it to the detailed acknowledgement
>> operator, and update the HDHT store to show that the next transaction
>> should be the ship notification.  NOTE:  we can’t emit the ship
>> notification until we have confirmed that the detailed acknowledgement 
>> has been completed.
>>
>> 4.)    In each of the 4 transaction operators at the end of the processing,
>> we would update the HDHT store to show the next expected step, and if
>> we already received data for the next expected step pull it from the
>> HDHT store, and write the transaction into our SQS queue which is the
>> input into the inbound transaction router at the beginning of the
>> application, so it processes through the system.
>>
>>
>>
>> I believe HDHT can be used to pass data throughout an entire
>> application, and is not limited to just a per operator basis, correct?
>>
>>
>>
>> Any comments / feedback?
>>
>>
>>
>> Thanks,
>>
>>
>>
>> Jim
