[jira] [Closed] (APEXMALHAR-2034) Avro File To POJO Module

2017-12-20 Thread Thomas Weise (JIRA)

 [ https://issues.apache.org/jira/browse/APEXMALHAR-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Thomas Weise closed APEXMALHAR-2034.

Resolution: Incomplete

> Avro File To POJO Module
> 
>
> Key: APEXMALHAR-2034
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2034
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Reporter: devendra tagare
> Assignee: Saumya Mohan
>
> Issue:
> Avro objects are not serialized by Kryo, so the Avro GenericRecord is not
> available to downstream operators unless users explicitly mark the stream
> locality as CONTAINER_LOCAL or THREAD_LOCAL.
> Solution:
> This JIRA creates a Module on top of the AvroFileInputOperator and
> AvroToPojo operators so that downstream operators receive a POJO instead
> of an Avro GenericRecord. It therefore removes the exposure of GenericRecord
> to downstream operators and instead exposes the created POJO to them.
> In this Module, the stream between the two encapsulated operators
> (AvroFileInputOperator and AvroToPojo) is set to CONTAINER_LOCAL.
> -Along with this new module, existing avro support files are moved from the
> contrib module to a new 'avro' module.-
> *Unit Test*
> A unit test for this Avro module has been added in the malhar-avro package.
> -*Move to new package and Backward compatibility*-
> -Additionally, this module is part of a new package 'malhar-avro' and the 
> operator files/tests are all moved from contrib package to the new package.-
> -Old operator files are marked deprecated and made to extend from new 
> operator files for backward compatibility.-
> -Creating a new maven module for Avro is in accordance with the JIRA 
> "https://issues.apache.org/jira/browse/APEXMALHAR-1843."-
> -Git history of all the moved files is maintained-
> *Application Level Testing*
> - To test the module, I created a sample StreamingApplication and a POJO
> class. This application adds the new AvroToPojoModule and a ConsoleOperator
> to the DAG; the ConsoleOperator received and displayed the POJOs emitted by
> the module (a sketch follows below).
> -To test backward compatibility, I created a sample application which adds
> AvroFileInputOperator and AvroToPojo from the old package to the DAG, along
> with a ConsoleOperator. The ConsoleOperator received and displayed the POJOs
> from these operators.-
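
For illustration, a minimal sketch of the kind of test application described above, assuming malhar's ConsoleOutputOperator; the POJO, directory, and property names are placeholders rather than the exact test code attached to this JIRA:

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.contrib.avro.AvroFileToPojoModule;
import com.datatorrent.lib.io.ConsoleOutputOperator;

public class AvroModuleTestApplication implements StreamingApplication
{
  // simple POJO standing in for the class that matches the Avro schema of the test files
  public static class SimpleOrder
  {
    private String customerName;
    private Long orderId;

    public String getCustomerName() { return customerName; }
    public void setCustomerName(String customerName) { this.customerName = customerName; }
    public Long getOrderId() { return orderId; }
    public void setOrderId(Long orderId) { this.orderId = orderId; }
  }

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // add the module and tell it which POJO class to emit and where the Avro files live
    AvroFileToPojoModule module = dag.addModule("avroFileToPojoModule", new AvroFileToPojoModule());
    module.setPojoClass(SimpleOrder.class);
    module.setAvroFileDirectory(conf.get("avro.input.directory", "/tmp/avro-input"));

    // console operator prints the POJOs emitted by the module
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("pojoToConsole", module.output, console.input);
  }
}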



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (APEXMALHAR-2034) Avro File To POJO Module

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/APEXMALHAR-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299411#comment-16299411 ]

ASF GitHub Bot commented on APEXMALHAR-2034:


tweise closed pull request #670: APEXMALHAR-2034 Adding new Avro Module to 
encapsulate Container File to Avro GenericRecord to POJO transformations
URL: https://github.com/apache/apex-malhar/pull/670
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileToPojoModule.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileToPojoModule.java
new file mode 100644
index 00..8ad00dfc89
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileToPojoModule.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.avro;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+
+/**
+ * Avro File To Pojo Module
+ *
+ * This module emits a POJO based on the schema derived from the input file.
+ *
+ * Example of how to configure and add this module to a DAG:
+ *
+ * AvroFileToPojoModule avroFileToPojoModule = new AvroFileToPojoModule();
+ * avroFileToPojoModule.setPojoClass([className.class]);
+ * avroFileToPojoModule.setAvroFileDirectory(conf.get("[configuration property]", "[default file directory]"));
+ * avroFileToPojoModule = dag.addModule("avroFileToPojoModule", avroFileToPojoModule);
+ *
+ * There is no need to provide a schema; it is inferred from the file.
+ *
+ * Users can add the {@link FSWindowDataManager}
+ * to ensure exactly-once semantics with an HDFS-backed WAL.
+ *
+ * @displayName AvroFileToPojoModule
+ * @category Input
+ * @tags fs, file, avro, input operator, generic record, pojo
+ *
+ * @since
+ */
+public class AvroFileToPojoModule implements Module
+{
+  public final transient ProxyOutputPort<Object> output = new ProxyOutputPort<>();
+  public final transient ProxyOutputPort<GenericRecord> errorPort = new ProxyOutputPort<>();
+  //output ports from AvroFileInputOperator
+  public final transient ProxyOutputPort<String> completedAvroFilesPort = new ProxyOutputPort<>();
+  public final transient ProxyOutputPort<String> avroErrorRecordsPort = new ProxyOutputPort<>();
+
+  private AvroFileInputOperator avroFileInputOperator = new AvroFileInputOperator();
+  Class<?> pojoClass = null;
+
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    AvroFileInputOperator avroFileInputOperator = dag.addOperator("AvroFileInputOperator", this.avroFileInputOperator);
+    AvroToPojo avroToPojo = dag.addOperator("AvroGenericObjectToPojo", new AvroToPojo());
+
+    dag.setOutputPortAttribute(avroToPojo.output, Context.PortContext.TUPLE_CLASS, pojoClass);
+
+    dag.addStream("avroFileContainerToPojo", avroFileInputOperator.output, avroToPojo.data)
+        .setLocality(DAG.Locality.CONTAINER_LOCAL);
+
+    output.set(avroToPojo.output);
+    errorPort.set(avroToPojo.errorPort);
+
+    completedAvroFilesPort.set(avroFileInputOperator.completedFilesPort);
+    avroErrorRecordsPort.set(avroFileInputOperator.errorRecordsPort);
+  }
+
+  public void setPojoClass(Class<?> pojoClass)
+  {
+    this.pojoClass = pojoClass;
+  }
+
+  public void setAvroFileDirectory(String directory)
+  {
+    avroFileInputOperator.setDirectory(directory);
+  }
+}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileToPojoModuleTest.java b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileToPojoModuleTest.java
new file mode 100644
index 00..696547512f
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileToPojoModuleTest.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation 

Re: [Discuss] Design of the python execution operator

2017-12-20 Thread Pramod Immaneni
On Wed, Dec 20, 2017 at 3:34 PM, Thomas Weise  wrote:

> It is exciting to see this move forward, the ability to use Python opens
> many new possibilities.
>
> Regarding use of worker threads, this is a pattern that we are using
> elsewhere (for example in the Kafka input operator). When the operator
> performs blocking operations and consumes little memory and/or CPU, then it
> is more economic to first use threads to increase parallelism and
> throughput (up to a limit), and then the more expensive containers for
> horizontal scaling (multiple threads to make good use of container
> resources and then scale using the usual partitioning).
>

I think there is a difference. In the case of Kafka or other input operators,
the threads are less constrained. They can operate with independence and can
dictate the pace, limited only by back pressure. In this case the operator is
most likely going to be downstream in the DAG and would have constraints for
processing guarantees. For scalability, container local could also be used as
a substitute for multiple threads without resorting to separate containers. I
can understand using a separate thread to get around problems like stalled
processing, but I would first try to see if something like container local
would work for scaling.
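
For reference, a minimal sketch of the container-local alternative; the two stub operators here only stand in for a real source and the python execution operator under discussion, and are illustrative, not proposed code:

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.BaseOperator;

public class ContainerLocalScalingSketch implements StreamingApplication
{
  // trivial stand-in for a real input operator
  public static class SourceStub extends BaseOperator implements InputOperator
  {
    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();

    @Override
    public void emitTuples()
    {
      output.emit("record");
    }
  }

  // trivial stand-in for the python execution operator
  public static class PythonExecStub extends BaseOperator
  {
    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
    {
      @Override
      public void process(String tuple)
      {
        output.emit(tuple); // the real operator would run the python logic here
      }
    };
  }

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    SourceStub source = dag.addOperator("source", new SourceStub());
    PythonExecStub python = dag.addOperator("python", new PythonExecStub());

    // keep the source-to-python stream within one container (in-memory tuple passing)
    dag.addStream("toPython", source.output, python.input)
        .setLocality(DAG.Locality.CONTAINER_LOCAL);

    // scale by partitioning the python operator rather than by adding worker threads
    dag.setAttribute(python, Context.OperatorContext.PARTITIONER,
        new StatelessPartitioner<PythonExecStub>(4));
  }
}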


> It is also correct that generally there is no ordering guarantee within a
> streaming window, and that would be the case when multiple input ports are
> present as well. (The platform cannot guarantee such ordering, this would
> need to be done by the operator).



> Idempotency can be expensive (latency and/or space complexity), and not all
> applications need it (like certain use cases that process record by record
> and don't accumulate state). An example might be Python logic that is used
> for scoring against a model that was built offline. Idempotency would
> actually be rather difficult to implement, since the operator would need to
> remember which tuples were emitted in a given interval and on replay block
> until they are available (and also hold others that may be processed sooner
> than in the original order). It may be easier to record emitted tuples to a
> WAL instead of reprocessing.
>

Ordering cannot be guaranteed, but the operator would need to finish the work
it is given in a window within the window boundary; otherwise there is a
chance of data loss in recovery scenarios. You could make the checkpoint the
boundary by which all pending work is completed, instead of every window
boundary, but then downstream operators cannot rely on window-level
idempotency for exactly once. Something like the file output operator would
work, but not our DB kind of operators. Both options could be supported in
the operator.
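
A sketch of the checkpoint-as-boundary option, assuming Apex's Operator.CheckpointNotificationListener and a worker-thread hand-off via Futures; the names are illustrative, not the proposed operator:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;

public class DrainAtCheckpointSketch extends BaseOperator
    implements Operator.CheckpointNotificationListener
{
  // futures for work handed off to worker threads; transient because Future
  // instances are not part of checkpointed state
  private final transient List<Future<?>> pending = new ArrayList<>();

  @Override
  public void beforeCheckpoint(long windowId)
  {
    // block until all handed-off work completes so the checkpoint does not
    // silently drop in-flight tuples
    for (Future<?> f : pending) {
      try {
        f.get();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
    pending.clear();
  }

  @Override
  public void checkpointed(long windowId)
  {
  }

  @Override
  public void committed(long windowId)
  {
  }
}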


> Regarding not emitting stragglers until the next input arrives, can this
> not be accomplished using IdleTimeHandler?
>
> What is preventing the use of virtual environments?
>
> Thanks,
> Thomas
>
>
> On Tue, Dec 19, 2017 at 8:19 AM, Pramod Immaneni 
> wrote:
>
> > Hi Ananth,
> >
> > From your explanation, it looks like the threads overall allow you to
> > achieve two things. Have some sort of overall timeout if by which a tuple
> > doesn't finish processing then it is flagged as such. Second, it doesn't
> > block processing of subsequent tuples and you can still process them
> > meeting the SLA. By checkpoint, however, I think you should try to have a
> > resolution one way or the other for all the tuples received within the
> > checkpoint period or every window boundary (see idempotency below),
> > otherwise, there is a chance of data loss in case of operator restarts.
> If
> > a loss is acceptable for stragglers you could let straggler processing
> > continue beyond checkpoint boundary and let them finish when they can.
> You
> > could support both behaviors by use of a property. Furthermore, you may
> not
> > want all threads stuck with stragglers and then you are back to square
> one
> > so you may need to stop processing stragglers beyond a certain thread
> usage
> > threshold. Is there a way to interrupt the processing of the engine?
> >
> > Then there is the question of idempotency. I suspect it would be
> difficult
> > to maintain it unless you wait for processing to finish for all tuples
> > received during the window every window boundary. You may provide an
> option
> > for relaxing the strict guarantees for the stragglers like mentioned
> above.
> >
> > Pramod
> >
> > On Thu, Dec 14, 2017 at 10:49 AM, Ananth G 
> wrote:
> >
> > > Hello Pramod,
> > >
> > > Thanks for the comments. I adjusted the title of the JIRA. Here is
> what I
> > > was thinking for the worker pool implementation.
> > >
> > > - The main reason ( which I forgot to mention in the design points
> below
> > )
> > > is that the java embedded engine allows only the thread that created
> the
> > > instance to execute the python logic. This is more because of the JNI
> > > 

Re: [Discuss] Design of the python execution operator

2017-12-20 Thread Thomas Weise
It is exciting to see this move forward, the ability to use Python opens
many new possibilities.

Regarding use of worker threads, this is a pattern that we are using
elsewhere (for example in the Kafka input operator). When the operator
performs blocking operations and consumes little memory and/or CPU, then it
is more economic to first use threads to increase parallelism and
throughput (up to a limit), and then the more expensive containers for
horizontal scaling (multiple threads to make good use of container
resources and then scale using the usual partitioning).

It is also correct that generally there is no ordering guarantee within a
streaming window, and that would be the case when multiple input ports are
present as well. (The platform cannot guarantee such ordering, this would
need to be done by the operator).

Idempotency can be expensive (latency and/or space complexity), and not all
applications need it (like certain use cases that process record by record
and don't accumulate state). An example might be Python logic that is used
for scoring against a model that was built offline. Idempotency would
actually be rather difficult to implement, since the operator would need to
remember which tuples were emitted in a given interval and on replay block
until they are available (and also hold others that may be processed sooner
than in the original order). It may be easier to record emitted tuples to a
WAL instead of reprocessing.

Regarding not emitting stragglers until the next input arrives, can this
not be accomplished using IdleTimeHandler?
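
For concreteness, a minimal sketch of the IdleTimeHandler idea, with assumed names; it is not the proposed python operator itself:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;

public class StragglerFlushSketch extends BaseOperator implements Operator.IdleTimeHandler
{
  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();

  // results finished by worker threads, waiting to be emitted on the operator thread
  private final transient Queue<Object> completed = new ConcurrentLinkedQueue<>();

  @Override
  public void handleIdleTime()
  {
    // invoked by the engine when no input is pending, so stragglers can be
    // emitted without waiting for the next tuple to arrive
    Object result;
    while ((result = completed.poll()) != null) {
      output.emit(result);
    }
  }
}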

What is preventing the use of virtual environments?

Thanks,
Thomas


On Tue, Dec 19, 2017 at 8:19 AM, Pramod Immaneni 
wrote:

> Hi Ananth,
>
> From your explanation, it looks like the threads overall allow you to
> achieve two things. Have some sort of overall timeout if by which a tuple
> doesn't finish processing then it is flagged as such. Second, it doesn't
> block processing of subsequent tuples and you can still process them
> meeting the SLA. By checkpoint, however, I think you should try to have a
> resolution one way or the other for all the tuples received within the
> checkpoint period or every window boundary (see idempotency below),
> otherwise, there is a chance of data loss in case of operator restarts. If
> a loss is acceptable for stragglers you could let straggler processing
> continue beyond checkpoint boundary and let them finish when they can. You
> could support both behaviors by use of a property. Furthermore, you may not
> want all threads stuck with stragglers and then you are back to square one
> so you may need to stop processing stragglers beyond a certain thread usage
> threshold. Is there a way to interrupt the processing of the engine?
>
> Then there is the question of idempotency. I suspect it would be difficult
> to maintain it unless you wait for processing to finish for all tuples
> received during the window every window boundary. You may provide an option
> for relaxing the strict guarantees for the stragglers like mentioned above.
>
> Pramod
>
> On Thu, Dec 14, 2017 at 10:49 AM, Ananth G  wrote:
>
> > Hello Pramod,
> >
> > Thanks for the comments. I adjusted the title of the JIRA. Here is what I
> > was thinking for the worker pool implementation.
> >
> > - The main reason ( which I forgot to mention in the design points below
> )
> > is that the java embedded engine allows only the thread that created the
> > instance to execute the python logic. This is more because of the JNI
> > specification itself. Some hints are here:
> > https://stackoverflow.com/questions/18056347/jni-calling-java-from-c-with-multiple-threads
> > and here:
> > http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/implementing/sync.html
> >
> > - This essentially means that the main operator thread will have to call
> > the python code execution logic if the design were otherwise.
> >
> > - Since the end user can choose to write any kind of logic including
> > blocking I/O as part of the implementation, I did not want to stall the
> > operator thread for any usage pattern.
> >
> > - In fact there is only one overall interpreter in the JVM process space
> > and the interpreter thread is just a JNI wrapper around it to account for
> > the JNI limitations above.
> >
> > - It is for the very same reason, there is an API in the implementation
> to
> > support for registering Shared Modules across all of the interpreter
> > threads. Use cases for this exist when there is a global variable
> provided
> > by the underlying Python library and loading it multiple times can cause
> > issues. Hence the API to register a shared module which can be used by
> all
> > of the Interpreter Threads.
> >
> > - The operator 
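
For concreteness, the same-thread JNI constraint described in the quoted points above is commonly handled by confining all interpreter calls to a single dedicated worker thread. A minimal sketch with illustrative names (not the actual operator code):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InterpreterWorker
{
  // single-thread executor: the embedded interpreter is created and invoked
  // on this one thread only, satisfying the JNI same-thread requirement
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  public Future<Object> submit(final String pythonExpression)
  {
    return executor.submit(new Callable<Object>()
    {
      @Override
      public Object call()
      {
        // the call into the embedded python engine would happen here, always
        // on the worker thread that owns the interpreter instance
        return evaluate(pythonExpression);
      }
    });
  }

  private Object evaluate(String expression)
  {
    // placeholder for the actual embedded-engine invocation
    return expression;
  }

  public void shutdown()
  {
    executor.shutdownNow();
  }
}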

Re: [Proposal] Simulate setting for application launch

2017-12-20 Thread Priyanka Gugale
+1
Sometimes this context is required. We shouldn't change any default
behaviour other than making this config available.

-Priyanka



On Wed, Dec 20, 2017 at 5:32 AM, Pramod Immaneni 
wrote:

> The external system recording was just an example, not a specific use case.
> The idea is to provide comprehensive information to populateDAG as to the
> context it is being called under. It is akin to the test mode or simulate
> flag that you see with various utilities. The platform cannot control what
> populateDAG does; even without this information, in the multiple calls that
> you mention, the application can return different DAGs depending on an
> external factor such as the time of day or some other external variable.
> This merely provides more context information in the config. It is up to
> the application to do what it wishes with it.
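
For illustration, a sketch of how an application might consume such a flag; the property key used here is hypothetical, since the actual name was not settled in this thread:

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;

public class LaunchAwareApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // "apex.application.launch.simulate" is an assumed property name
    boolean simulate = conf.getBoolean("apex.application.launch.simulate", false);

    if (!simulate) {
      // side effects that should only happen on a real launch, e.g. recording
      // the application start in an external system
    }

    // construct the same DAG regardless of the mode
  }
}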
>
> On Tue, Dec 19, 2017 at 2:28 PM, Vlad Rozov  wrote:
>
> > -0.5: populateDAG() may be called by the platform as many times as it
> > needs (even in case it calls it only once now to launch an application).
> > Passing different parameters to populateDAG() in simulate launch mode and
> > actual launch may lead to different DAG being constructed for those two
> > modes. Can't the use case you described be handled by a plugin?
> >
> > Thank you,
> >
> > Vlad
> >
> >
> > On 12/19/17 10:06, Sanjay Pujare wrote:
> >
> >> +1, although I prefer something that is more enforceable. So I like the
> >> idea of another method, but that introduces incompatibility, so maybe in
> >> 4.0?
> >>
> >> On Tue, Dec 19, 2017 at 9:40 AM, Munagala Ramanath <
> >> amberar...@yahoo.com.invalid> wrote:
> >>
> >>   +1
> >>> Ram
> >>>  On Tuesday, December 19, 2017, 8:33:21 AM PST, Pramod Immaneni <
> >>> pra...@datatorrent.com> wrote:
> >>>
> >>>   I have a mini proposal. The command get-app-package-info runs the
> >>> populateDAG method of an application to construct the DAG but does not
> >>> actually launch the DAG. An application developer does not know in
> which
> >>> context the populateDAG is being called. For example, if they are
> >>> recording
> >>> application starts in an external system from populateDAG, they will
> have
> >>> false entries there. This can be solved in different ways such as
> >>> introducing another method in StreamingApplication or more parameters
> >>> to populateDAG but a non disruptive option would be to add a property
> in
> >>> the configuration object that is passed to populateDAG to indicate if
> it
> >>> is
> >>> simulate/test mode or real launch. An application developer can use
> this
> >>> property to take the appropriate actions.
> >>>
> >>> Thanks
> >>>
> >>>
> >>>
> >
>