Re: Table program cannot be compiled

2019-05-16 Thread shkob1
Hi Timo,

Thanks for the link.
Not sure I understand your suggestion though. Is the goal here to reduce the
number of parameters passed to the UDF? If that's the case I can maybe keep
the tag names there, but the boolean expressions still need to be evaluated
before entering eval(). Do you see this differently?

I tried moving the tag names to be a member within the function instead of a
parameter, but apparently I still have too many arguments.
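For reference, this is roughly what I tried (simplified; the real tag list has
all 126 tags):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.functions.ScalarFunction;

// rough sketch: tag names are kept as a member, eval() only receives the
// already-evaluated boolean conditions (one per tag, in the same order)
public class ConditionalArrayFunction extends ScalarFunction {

    private final List<String> tagNames;

    public ConditionalArrayFunction(List<String> tagNames) {
        this.tagNames = tagNames;
    }

    public String[] eval(Boolean... conditions) {
        final List<String> trueItems = new ArrayList<>();
        for (int i = 0; i < conditions.length; i++) {
            if (conditions[i] != null && conditions[i]) {
                trueItems.add(tagNames.get(i));
            }
        }
        return trueItems.toArray(new String[0]);
    }
}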

Let me know if this is not what you meant.

Thanks!
Shahar





Re: Table program cannot be compiled

2019-05-14 Thread shkob1
BTW, looking at past posts on this issue [1], it should have been fixed? I'm
using version 1.7.2.
Also, the recommendation there was to use a custom function, though that's
exactly what I'm doing with the conditionalArray function [2].

Thanks!

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataStreamCalcRule-1802-quot-grows-beyond-64-KB-when-execute-long-sql-td20832.html#a20841

[2]
import java.util.Arrays;
import java.util.List;

import com.google.common.collect.Lists;
import org.apache.flink.table.functions.ScalarFunction;

public class ConditionalArrayFunction extends ScalarFunction {

    public static final String NAME = "conditionalArray";

    public String[] eval(Object... keyValues) {
        if (keyValues.length == 0) {
            return new String[]{};
        }
        // keyValues alternates between a tag name and its boolean condition
        final List<Object> keyValuesList = Arrays.asList(keyValues);
        final List<String> trueItems = Lists.newArrayList();
        for (int i = 0; i < keyValuesList.size(); i = i + 2) {
            final String key = (String) keyValuesList.get(i);
            final Object value = keyValuesList.get(i + 1);

            if (value != null && (boolean) value) {
                trueItems.add(key);
            }
        }
        return trueItems.toArray(new String[0]);
    }
}
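
For completeness, this is how the function gets registered (tableEnv being my
StreamTableEnvironment):

// registers the UDF so it can be referenced as conditionalArray(...) in SQL
tableEnv.registerFunction(ConditionalArrayFunction.NAME, new ConditionalArrayFunction());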






Re: Table program cannot be compiled

2019-05-14 Thread shkob1
In a subsequent run I get:
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method
"split$3681$(LDataStreamCalcRule$3682;)V" of class "DataStreamCalcRule$3682"
grows beyond 64 KB
 





Table program cannot be compiled

2019-05-14 Thread shkob1
Hey,

While running a SQL query I get an OutOfMemoryError and "Table program
cannot be compiled" [2].
In my scenario I'm trying to enrich an event with an array of tags; each tag
has a boolean condition (like a WHERE clause), and with a custom function I'm
filtering the array to keep only the tags whose condition is TRUE.
While I cannot publish the actual query, its form is as follows:

SELECT originalEvent, conditionalArray('Tag1', boolean_condition1, 'Tag2',
boolean_condition2).. FROM..
(more info on the use case here [1])

I do have quite a lot of those tags (126 at the moment), but I had something
similar in the past without this error.
Any idea what causes it, and can you think of a workaround?

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reconstruct-object-through-partial-select-query-td27782.html

[2]
org.apache.flink.api.common.InvalidProgramException: Table program cannot
be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
at
org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
at
org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.HashMap.newNode(HashMap.java:1734)
at java.util.HashMap.putVal(HashMap.java:641)
at java.util.HashMap.putMapEntries(HashMap.java:514)
at java.util.HashMap.putAll(HashMap.java:784)
at
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3322)
at org.codehaus.janino.UnitCompiler.access$4900(UnitCompiler.java:212)
at
org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3207)
at
org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3175)
at
org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3348)
at
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3174)
at
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3231)
at org.codehaus.janino.UnitCompiler.access$3800(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3193)
at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3175)
at org.codehaus.janino.Java$Block.accept(Java.java:2753)
at
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3174)
at
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3267)
at org.codehaus.janino.UnitCompiler.access$4200(UnitCompiler.java:212)
at
org.codehaus.janino.UnitCompiler$8.visitIfStatement(UnitCompiler.java:3197)
at
org.codehaus.janino.UnitCompiler$8.visitIfStatement(UnitCompiler.java:3175)
at org.codehaus.janino.Java$IfStatement.accept(Java.java:2923)
at
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3174)
at
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3163)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2986)
at
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1313)
at
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1286)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:785)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:436)
at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:212)
at
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:390)
at
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:385)
at
org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1405)





Re: Reconstruct object through partial select query

2019-05-08 Thread shkob1
Just to be clearer about my goal:
I'm trying to enrich the incoming stream with meaningful tags based on
conditions evaluated on the event itself.
So the input stream could carry an event that looks like:
class Car {
  int year;
  String modelName;
}

I will have a config that defines tags as:
"NiceCar" -> "year > 2015 AND position('Luxury' in modelName) > 0"

So ideally my output will have the structure:

class TaggedEvent {
   Car origin;
   String[] tags;
}
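
To make it concrete, the query shape I'm aiming for would be something like
(table and function names here are made up):

SELECT car AS origin,
       tagsOf('NiceCar', year > 2015 AND position('Luxury' in modelName) > 0,
              ...) AS tags
FROM cars

where tagsOf would be some custom function that keeps only the tag names whose
condition evaluated to true.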








Reconstruct object through partial select query

2019-05-08 Thread shkob1
Hey,

I'm trying to create a SQL query which, given input from a stream of some
generic class T, creates a new stream with the structure
{
  origin : T
  resultOfSomeSQLCalc : Array[String]
}

It seems that just by doing "SELECT *" I can convert the resulting table
back to a stream of the origin object.
I'm trying to understand how to create that envelope around the origin
object, something along the lines of "SELECT box(*), array(..)". I'm sensing
I need a function for that, but I'm not sure what that looks like.

Thanks for the help!
Shahar






Re: Calcite SQL Map to Pojo Map

2019-03-28 Thread shkob1
Apparently the solution is to force the map creation through a UDF and have
the UDF return Types.GENERIC(Map.class).
That makes the two sides compatible, as both are then treated as a GenericType.
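
For anyone hitting the same thing, the relevant part looks roughly like this
(a minimal sketch of the map-building UDF; class and method names are made up):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.functions.ScalarFunction;

// rough sketch: build the map inside a scalar UDF and force its result type to
// GENERIC(Map.class) so it matches the GenericTypeInfo requested for the pojo field
public class BuildMapFunction extends ScalarFunction {

    public Map eval(String key, String value) {
        Map<String, String> map = new HashMap<>();
        map.put(key, value);
        return map;
    }

    @Override
    public TypeInformation<Map> getResultType(Class<?>[] signature) {
        return Types.GENERIC(Map.class);
    }
}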

Thanks!





Calcite SQL Map to Pojo Map

2019-03-27 Thread shkob1
I'm trying to convert a SQL query that has a select map[..] into a pojo with
a Map field (using tableEnv.toRetractStream).
It seems to fail because the requested type info for the field is a
GenericTypeInfo (GenericType), while the field type itself is a MapTypeInfo
(Map).


Exception in thread "main" org.apache.flink.table.api.TableException: Result
field does not match requested type. Requested: GenericType;
Actual: Map

Any suggestions?
Shahar





Re: Schema Evolution on Dynamic Schema

2019-03-26 Thread shkob1
Sorry to flood this thread, but to keep track of my experiments:

So far I've been retracting to a Row and then mapping to a dynamic pojo that
is created (using ByteBuddy) according to the select fields in the SQL.
Given the error, I'm now trying to remove the usage of Row and use the
dynamic type directly when converting the query result table to a retracted
stream.

However, since I don't have compile-time knowledge of the select field types
(the created pojo has them as "Object"), I don't think I can create a Kryo
serializer for them (I guess by using Row I deferred my problem from compile
time to schema-evolution time via the row -> dynamic object conversion).

So I guess I need to find a solution where I can either:
- figure out how to infer the type of a SQL select field based on the source
table somehow, or
- figure out how to create a (Kryo?) serializer that can convert to the
dynamic object in a similar way to the RowSerializer (supporting Object
fields).

Would love to hear more thoughts









Re: Schema Evolution on Dynamic Schema

2019-03-26 Thread shkob1
Debugging locally, it seems like the state descriptor of "GroupAggregateState"
is creating an additional field serializer (a TupleSerializer of the
SumAccumulator) within the RowSerializer. I'm guessing this is what's causing
the incompatibility? Is there any workaround I can do?





Re: Schema Evolution on Dynamic Schema

2019-03-26 Thread shkob1
Hi Fabian,

It seems like it didn't work.
Let me specify what I have done:

I have a SQL query that looks something like:
Select a, sum(b), map[ 'sum_c', sum(c), 'sum_d', sum(d)] as my_map FROM...
GROUP BY a

As you said, I'm preventing keys from staying in state forever by using idle
state retention time (plus I'm transforming to a retracted stream along with
a custom trigger that sends the data to the sink).

I tried adding a new item to the map (say 'sum_e', sum(e)), cancelled with a
savepoint, reran from that savepoint, and got the same error as above about
state incompatibility.

Why do you think that would happen?

Thanks
Shahar








Re: Map UDF : The Nothing type cannot have a serializer

2019-03-21 Thread shkob1
Looking further into the RowType, it seems like this field is translated as a
CURSOR rather than a map... not sure why.





Map UDF : The Nothing type cannot have a serializer

2019-03-21 Thread shkob1
Hey,

As I'm building a SQL query, I'm trying to conditionally build a map so that
there won't be any keys with null values in it. AFAIK there's no native way
to do this in Calcite (other than using CASE to build the map in different
ways, but I have a lot of key/value pairs so that's not reasonable). I tried
to implement it using a Flink UDF but failed; I wonder whether a table
function can return a map at all, since I'm getting "The Nothing type cannot
have a serializer".

Example SQL:

Select a, b, c, removeNulls('key1', d, 'key2', e, 'key3', f) AS my_map
FROM... 

My Table Function code is:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Maps;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.functions.TableFunction;

public class RemoveNullValuesFunction extends TableFunction<Map<String, String>> {

    public static final String NAME = "removeNulls";

    public void eval(String... keyValues) {
        if (keyValues.length == 0) {
            collect(Collections.emptyMap());
            return;
        }
        // keyValues alternates between a key and its (possibly null) value
        final List<String> keyValuesList = Arrays.asList(keyValues);
        final Map<String, String> output = Maps.newHashMap();
        for (int i = 0; i < keyValuesList.size(); i = i + 2) {
            final String key = keyValuesList.get(i);
            final String value = keyValuesList.get(i + 1);
            if (value != null) {
                output.put(key, value);
            }
        }
        collect(output);
    }

    @Override
    public TypeInformation<Map<String, String>> getResultType() {
        return Types.MAP(Types.STRING(), Types.STRING());
    }
}


I wonder if that's simply not possible, or am I missing something in how the
serialization works?

Thanks!
Shahar









Re: Schema Evolution on Dynamic Schema

2019-03-08 Thread shkob1
Thanks Rong,

I made a quick test changing the SQL select (adding a select field in the
middle), reran the job from a savepoint, and it worked without any errors. I
want to make sure I understand at what point the state is stored and how it
works.

Let's simplify the scenario and forget my specific case of a dynamically
generated pojo. Let's focus on the generic steps:
Source -> register table -> SQL select and group by session -> retracted
stream (Row) -> transformToPojo (custom Map function) -> pushToSink

And let's assume the SQL select is changed (a field is added somewhere in
the middle of the select list).
So: we had intermediate results in the old format that are loaded from state
into the new Row object in the retracted stream. Is that an accurate
statement? At what operator/format is the state stored in this case? Is it
the SQL result/Row? Is it the pojo? As this scenario does not fail for me,
I'm trying to understand how/where it is handled in Flink.







Schema Evolution on Dynamic Schema

2019-03-06 Thread shkob1
Hey,

My job is built on SQL that is injected as an input to the job, so let's take
an example:

Select a, max(b) as MaxB, max(c) as MaxC FROM registered_table GROUP BY a

(Side note: in order for the state not to grow indefinitely, I'm transforming
to a retracted stream and filtering based on a custom trigger.)

In order to get the output in a JSON format I basically created a way to
dynamically generate a class and register it with the class loader, so when
transforming to the retracted stream I'm doing something like:

Table result = tableEnv.sqlQuery(sqlExpression);
tableEnv.toRetractStream(result, Row.class, config)
    .filter(tuple -> tuple.f0)
    .map(new RowToDynamicClassMapper(sqlSelectFields))
    .addSink(..)

This actually works pretty well (though I do need to make sure to register
the dynamic class with the class loader whenever the state is loaded).

I'm now looking into "schema evolution", which basically means what happens
when the query is changed (say max(c) is removed, and maybe max(d) is added).
I don't know if that fits the classic "schema evolution" feature or whether
it should be thought about differently. Would be happy to get some thoughts.

Thanks!









Reducing the number of unique metrics

2019-02-26 Thread shkob1
Hey All,

Just read the excellent monitoring blog post
https://flink.apache.org/news/2019/02/25/monitoring-best-practices.html

I'm looking at reducing the number of unique metrics, especially on items I
can compromise on consolidating, such as using indices instead of IDs.
Specifically, looking at task managers, I don't mind just having an index of
the task manager, such that multiple runs of the Flink application (and YARN
containers on EMR) won't create more unique metric paths but rather reuse
existing paths (tm.1, tm.2, ...). Has anyone managed to do this or had to
meet the same requirement?

Thanks!
Shahar





Re: Linkage error when using DropwizardMeterWrapper

2019-02-14 Thread shkob1
Hey Jayant, I'm getting the same using Gradle. My metrics reporter and my
application both use the flink-metrics-dropwizard dependency for reporting
Meters. How should I be solving it?





Re: SQL Query named operator exceeds 80 characters

2018-11-29 Thread shkob1
OK, thanks for the help





SQL Query named operator exceeds 80 characters

2018-11-28 Thread shkob1
It seems like the operator name for a SQL group by is the query string
itself. I get:
"The operator name groupBy: (myGroupField), select: (myGroupField,
someOther... )... exceeded the 80 characters length limit and was truncated"

Is there a way to name the SQL query operator?





Re: Kinesis Shards and Parallelism

2018-11-16 Thread shkob1
Actually was looking at the task manager level, i did have more slots than
shards, so it does make sense i had an idle task manager while other task
managers split the shards between their slots

Thanks!





Kinesis Shards and Parallelism

2018-11-12 Thread shkob1
Looking at the doc
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html
"When the number of shards is larger than the parallelism of the consumer,
then each consumer subtask can subscribe to multiple shards; otherwise if
the number of shards is smaller than the parallelism of the consumer, then
some consumer subtasks will simply be idle and wait until it gets assigned
new shards"

I have the same number of shards as the configured parallelism. It seems,
though, that a task is grabbing multiple shards while others are idle. Is
that expected behavior?





Re: FLINK Kinesis Connector Error - ProvisionedThroughputExceededException

2018-11-09 Thread shkob1
If it's running in parallel aren't you just adding readers which maxes out
your provisioned throughput? probably doesn't belong in here but rather a
Kinesis thing, but i suggest increasing your number of shards?





Re: Manually clean SQL keyed state

2018-11-09 Thread shkob1
Thanks Fabian!





Manually clean SQL keyed state

2018-11-08 Thread shkob1
I have a scenario in which I do a non-windowed group by using SQL, something
like

"Select count(*) as events, shouldTrigger(..) as shouldTrigger from source
group by sessionId"

I'm then converting to a retracted stream, filtering by "add" messages, then
further filtering by the "shouldTrigger" field, and sending the result out to
a sink.

While I'm using the query config (idle state retention time), it seems like I
could reduce the state size by clearing the state of the specific session id
earlier ("shouldTrigger" marks the end of the session rather than a timed
window).
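
For context, this is roughly how the retention is configured today (the times
here are made up; tableEnv is the StreamTableEnvironment):

// idle state retention on the query config used when converting to a retract stream
StreamQueryConfig queryConfig = tableEnv.queryConfig();
queryConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));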

Is there a way for me to clear that state, assuming I need to use the SQL
API?

Thanks!
Shahar







Re: Custom Trigger + SQL Pattern

2018-10-26 Thread shkob1
Following up on the actual question: is there a way to register a keyed
stream as table(s) and have a trigger per key?





Re: Dynamically Generated Classes - Cannot load user class

2018-10-24 Thread shkob1
OK, I think I figured it out, though I'm not sure exactly about the reason:

It seems that I need the stream type to be a GenericType of the super class
rather than a Pojo type of the concrete generated class. Otherwise the
operator definition apparently cannot load the Pojo class at task creation.
So I don't declare the map's produced type as the concrete generated class,
and I work around the keyBy (which then cannot use a field name) by using a
key selector instead.
Doing all of that seems to work. I'd be happy to hear the reason for it in
more depth if anyone knows.
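
Roughly, the working wiring looks like this (simplified; rowStream and the
getSessionId() accessor are just placeholders):

// rough sketch: produced type declared as the super class (known at compile time),
// and keyBy done with a key selector instead of a field name
DataStream<MySuperClass> mapped = rowStream
        .map(myRichMapFunction)
        .returns(TypeInformation.of(MySuperClass.class));

KeyedStream<MySuperClass, String> keyed = mapped
        .keyBy(new KeySelector<MySuperClass, String>() {
            @Override
            public String getKey(MySuperClass value) {
                return value.getSessionId();   // assumed accessor on the super class
            }
        });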





Re: Dynamically Generated Classes - Cannot load user class

2018-10-23 Thread shkob1
Update on this: if I just do an empty mapping and drop the SQL part, it works
just fine. I wonder if there's any class loading that needs to be done when
using SQL; I'm not sure how I would do that.





Re: Dynamically Generated Classes - Cannot load user class

2018-10-23 Thread shkob1
After removing some operators (which I still need, but I wanted to understand
where my issues are) I get a slightly different stacktrace (though still the
same issue).

My current operators are:
1. a SQL select with group by (returns a retracted stream)
2. filter (take only non-retracted)
3. map (tuple to Row)
4. map (Row to MyGeneratedClass -> this implements the classloader load of
the generated class on open())


org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not
instantiate outputs in order.
at
org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:398)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.createStreamRecordWriters(StreamTask.java:1165)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:214)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:193)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:51)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1445)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:680)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.model.MyGeneratedClass
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1716)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1556)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at 
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:561)
at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.readObject(PojoSerializer.java:1038)
at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at java.util.ArrayList.readObject(ArrayList.java:797)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
   

Dynamically Generated Classes - Cannot load user class

2018-10-22 Thread shkob1
Hey,

I'm trying to run a job which uses a dynamically generated class (through
Byte Buddy).
Think of me having a complex schema as YAML text and generating a class from
it. Throughout the job I am using an artificial super class (MySuperClass)
of the generated class (as, for example, I need to specify the generic type
when extending RichMapFunction).



MyRichMapFunction, which extends RichMapFunction, is what introduces the
dynamic class. It takes the YAML in the constructor and:
1. open - takes the schema and converts it into a Pojo class which extends
MySuperClass
2. getProducedType - does the same thing in order to correctly send the Pojo
with all the right fields (rough sketch below)
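
A very rough sketch of the function (simplified; I'm assuming Row input here
since the query produces Rows, and the actual ByteBuddy generation and
Row-to-pojo copying are omitted):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;

// rough sketch: the yaml schema is a plain String member (serializable),
// the concrete class is generated lazily on the TaskManager in open()
public class MyRichMapFunction extends RichMapFunction<Row, MySuperClass> {

    private final String yamlSchema;
    private transient Class<? extends MySuperClass> generatedClass;

    public MyRichMapFunction(String yamlSchema) {
        this.yamlSchema = yamlSchema;
    }

    @Override
    public void open(Configuration parameters) {
        // 1. generate the pojo class (extending MySuperClass) from the yaml schema
        generatedClass = generateClassFromYaml(yamlSchema);
    }

    @Override
    public MySuperClass map(Row row) throws Exception {
        MySuperClass instance = generatedClass.newInstance();
        // copy the Row fields into the generated pojo (omitted)
        return instance;
    }

    public TypeInformation<MySuperClass> getProducedType() {
        // 2. built from the same yaml schema so the output carries all the right fields
        return buildTypeInfoFromYaml(yamlSchema);
    }

    private Class<? extends MySuperClass> generateClassFromYaml(String yaml) {
        // ByteBuddy generation omitted from this sketch
        throw new UnsupportedOperationException("omitted");
    }

    private TypeInformation<MySuperClass> buildTypeInfoFromYaml(String yaml) {
        // type info construction omitted from this sketch
        throw new UnsupportedOperationException("omitted");
    }
}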

So basically my job is something like

env.addSource([stream of pojos])
.filter(...)
... (register table, running a query which generates Rows)
.map(myRichMapFunction)
.returns(myRichMapFunction.getProducedType)
.addSink(...)

My trouble now is that, when running on a cluster, the classloader fails to
load my generated class.
I tried to use getRuntimeContext().getUserCodeClassLoader() as the loader for
Byte Buddy, but that doesn't seem to be enough.

I was reading about it here:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
 
Is there maybe a hook that gets called when a job is loaded, so I can load
the class there?


Stacktrace:

org.apache.flink.client.program.ProgramInvocationException:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load
user class: com.model.MyGeneratedClass
ClassLoader info: URL ClassLoader:
file:
'/var/folders/f7/c4pvjrf902b6c73_tbzkxnjwgn/T/blobStore-4b685b0a-b8c1-43a1-a75d-f0b9c0156f4c/job_d1187ea7e783007b92ef6c0597d72fcb/blob_p-38b9e6dce2423b0374f82842a35dcaa92e10dedd-6f1056ab61afcccb3c1fca895ccb3eb0'
(valid JAR)
Class not resolvable through given classloader.
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:264)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at com.MainClass.main(MainClass.java:46)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
Cannot load user class: com.model.DynamicSchema
ClassLoader info: URL ClassLoader:
file:
'/var/folders/f7/c4pvjrf902b6c73_tbzkxnjwgn/T/blobStore-4b685b0a-b8c1-43a1-a75d-f0b9c0156f4c/job_d1187ea7e783007b92ef6c0597d72fcb/blob_p-38b9e6dce2423b0374f82842a35dcaa92e10dedd-6f1056ab61afcccb3c1fca895ccb3eb0'
(valid JAR)
Class not resolvable through given classloader.
at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:99)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:273)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)







Fire and Purge with Idle State

2018-10-12 Thread shkob1
Hey

Say I'm aggregating an event stream by sessionId in SQL and emitting the
results once the session is "over"; I guess I should be using Fire and Purge,
as I don't expect to need the session data once it's over. How should I treat
the idle state retention time? Is it needed at all if I'm using purge? Will
it become relevant only if a session is both never-ending AND never has more
records?

Thanks!





Custom Trigger + SQL Pattern

2018-10-12 Thread shkob1
Hey!

I have a use case in which I'm grouping a stream by session id - so far
pretty standard; note that I need to do it through SQL and not through the
Table API.
In my use case I have 2 trigger conditions though - while one is time based
(session inactivity), the other is based on a specific event marked as a
"last" event.
AFAIK SQL does not support custom triggers - so what I end up doing is a
group by in the SQL, then converting the result to a stream along with a
boolean field that marks whether at least one of the events was the end
event, then adding my custom trigger on top of it.
It looks something like this:

 Table result = tableEnv.sqlQuery("select atLeastOneTrue(lastEvent),
sessionId, count(*) FROM source Group By sessionId");
tableEnv.toRetractStream(result, Row.class, streamQueryConfig)
.filter(tuple -> tuple.f0)
.map(...)
.returns(...)
.keyBy("sessionId")
.window(EventTimeSessionWindows.withGap(Time.hours(4)))
.trigger(new SessionEndedByTimeOrEndTrigger())
.process(...take last element from the group by result..)
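
For reference, the custom trigger is roughly this (simplified; SessionResult
here stands for the pojo my map step produces, and sessionEnded is the boolean
coming from the atLeastOneTrue(lastEvent) aggregation):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// rough sketch: fire-and-purge either when the session-gap window closes
// or as soon as an element arrives that carries the end-of-session flag
public class SessionEndedByTimeOrEndTrigger extends Trigger<SessionResult, TimeWindow> {

    @Override
    public TriggerResult onElement(SessionResult element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) {
        if (element.isSessionEnded()) {            // assumed flag on the mapped pojo
            return TriggerResult.FIRE_AND_PURGE;
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp()
                ? TriggerResult.FIRE_AND_PURGE
                : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        // needed because EventTimeSessionWindows is a merging window assigner
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}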

This seems like a weird workaround though, doesn't it? My window is basically
over the SQL result rather than over the source stream. Ideally I would key
by the sessionId before running the SQL, but then a) would I need to register
a table per key? b) would I be able to use the custom trigger per window?

Basically I want to group by session id and have a window for every session
that supports both a time trigger and a custom trigger. Assuming I need to
use SQL (the reason is that the query is dynamically loaded), is there a
better solution for it?









