Re: Table program cannot be compiled
Hi Timo,

Thanks for the link. I'm not sure I understand your suggestion, though. Is the goal here to reduce the number of parameters passed to the UDF? If that's the case, I can maybe keep the tag names there, but I still need the expressions to be evaluated before entering eval. Do you see this differently? I tried moving the tag names into a member of the function instead of a parameter, but apparently I still have too many arguments. Let me know if this is not what you meant.

Thanks!
Shahar

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Table program cannot be compiled
BTW, looking at past posts on this issue [1], it should have been fixed? I'm using version 1.7.2. Also, the recommendation there was to use a custom function, but that's exactly what I'm doing with the conditionalArray function [2].

Thanks!

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataStreamCalcRule-1802-quot-grows-beyond-64-KB-when-execute-long-sql-td20832.html#a20841

[2]
public class ConditionalArrayFunction extends ScalarFunction {

    public static final String NAME = "conditionalArray";

    public String[] eval(Object... keyValues) {
        if (keyValues.length == 0) {
            return new String[]{};
        }
        final List<Object> keyValuesList = Arrays.asList(keyValues);
        final List<String> trueItems = Lists.newArrayList();
        for (int i = 0; i < keyValuesList.size(); i += 2) {
            final String key = (String) keyValuesList.get(i);
            final Object value = keyValuesList.get(i + 1);
            if (value != null && (boolean) value) {
                trueItems.add(key);
            }
        }
        return trueItems.toArray(new String[0]);
    }
}
Re: Table program cannot be compiled
In a subsequent run I get:

Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "split$3681$(LDataStreamCalcRule$3682;)V" of class "DataStreamCalcRule$3682" grows beyond 64 KB
Table program cannot be compiled
Hey,

While running a SQL query I get an OutOfMemoryError and "Table program cannot be compiled" [2]. In my scenario I'm trying to enrich an event using an array of tags; each tag has a boolean classification (like a WHERE clause), and with a custom function I filter the array to keep only the TRUE results. While I cannot publish the actual query, its form is as follows:

SELECT originalEvent, conditionalArray('Tag1', boolean_condition1, 'Tag2', boolean_condition2).. FROM..

(more info on the use case here [1])

I do have quite a lot of those tags (counting 126 tags now), but I had something similar in the past without this error. Any idea what causes it, and can you think of a workaround?

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reconstruct-object-through-partial-select-query-td27782.html

[2]
org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
    at org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
    at org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.HashMap.newNode(HashMap.java:1734)
    at java.util.HashMap.putVal(HashMap.java:641)
    at java.util.HashMap.putMapEntries(HashMap.java:514)
    at java.util.HashMap.putAll(HashMap.java:784)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3322)
    at org.codehaus.janino.UnitCompiler.access$4900(UnitCompiler.java:212)
    at org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3207)
    at org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3175)
    at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3348)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3174)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3231)
    at org.codehaus.janino.UnitCompiler.access$3800(UnitCompiler.java:212)
    at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3193)
    at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3175)
    at org.codehaus.janino.Java$Block.accept(Java.java:2753)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3174)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3267)
    at org.codehaus.janino.UnitCompiler.access$4200(UnitCompiler.java:212)
    at org.codehaus.janino.UnitCompiler$8.visitIfStatement(UnitCompiler.java:3197)
    at org.codehaus.janino.UnitCompiler$8.visitIfStatement(UnitCompiler.java:3175)
    at org.codehaus.janino.Java$IfStatement.accept(Java.java:2923)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3174)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3163)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2986)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1313)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1286)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:785)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:436)
    at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:212)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:390)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:385)
    at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1405)
Re: Reconstruct object through partial select query
Just to be clearer about my goal: I'm trying to enrich the incoming stream with some meaningful tags based on conditions on the event itself. So the input stream could be an event that looks like:

class Car {
    int year;
    String modelName;
}

I will have a config that defines tags such as:

"NiceCar" -> "year > 2015 AND position('Luxury' in modelName) > 0"

So ideally my output will have the structure:

class TaggedEvent {
    Car origin;
    String[] tags;
}
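Outside of SQL, the shape I'm after can be sketched in plain Java, with predicates standing in for the compiled SQL conditions. Everything here (class and method names, the Predicate-based config) is illustrative only, not what my actual job does:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class TaggingSketch {

    static class Car {
        final int year;
        final String modelName;
        Car(int year, String modelName) { this.year = year; this.modelName = modelName; }
    }

    static class TaggedEvent {
        final Car origin;
        final String[] tags;
        TaggedEvent(Car origin, String[] tags) { this.origin = origin; this.tags = tags; }
    }

    // Applies each named condition to the event and keeps the tags that match.
    static TaggedEvent tag(Car car, Map<String, Predicate<Car>> tagConditions) {
        List<String> tags = new ArrayList<>();
        for (Map.Entry<String, Predicate<Car>> e : tagConditions.entrySet()) {
            if (e.getValue().test(car)) {
                tags.add(e.getKey());
            }
        }
        return new TaggedEvent(car, tags.toArray(new String[0]));
    }
}
```

In the real job the conditions come in as SQL text rather than predicates, which is why the evaluation has to go through the Table API.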
Reconstruct object through partial select query
Hey,

I'm trying to create a SQL query which, given input from a stream of a generic class T, will create a new stream with the structure:

{
    origin : T
    resultOfSomeSQLCalc : Array[String]
}

It seems that just by doing "SELECT *" I can convert the resulting table back to a stream of the origin object. I'm trying to understand how to create that envelope with the origin object, something along the lines of "SELECT box(*), array(..)". I sense I need a function for that, but I'm not sure what it looks like.

Thanks for the help!
Shahar
Re: Calcite SQL Map to Pojo Map
Apparently the solution is to force the map creation through a UDF and to have the UDF return Types.GENERIC(Map.class). That makes the two sides compatible, both treated as GenericType.

Thanks!
Calcite SQL Map to Pojo Map
I'm trying to convert a SQL query that has a select map[..] into a POJO with a Map field (using tableEnv.toRetractStream). It seems to fail because the requested field type is GenericTypeInfo with GenericType<...> while the field type itself is MapTypeInfo with Map<...>:

Exception in thread "main" org.apache.flink.table.api.TableException: Result field does not match requested type. Requested: GenericType<...>; Actual: Map<...>

Any suggestion?

Shahar
Re: Schema Evolution on Dynamic Schema
Sorry to flood this thread, but to keep sharing my experiments: so far I've been retracting to a Row and then mapping to a dynamic POJO that is created (using Byte Buddy) according to the select fields in the SQL. Considering the error, I'm now trying to remove the usage of Row and use the dynamic type directly when converting the query result table to a retracted stream. However, since I don't have compile-time knowledge of the select field types (the created POJO has them as "Object"), I don't think I can create a Kryo serializer for them (I guess that by using Row I deferred my problem from compile time to schema-evolution time via the Row -> dynamic object conversion). So I need to find a solution in which I can either:

- figure out how to infer the type of a SQL select field based on the source table somehow, OR
- figure out how to create a (Kryo?) serializer that can convert to the dynamic object in a similar way to the RowSerializer (supporting Object fields).

Would love to hear more thoughts.
Re: Schema Evolution on Dynamic Schema
Debugging locally, it seems like the state descriptor of "GroupAggregateState" is creating an additional field serializer (a TupleSerializer of the SumAccumulator) within the RowSerializer. I'm guessing this is what's causing the incompatibility? Is there any workaround I can apply?
Re: Schema Evolution on Dynamic Schema
Hi Fabian,

It seems like it didn't work. Let me specify what I have done. I have SQL that looks something like:

SELECT a, sum(b), map['sum_c', sum(c), 'sum_d', sum(d)] AS my_map FROM ... GROUP BY a

As you said, I'm preventing keys from staying in state forever by using idle state retention time (plus I'm transforming to a retracted stream along with a custom trigger that sends the data to the sink). I tried adding a new item to the map (say 'sum_e', sum(e)), cancelled with a savepoint, reran from that savepoint, and got the same error as above about state incompatibility. Why do you think that would happen?

Thanks,
Shahar
Re: Map UDF : The Nothing type cannot have a serializer
Looking further into the RowType, it seems like this field is translated as a CURSOR rather than a map. Not sure why.
Map UDF : The Nothing type cannot have a serializer
Hey,

As I'm building a SQL query, I'm trying to conditionally build a map such that there won't be any keys with null values in it. AFAIK there's no native way in Calcite to do it (other than using CASE to build the map in different ways, but I have a lot of key/value pairs so that's not reasonable). I tried to implement it using a Flink UDF but failed. I wonder whether a table function can return a map at all, since I'm getting "The Nothing type cannot have a serializer".

Example SQL:

SELECT a, b, c, removeNulls('key1', d, 'key2', e, 'key3', f) AS my_map FROM ...

My table function code is:

public class RemoveNullValuesFunction extends TableFunction<Map<String, String>> {

    public static final String NAME = "removeNulls";

    public void eval(String... keyValues) {
        if (keyValues.length == 0) {
            collect(Collections.emptyMap());
            return;
        }
        final List<String> keyValuesList = Arrays.asList(keyValues);
        final Map<String, String> output = Maps.newHashMap();
        for (int i = 0; i < keyValuesList.size(); i += 2) {
            final String key = keyValuesList.get(i);
            final String value = keyValuesList.get(i + 1);
            if (value != null) {
                output.put(key, value);
            }
        }
        collect(output);
    }

    @Override
    public TypeInformation<Map<String, String>> getResultType() {
        return Types.MAP(Types.STRING(), Types.STRING());
    }
}

I wonder if that's just not possible, or am I missing something in how the serialization works?

Thanks!
Shahar
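For what it's worth, the key/value walk itself is plain Java and works as intended. Here's a minimal standalone sketch of just that part, with no Flink types involved (the class and method names here are mine, for illustration only):

```java
import java.util.HashMap;
import java.util.Map;

public class RemoveNullsSketch {

    // Builds a map from alternating key/value arguments, skipping pairs
    // whose value is null.
    public static Map<String, String> removeNulls(String... keyValues) {
        Map<String, String> output = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            String key = keyValues[i];
            String value = keyValues[i + 1];
            if (value != null) {
                output.put(key, value);
            }
        }
        return output;
    }
}
```

So the problem is not the map-building logic itself; it only appears once the result has to pass through the table function's type system.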
Re: Schema Evolution on Dynamic Schema
Thanks Rong,

I made a quick test changing the SQL select (adding a select field in the middle), reran the job from a savepoint, and it worked without any errors. I want to make sure I understand at what point the state is stored and how this works. Let's simplify the scenario, forget my specific case of a dynamically generated POJO, and focus on the generic steps:

Source -> register table -> SQL select and group by session -> retracted stream (Row) -> transformToPojo (custom map function) -> pushToSink

And let's assume the SQL select is changed (a field is added somewhere in the middle of the select list). So:

- We had intermediate results in the old format that are loaded from state into the new Row object in the retracted stream. Is that an accurate statement?
- At what operator, and in what format, is the state stored in this case? Is it the SQL result/Row? Is it the POJO?
- Since this scenario does not fail for me, I'm trying to understand how/where Flink handles it.
Schema Evolution on Dynamic Schema
Hey,

My job is built on SQL that is injected as an input to the job. So let's take an example of:

SELECT a, max(b) AS MaxB, max(c) AS MaxC FROM registered_table GROUP BY a

(Side note: in order for the state not to grow indefinitely, I'm transforming to a retracted stream and filtering based on a custom trigger.)

In order to get the output as JSON, I basically created a way to dynamically generate a class and register it with the class loader, so when transforming to the retracted stream I'm doing something like:

Table result = tableEnv.sqlQuery(sqlExpression);
tableEnv.toRetractStream(result, Row.class, config)
    .filter(tuple -> tuple.f0)
    .map(new RowToDynamicClassMapper(sqlSelectFields))
    .addSink(..)

This actually works pretty well (though I do need to make sure to register the dynamic class with the class loader whenever the state is loaded).

I'm now looking into "schema evolution", which basically means what happens when the query is changed (say max(c) is removed, and maybe max(d) is added). I don't know whether that fits the classic "schema evolution" feature or should be thought about differently. Would be happy to get some thoughts.

Thanks!
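To make the Row-to-dynamic-class step more concrete: conceptually it just pairs the select field names with the values of each result row. A rough standalone sketch of that pairing (every name here is illustrative; my actual RowToDynamicClassMapper sets fields on a Byte Buddy-generated class rather than filling a Map):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RowToDynamicSketch {

    // Pairs the SQL select field names with the values of one result row,
    // preserving the select order.
    static Map<String, Object> toRecord(String[] selectFields, Object[] rowValues) {
        if (selectFields.length != rowValues.length) {
            throw new IllegalArgumentException("field/value arity mismatch");
        }
        Map<String, Object> record = new LinkedHashMap<>();
        for (int i = 0; i < selectFields.length; i++) {
            record.put(selectFields[i], rowValues[i]);
        }
        return record;
    }
}
```

The schema evolution question is exactly about what happens to this pairing when the select list (and therefore the field set) changes between savepoint and restore.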
Reducing the number of unique metrics
Hey all,

I just read the excellent monitoring blog post: https://flink.apache.org/news/2019/02/25/monitoring-best-practices.html

I'm looking into reducing the number of unique metrics, especially for items where I can compromise by consolidating, such as using indices instead of ids. Specifically, looking at task managers: I don't mind having just an index per task manager, such that multiple runs of the Flink application (and YARN containers on EMR) won't create more unique metric paths but rather reuse existing ones (tm.1, tm.2, ..). Has anyone managed to do this or had to meet the same requirement?

Thanks!
Shahar
Re: Linkage error when using DropwizardMeterWrapper
Hey Jayant,

I'm getting the same using Gradle. My metrics reporter and my application both use the flink-metrics-dropwizard dependency for reporting Meters. How should I solve this?
Re: SQL Query named operator exceeds 80 characters
OK, thanks for the help
SQL Query named operator exceeds 80 characters
It seems like the operator name for a SQL GROUP BY is the query string itself. I get:

"The operator name groupBy: (myGroupField), select: (myGroupField, someOther... )... exceeded the 80 characters length limit and was truncated"

Is there a way to name the SQL query operator?
Re: Kinesis Shards and Parallelism
Actually, I was looking at the task manager level. I did have more slots than shards, so it does make sense that I had an idle task manager while the other task managers split the shards between their slots.

Thanks!
Kinesis Shards and Parallelism
Looking at the docs (https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html):

"When the number of shards is larger than the parallelism of the consumer, then each consumer subtask can subscribe to multiple shards; otherwise if the number of shards is smaller than the parallelism of the consumer, then some consumer subtasks will simply be idle and wait until it gets assigned new shards"

I have the same number of shards as the configured parallelism. It seems, though, that one task is grabbing multiple shards while others are idle. Is that expected behavior?
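The one-shard-per-subtask assignment I expected would follow from a simple round-robin over shard indices. The sketch below shows only that arithmetic; it is not the connector's actual assignment logic (which, as far as I can tell, hashes shard ids and so can land several shards on one subtask):

```java
public class ShardAssignmentSketch {

    // Round-robin: shard i goes to subtask (i % parallelism). With
    // numShards == parallelism, every subtask gets exactly one shard.
    static int[] countPerSubtask(int numShards, int parallelism) {
        int[] counts = new int[parallelism];
        for (int shard = 0; shard < numShards; shard++) {
            counts[shard % parallelism]++;
        }
        return counts;
    }
}
```

So an uneven spread at equal shard/parallelism counts would point at the assignment strategy rather than at a misconfiguration.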
Re: FLINK Kinesis Connector Error - ProvisionedThroughputExceededException
If it's running in parallel, aren't you just adding readers, which maxes out your provisioned throughput? This probably belongs on the Kinesis side rather than here, but I'd suggest increasing your number of shards.
Re: Manually clean SQL keyed state
Thanks Fabian!
Manually clean SQL keyed state
I have a scenario in which I do a non-windowed GROUP BY using SQL, something like:

SELECT count(*) AS events, shouldTrigger(..) AS shouldTrigger FROM source GROUP BY sessionId

I'm then converting to a retracted stream, filtering on "add" messages, then further filtering on the "shouldTrigger" field, and sending the result out to a sink. While I'm using the query config (idle state retention time), it seems like I could reduce the state size by clearing the state of a specific session id earlier ("shouldTrigger" marks the end of the session rather than a timed window). Is there a way for me to clear that state, assuming I need to use the SQL API?

Thanks!
Shahar
Re: Custom Trigger + SQL Pattern
Following up on the actual question: is there a way to register a KeyedStream as table(s) and have a trigger per key?
Re: Dynamically Generated Classes - Cannot load user class
OK, I think I figured it out, though I'm not sure exactly about the reason. It seems that I need the stream to be typed as a generic type of the super class, rather than as a POJO of the concrete generated class; otherwise the operator definition cannot load the POJO class at task creation. So: I don't declare the map's produced type as the concrete generated class, and I work around keyBy (which then cannot use a field name) with a key selector. Doing all of that seems to work. I'll be happy to hear the deeper reason for this if anyone knows.
Re: Dynamically Generated Classes - Cannot load user class
Update on this: if I just do an empty mapping and drop the SQL part, it works just fine. I wonder if there's any class loading that needs to be done when using SQL; I'm not sure how to do that.
Re: Dynamically Generated Classes - Cannot load user class
After removing some operators (which I still need, but I wanted to understand where my issues are), I get a slightly different stacktrace (though still the same issue). My current operators are:

1. a SQL select with group by (returns a retracted stream)
2. filter (take only non-retracted)
3. map (tuple to Row)
4. map (Row to MyGeneratedClass; this implements the classloader load of the generated class in open())

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
    at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:398)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createStreamRecordWriters(StreamTask.java:1165)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:214)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:193)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:51)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1445)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:680)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.model.MyGeneratedClass
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
    at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1716)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1556)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:561)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.readObject(PojoSerializer.java:1038)
    at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at java.util.ArrayList.readObject(ArrayList.java:797)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
Dynamically Generated Classes - Cannot load user class
Hey,

I'm trying to run a job which uses a dynamically generated class (through Byte Buddy); think of me having a complex schema as YAML text and generating a class from it. Throughout the job I use an artificial super class (MySuperClass) of the generated class (as, for example, I need to specify the generic parameter to extend RichMapFunction). MyRichMapFunction extends RichMapFunction and introduces the dynamic class. It takes the YAML in the constructor and:

1. open - takes the schema and converts it into a POJO class which extends MySuperClass
2. getProducedType - does the same thing in order to correctly emit the POJO with all the right fields

So basically my job is something like:

env.addSource([stream of pojos])
    .filter(...)
    ... (register table, run a query which generates Rows)
    .map(myRichMapFunction)
    .returns(myRichMapFunction.getProducedType)
    .addSink(...)

My trouble is that, when running on a cluster, the classloader fails to load my generated class. I tried to use getRuntimeContext().getUserCodeClassLoader() as the loader for Byte Buddy, but that doesn't seem to be enough. I was reading about it here: https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html

Is there maybe a hook that gets called when a job is loaded, so I can load the class there?

Stacktrace:

org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.model.MyGeneratedClass
ClassLoader info: URL ClassLoader:
    file: '/var/folders/f7/c4pvjrf902b6c73_tbzkxnjwgn/T/blobStore-4b685b0a-b8c1-43a1-a75d-f0b9c0156f4c/job_d1187ea7e783007b92ef6c0597d72fcb/blob_p-38b9e6dce2423b0374f82842a35dcaa92e10dedd-6f1056ab61afcccb3c1fca895ccb3eb0' (valid JAR)
Class not resolvable through given classloader.
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:264)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at com.MainClass.main(MainClass.java:46)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.model.DynamicSchema
ClassLoader info: URL ClassLoader:
    file: '/var/folders/f7/c4pvjrf902b6c73_tbzkxnjwgn/T/blobStore-4b685b0a-b8c1-43a1-a75d-f0b9c0156f4c/job_d1187ea7e783007b92ef6c0597d72fcb/blob_p-38b9e6dce2423b0374f82842a35dcaa92e10dedd-6f1056ab61afcccb3c1fca895ccb3eb0' (valid JAR)
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:273)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
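As a minimal illustration of the step that fails: resolving a class by name against an explicit loader is just Class.forName with a ClassLoader argument, which is essentially what Flink's user-code classloader ends up doing. This sketch uses only the JDK (no Byte Buddy, no Flink); the helper name is mine:

```java
public class ClassLoadingSketch {

    // Resolves a class by name against a specific loader. If the generated
    // class was never defined in (or delegated to) this loader, this throws
    // ClassNotFoundException, which is the failure mode seen above.
    static Class<?> loadWith(String className, ClassLoader loader) throws ClassNotFoundException {
        return Class.forName(className, true, loader);
    }
}
```

The point is that the generated class has to be defined against (or reachable from) the exact loader that later asks for it by name; defining it against a different loader is enough to produce "Class not resolvable through given classloader".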
Fire and Purge with Idle State
Hey,

Say I'm aggregating an event stream by sessionId in SQL, and I'm emitting the results once the session is "over". I guess I should be using fire-and-purge, since I don't expect to need the session data once it's over. How should I treat the idle state retention time? Is it needed at all if I'm using purge? Does it become relevant only if a session is both never-ending AND never gets more records?

Thanks!
Custom Trigger + SQL Pattern
Hey!

I have a use case in which I'm grouping a stream by session id; so far pretty standard, but note that I need to do it through SQL and not the Table API. In my use case I have two trigger conditions though: one is time (session inactivity), the other is based on a specific event marked as the "last" event. AFAIK SQL does not support custom triggers, so what I end up doing is the GROUP BY in SQL, then converting the result to a stream along with a boolean field that marks whether at least one of the events was the end event, then adding my custom trigger on top of that. It looks something like this:

Table result = tableEnv.sqlQuery(
    "select atLeastOneTrue(lastEvent), sessionId, count(*) FROM source Group By sessionId");
tableEnv.toRetractStream(result, Row.class, streamQueryConfig)
    .filter(tuple -> tuple.f0)
    .map(...)
    .returns(...)
    .keyBy("sessionId")
    .window(EventTimeSessionWindows.withGap(Time.hours(4)))
    .trigger(new SessionEndedByTimeOrEndTrigger())
    .process(...take last element from the group by result..)

This seems like a weird workaround, doesn't it? My window is basically over the SQL result rather than over the source stream. Ideally I would keyBy the sessionId before running the SQL, but then a) would I need to register a table per key? b) would I be able to use the custom trigger per window?

Basically I want to group by session id and have a window per session that supports both time and a custom trigger. Assuming I need to use SQL (the reason is that the query is loaded dynamically), is there a better solution for this?
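The decision my custom trigger has to make reduces to "end event seen OR inactivity gap elapsed". A plain-Java sketch of just that predicate (names are mine; this is not the Flink Trigger API, only the condition a trigger like SessionEndedByTimeOrEndTrigger would evaluate):

```java
public class SessionEndSketch {

    // A session should fire either because a terminal ("last") event arrived,
    // or because the time since the last seen event exceeds the inactivity gap.
    static boolean shouldFire(boolean sawEndEvent, long lastEventTs, long nowTs, long gapMs) {
        return sawEndEvent || (nowTs - lastEventTs) >= gapMs;
    }
}
```

In the real trigger the second branch is driven by a registered event-time timer rather than by polling, but the two firing conditions are exactly these.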