Tushar,
I am trying to implement what you described, and I get a validation error,
"Loops in graph", that I can't find any reference to.
Below I have pasted my Application.java file; what could be causing this error?
java.lang.RuntimeException: Error creating local cluster
	at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:78)
	at supplies.facility.edi.sellars.ApplicationTest.testApplication(ApplicationTest.java:30)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
	at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
Caused by: javax.validation.ValidationException: Loops in graph: [[
	OperatorMeta{name=operator997, operator=FunctionalAcknowledgmentOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}},
	OperatorMeta{name=operator856, operator=ShipNotificationOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}},
	OperatorMeta{name=operator855, operator=POAcknowledgmentOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}},
	OperatorMeta{name=ediRouter, operator=EDIRoutingOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=512}},
	OperatorMeta{name=operator810, operator=InvoiceOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}}]]
	at com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1775)
	at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:278)
	at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:76)
	... 23 more
============================== Application.java starts here ==============================
/**
 * Put your copyright and license info here.
 */
package supplies.facility.edi.sellars;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.kinesis.AbstractKinesisInputOperator;
import com.datatorrent.contrib.kinesis.KinesisConsumer;
import com.datatorrent.contrib.kinesis.KinesisStringInputOperator;
import com.datatorrent.contrib.kinesis.ShardManager;
import com.datatorrent.lib.db.jdbc.JdbcStore;
import com.datatorrent.netlet.util.DTThrowable;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import supplies.facility.apex.contrib.elasticsearch.ElasticSearchMapOutputOperator;
import supplies.facility.edi.helpers.EdiSmtpOutputOperator;
import supplies.facility.edi.helpers.KinesisEdiTransactionSetInputOperator;
import supplies.facility.edi.helpers.SqsTransactionSetInputOperator;
import supplies.facility.edi.helpers.StatefulShardManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

@ApplicationAnnotation(name = "FsEdiSellars")
public class Application implements StreamingApplication {

  private final Locality locality = null;
  private static final Logger LOG = LoggerFactory.getLogger(Application.class);
  private JdbcStore jdbcStore;

  @Override
  public void populateDAG(DAG dag, Configuration conf) {
    try {
      // Read from SQS FS_EDI_SELLARS
      // SQSConnectionFactory
      SqsTransactionSetInputOperator sqsReader =
          dag.addOperator("TransactionsQueue", new SqsTransactionSetInputOperator());

      // Define all the operators
      EDIRoutingOperator ediRouter = dag.addOperator("ediRouter", new EDIRoutingOperator());
      InvoiceOperator operator810 = dag.addOperator("operator810", new InvoiceOperator());
      POAcknowledgmentOperator operator855 =
          dag.addOperator("operator855", new POAcknowledgmentOperator());
      ShipNotificationOperator operator856 =
          dag.addOperator("operator856", new ShipNotificationOperator());
      FunctionalAcknowledgmentOperator operator997 =
          dag.addOperator("operator997", new FunctionalAcknowledgmentOperator());

      // Set up the EDIRoutingOperator that takes the inbound EDI data and puts it into
      // multiple outbound streams, one for each of the applicable EDI transactions:
      //   810 = Invoice
      //   855 = Purchase Order Acknowledgment
      //   856 = Advance Ship Notification (ASN)
      //   997 = Functional Acknowledgment
      // as well as one for an elasticsearch logger that records all incoming transactions.
      dag.addStream("ediInboundStream", sqsReader.output, ediRouter.inboundEdiPort)
          .setLocality(Locality.CONTAINER_LOCAL);
      dag.addStream("loopBack855", operator855.loopbackPort855, ediRouter.loopbackPort855)
          .setLocality(Locality.CONTAINER_LOCAL);
      dag.addStream("loopBack856", operator856.loopbackPort856, ediRouter.loopbackPort856)
          .setLocality(Locality.CONTAINER_LOCAL);
      dag.addStream("loopBack810", operator810.loopbackPort810, ediRouter.loopbackPort810)
          .setLocality(Locality.CONTAINER_LOCAL);
      dag.addStream("loopBack997", operator997.loopbackPort997, ediRouter.loopbackPort997)
          .setLocality(Locality.CONTAINER_LOCAL);

      // Set up the Invoice operator and tie it to the output stream created in the EDIRouter
      dag.addStream("invoices", ediRouter.outputPort810, operator810.InputPort)
          .setLocality(locality);

      // Set up the Purchase Order Acknowledgment operator and tie it to the output stream
      // created in the EDIRouter
      dag.addStream("PoAcknowledgments", ediRouter.outputPort855, operator855.InputPort)
          .setLocality(locality);

      // Set up the Ship Notification operator and tie it to the output stream created in
      // the EDIRouter
      dag.addStream("shipNotifications", ediRouter.outputPort856, operator856.InputPort)
          .setLocality(locality);

      // Set up the Functional Acknowledgment operator and tie it to the output stream
      // created in the EDIRouter
      dag.addStream("functionalAcknowledgments", ediRouter.outputPort997, operator997.InputPort)
          .setLocality(locality);

      // Set up the elasticsearch operator and tie it to the output stream created in the EDIRouter
      ElasticSearchMapOutputOperator operatorEs =
          dag.addOperator("operatorEs", new ElasticSearchMapOutputOperator());
      dag.addStream("esLogger", ediRouter.outputPortEs, operatorEs.input).setLocality(locality);

      // Set up the smtp output operator to use for the Purchase Order Acknowledgment messages
      EdiSmtpOutputOperator operatorSmtp855 =
          dag.addOperator("operatorSmtp855", new EdiSmtpOutputOperator());
      dag.addStream("smtpOutput855", operator855.outputEmails, operatorSmtp855.input)
          .setLocality(locality);

      // Set up the smtp output operator to use for the Ship Notification messages
      EdiSmtpOutputOperator operatorSmtp856 =
          dag.addOperator("operatorSmtp856", new EdiSmtpOutputOperator());
      dag.addStream("smtpOutput856", operator856.outputEmails, operatorSmtp856.input)
          .setLocality(locality);
    } catch (Exception exc) {
      DTThrowable.rethrow(exc);
    }
  }
}
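For reference, the "Loops in graph" check is ordinary cycle detection on the logical plan: each loopBack* stream makes ediRouter both upstream and downstream of the same operator, so ediRouter, operator810, operator855, operator856 and operator997 all sit on cycles, which is exactly the operator set listed in the exception. A minimal pure-Java sketch of that check (a hypothetical adjacency list modeling the DAG above, not Apex code):

```java
import java.util.*;

public class LoopCheck {
    // Hypothetical adjacency list of the logical plan above: operator -> downstream operators.
    static final Map<String, List<String>> DAG = Map.of(
        "TransactionsQueue", List.of("ediRouter"),
        "ediRouter", List.of("operator810", "operator855", "operator856", "operator997", "operatorEs"),
        "operator810", List.of("ediRouter"),                     // loopBack810
        "operator855", List.of("ediRouter", "operatorSmtp855"),  // loopBack855
        "operator856", List.of("ediRouter", "operatorSmtp856"),  // loopBack856
        "operator997", List.of("ediRouter"),                     // loopBack997
        "operatorEs", List.of(),
        "operatorSmtp855", List.of(),
        "operatorSmtp856", List.of());

    // DFS with an "on current path" set; a back edge onto that path means a cycle.
    static boolean hasLoop() {
        Set<String> done = new HashSet<>();
        Set<String> onPath = new HashSet<>();
        for (String op : DAG.keySet()) {
            if (dfs(op, onPath, done)) return true;
        }
        return false;
    }

    static boolean dfs(String op, Set<String> onPath, Set<String> done) {
        if (onPath.contains(op)) return true;   // back edge: loop in graph
        if (done.contains(op)) return false;    // already fully explored
        onPath.add(op);
        for (String next : DAG.get(op)) {
            if (dfs(next, onPath, done)) return true;
        }
        onPath.remove(op);
        done.add(op);
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hasLoop() ? "Loops in graph" : "valid DAG");
    }
}
```

If the loopback really has to stay inside the DAG, then (if I recall correctly, from Apex 3.3 onward) a cycle is only accepted when its back edge goes through a delay operator (Operator.DelayOperator, e.g. DefaultDelayOperator); otherwise the completion signal has to travel outside the DAG, for example back through the SQS queue.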
============================== here is one of the operator files, stripped of everything but the module declarations ==============================
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.Stateless;
import org.milyn.payload.StringResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import supplies.facility.edi.helpers.AbstractEDIProcessor;

/**
 * This Operator handles the processing for the EDI 810, Invoice transactions.
 *
 * As invoices arrive, they are added to the accounting invoicing system for payment,
 * and the order status is updated to show that the item is now closed.
 *
 * Created by jradke on 2/7/2016.
 */
@Stateless
public class InvoiceOperator extends AbstractEDIProcessor {

  private static final Logger logger = LoggerFactory.getLogger(InvoiceOperator.class);

  public InvoiceOperator() {
    super("files/smooks-config810.xml");
  }

  public transient DefaultOutputPort<Long> loopbackPort810 = new DefaultOutputPort<Long>();

  @Override
  protected void processXML(Document xmlDocument) {
    logger.trace("Entered InvoiceOperator processXML");
  }
}
Thanks!
Jim
-----Original Message-----
From: Tushar Gosavi [mailto:[email protected]]
Sent: Thursday, September 1, 2016 1:06 PM
To: [email protected]
Subject: Re: HDHT question - looking for the datatorrent gurus!
Hi Jim,
Yes, this is what I had in mind. The managed state operator needs to have a
separate input for each of the 5 operators. The platform does not support
connecting multiple output ports to a single input port, but you can achieve a
similar effect using the StreamMerger operator
(https://github.com/apache/apex-malhar/blob/3ce83708f795b081d564be357a8333928154398e/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java).
- Tushar.
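To make the suggestion above concrete, the fan-in that StreamMerger provides can be modeled in plain Java: several upstream ports all funnel into one merge point, which forwards every tuple to a single downstream consumer. This is a toy model of the pattern only, with invented names, not the Malhar operator's actual API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class MergeSketch {
    // A toy merge point: every input port created from it forwards into the same
    // downstream consumer, mimicking StreamMerger funneling multiple output ports
    // into one input port.
    static class Merger<T> {
        private final Consumer<T> downstream;
        Merger(Consumer<T> downstream) { this.downstream = downstream; }
        Consumer<T> newInput() { return downstream::accept; }  // one "input port" per upstream
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        Merger<String> merger = new Merger<>(received::add);   // single downstream input

        // Two upstream operators, each with its own port into the merger.
        Consumer<String> fromOperator855 = merger.newInput();
        Consumer<String> fromOperator856 = merger.newInput();

        fromOperator855.accept("855-done:PO123");
        fromOperator856.accept("856-done:PO123");

        System.out.println(received);  // both tuples arrive on the one downstream input
    }
}
```

With the real StreamMerger you would connect each loopback stream to one of its input ports and run a single stream from its output port into the managed state operator's one input port; check the linked Malhar source for the actual port names, as they are not reproduced here.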
On Thu, Sep 1, 2016 at 10:37 PM, Jim <[email protected]> wrote:
> Tushar,
>
> Funny that you described it that way, as that is exactly what I was thinking
> about this morning.
>
>
> So the flow would be:
>
>
>
>
>                       Router Operator
>                              |
>                    Managed State Operator
>                              |
>      +---------------+------+--------+----------------+
>      |               |               |                |
>  General         Detailed         Ship             Invoice
>  Acknowledgement Acknowledgement  Notification
>      |               |               |                |
>      +---------------+------+--------+----------------+
>                              |
>   (each of the 4 operators, at the end of processing, emits a
>    record back to the Managed State Operator)
>
>
> In this scenario, would the managed state operator just have 1 input,
> that all the other operators emit to, or would it need to have separate
> inputs for each of the 5 operators that would be emitting to it?
>
> This is what you were describing too, correct?
>
> Thanks,
>
> Jim
>
> -----Original Message-----
> From: Tushar Gosavi [mailto:[email protected]]
> Sent: Thursday, September 1, 2016 11:49 AM
> To: [email protected]
> Subject: Re: HDHT question - looking for the datatorrent gurus!
>
> Hi Jim,
>
> Currently HDHT is accessible only to a single operator in a DAG. A single
> HDHT store cannot be managed by two different operators at a time, as that
> could cause metadata corruption. Theoretically an HDHT bucket could be read
> from multiple operators, but only one writer is allowed.
>
> In your case each stage of a transaction is processed completely by a
> different operator, and only then can the next stage start. This can still be
> achieved by using a single operator that manages the HDHT state, and having a
> loop in the DAG to send completed transaction ids back to the sequencer:
>
> - The sequence operator will emit transactions to the transaction processing
> operators.
> - If it receives an out-of-order transaction, it will note it down in HDHT.
> - Each processing operator will send the completed transaction id on a port
> which is connected back to the sequence operator.
> - On receiving data on this loopback port, the sequence operator will update
> HDHT and search for the next in-order transaction, which may already be
> stored in HDHT, and emit it to the next processing operator.
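The expected-next-step bookkeeping described above can be sketched as a small state machine keyed by order number. This is plain Java standing in for the HDHT-backed sequence operator: the stage codes follow this thread, but the class, method, and field names are invented for illustration, and the real implementation would keep nextStep and the parked payloads in HDHT rather than in-memory maps:

```java
import java.util.*;

public class SequencerSketch {
    // Fixed EDI stage order from the thread: general ack, detailed ack,
    // ship notification, invoice.
    static final List<String> ORDER = List.of("997", "855", "856", "810");

    private final Map<String, Integer> nextStep = new HashMap<>();            // order -> index of next expected stage
    private final Map<String, Map<String, String>> held = new HashMap<>();    // out-of-order payloads, parked "in HDHT"
    final List<String> emitted = new ArrayList<>();                           // what goes to the processing operators

    // A transaction arrives from the router.
    void onTransaction(String orderId, String stage, String payload) {
        int expected = nextStep.getOrDefault(orderId, 0);
        if (ORDER.indexOf(stage) == expected) {
            emitted.add(orderId + "/" + stage + "/" + payload);               // in order: emit downstream
        } else {
            held.computeIfAbsent(orderId, k -> new HashMap<>()).put(stage, payload);  // out of order: park it
        }
    }

    // A processing operator reports completion on the loopback port.
    void onCompleted(String orderId, String stage) {
        int next = ORDER.indexOf(stage) + 1;
        nextStep.put(orderId, next);
        if (next < ORDER.size()) {
            String parked = held.getOrDefault(orderId, Map.of()).get(ORDER.get(next));
            if (parked != null) {
                onTransaction(orderId, ORDER.get(next), parked);              // release the next stage if already held
            }
        }
    }

    public static void main(String[] args) {
        SequencerSketch s = new SequencerSketch();
        s.onTransaction("PO1", "855", "detail-ack");   // out of order: parked
        s.onTransaction("PO1", "997", "general-ack");  // in order: emitted
        s.onCompleted("PO1", "997");                   // loopback releases the parked 855
        System.out.println(s.emitted);
    }
}
```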
>
> - Tushar.
>
>
> On Sat, Aug 27, 2016 at 1:31 AM, Jim <[email protected]> wrote:
>> Good afternoon,
>>
>>
>>
>> I have an apex application where I may receive edi transactions, but
>> sometimes they arrive out of order and I want to hold any out of
>> sequence transactions till the correct time in the flow to process them.
>>
>>
>>
>> For example for a standard order, we will receive from the remote vendor:
>>
>>
>>
>> 1.) General Acknowledgement
>>
>> 2.) Detailed Acknowledgement
>>
>> 3.) Ship Notification
>>
>> 4.) Invoice
>>
>>
>>
>> They are supposed to be sent and received in that order.
>>
>>
>>
>> However, sometimes vendors' systems have problems, so they send all of these
>> at the same time, and then we can receive them out of sequence.
>> Data packets for these are very small, say from 1 to 512 bytes, and the only
>> time they will be out of sequence is when we receive them very close
>> together.
>>
>>
>>
>> I am trying to think of the best way to do this with my DataTorrent /
>> Hadoop / YARN facilities, instead of creating a data table in
>> PostgreSQL and using that.
>>
>>
>>
>> Can I create a flow that works like this? (I am not sure if this makes
>> sense, or is the best way to solve my problem while keeping state, etc.
>> maintained for all the operators.)
>>
>>
>>
>> 1.) In the inbound transaction router, check the HDHT store for the order
>> number. If it doesn’t exist, this means it is a new order: if the
>> transaction being processed is the general acknowledgment, emit the
>> data to the general acknowledgement operator; if it is not, store
>> the transaction data into the correct bucket, identifying which
>> transaction it is for, and record the next expected step as the general
>> acknowledgement in HDHT by order number.
>>
>> 2.) Say the next transaction is the ship notification. In the router, we
>> would check the HDHT store, see this is not the next expected
>> transaction (say it is supposed to be the detail acknowledgement), so
>> we would just post the data for the ship notification into the HDHT store
>> and say we are done.
>>
>> 3.) Say we now receive the detailed acknowledgement for an order whose
>> next step IS the detailed acknowledgement. We would see this is the
>> correct next transaction, emit it to the detailed acknowledgement
>> operator, and update the HDHT store to show that the next transaction
>> should be the ship notification. NOTE: we can’t emit the ship
>> notification yet, until we have confirmed that the detailed acknowledgment
>> has been completed.
>>
>> 4.) In each of the 4 transaction operators, at the end of processing we
>> would update the HDHT store to show the next expected step, and if we
>> have already received data for the next expected step, pull it from the
>> HDHT store and write the transaction into our SQS queue (the input to
>> the inbound transaction router at the beginning of the application), so
>> it processes through the system.
>>
>>
>>
>> I believe HDHT can be used to pass data throughout an entire
>> application, and is not limited to a per-operator basis, correct?
>>
>>
>>
>> Any comments / feedback?
>>
>>
>>
>> Thanks,
>>
>>
>>
>> Jim