Thanks! Looks like that worked! Fwiw, the error message is very confusing. Is there any way this can be improved?
Thanks :) On Sun, Oct 25, 2020 at 6:42 PM Xingbo Huang <hxbks...@gmail.com> wrote: > Hi, > I think you can directly declare `def accumulate(acc: MembershipsIDsAcc, > value1: Long, value2: Boolean)` > > Best, > Xingbo > > Rex Fenley <r...@remind101.com> 于2020年10月26日周一 上午9:28写道: > >> If I switch accumulate to the following: >> def accumulate(acc: MembershipsIDsAcc, value: >> org.apache.flink.api.java.tuple.Tuple2[java.lang.Long, java.lang.Boolean]): >> Unit = {...} >> >> >> I instead receive: >> >> Tuple needs to be parameterized by using generics. >> >> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:847) >> >> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:781) >> >> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:735) >> >> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:731) >> ... >> >> On Sun, Oct 25, 2020 at 6:24 PM Rex Fenley <r...@remind101.com> wrote: >> >>> Imports: >>> >>> import java.util.Date >>> import org.apache.flink.api.scala._ >>> import org.apache.flink.api.common.typeinfo.TypeInformation >>> import org.apache.flink.api.java.typeutils.RowTypeInfo >>> import org.apache.flink.table.api.{ >>> DataTypes, >>> EnvironmentSettings, >>> TableEnvironment, >>> TableSchema >>> } >>> import org.apache.flink.table.api.bridge.scala._ >>> import org.apache.flink.table.data.RowData >>> import org.apache.flink.streaming.api.scala.{ >>> DataStream, >>> StreamExecutionEnvironment >>> } >>> import org.apache.flink.types.Row >>> import org.apache.flink.table.api._ >>> import >>> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup >>> import >>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase >>> import >>> org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSink >>> import >>> org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSinkFactory >>> import org.apache.flink.table.descriptors.Elasticsearch >>> import org.apache.flink.table.descriptors._ >>> import org.apache.flink.table.factories.SerializationFormatFactory >>> import org.apache.flink.formats.json >>> import org.apache.flink.table.functions.AggregateFunction >>> import org.apache.flink.types.ListValue >>> import scala.collection.mutable >>> >>> On Sun, Oct 25, 2020 at 6:09 PM Rex Fenley <r...@remind101.com> wrote: >>> >>>> Hello, >>>> >>>> I'm trying to create an Accumulator that takes in 2 columns, where each >>>> "value" is therefore a tuple, and results in a tuple of 2 arrays yet no >>>> matter what I try I receive an error trace like the one below. >>>> >>>> (Oddly, using almost the same pattern with 1 input column (a Long for >>>> "value") and 1 Array of Longs result/output works perfectly fine, so I'm >>>> not sure why suddenly using a Tuple should make much of any difference.) >>>> >>>> please help. >>>> >>>> Error trace: >>>> >>>> org.apache.flink.client.program.ProgramInvocationException: The main >>>> method caused an error: Given parameters do not match any signature. >>>> >>>> >>>> *Actual: (java.lang.Long, java.lang.Boolean) Expected: (scala.Tuple2) *at >>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) >>>> at >>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) >>>> at >>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) >>>> at >>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) >>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) >>>> at >>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) >>>> at >>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) >>>> at >>>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) >>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) >>>> Caused by: org.apache.flink.table.api.ValidationException: Given >>>> parameters do not match any signature. >>>> Actual: (java.lang.Long, java.lang.Boolean) >>>> Expected: (scala.Tuple2) >>>> at >>>> org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:112) >>>> at >>>> org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:71) >>>> at >>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:218) >>>> at >>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:134) >>>> at java.base/java.util.Optional.orElseGet(Optional.java:369) >>>> at >>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:134) >>>> at >>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89) >>>> at >>>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) >>>> at >>>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) >>>> at >>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:124) >>>> at >>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89) >>>> at >>>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) >>>> at >>>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) >>>> at >>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83) >>>> at >>>> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271) >>>> at >>>> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) >>>> at >>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) >>>> at >>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) >>>> at >>>> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) >>>> at >>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) >>>> at >>>> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) >>>> at >>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84) >>>> at >>>> org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211) >>>> at >>>> java.base/java.util.function.Function.lambda$andThen$1(Function.java:88) >>>> at >>>> org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178) >>>> at >>>> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:237) >>>> at >>>> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:528) >>>> at >>>> org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685) >>>> at remind.Job$.main(Job.scala:181) >>>> at remind.Job.main(Job.scala) >>>> at >>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native >>>> Method) >>>> at >>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>> at >>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566) >>>> at >>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) >>>> >>>> This is my code: >>>> >>>> case class MembershipsIDsAcc( >>>> var subscriberIDs: mutable.Set[Long], >>>> var ownerIDs: mutable.Set[Long] >>>> ) >>>> class MembershipsIDsAgg extends AggregateFunction[Row, >>>> MembershipsIDsAcc] { >>>> >>>> override def createAccumulator(): MembershipsIDsAcc = >>>> MembershipsIDsAcc(mutable.Set(), mutable.Set()) >>>> >>>> def accumulate(acc: MembershipsIDsAcc, value: (Long, Boolean)): Unit = >>>> { >>>> acc.subscriberIDs.add(value._1) >>>> if (value._2) { >>>> acc.ownerIDs.add(value._1) >>>> } else { >>>> acc.ownerIDs.remove(value._1) >>>> } >>>> } >>>> >>>> def retract(acc: MembershipsIDsAcc, value: (Long, Boolean)): Unit = { >>>> acc.subscriberIDs.remove(value._1) >>>> acc.ownerIDs.remove(value._1) >>>> } >>>> >>>> def resetAccumulator(acc: MembershipsIDsAcc): Unit = { >>>> acc.subscriberIDs = mutable.Set() >>>> acc.ownerIDs = mutable.Set() >>>> } >>>> >>>> override def getValue(acc: MembershipsIDsAcc): Row = { >>>> Row.of(acc.subscriberIDs.toArray, acc.ownerIDs.toArray) >>>> } >>>> >>>> override def getResultType: TypeInformation[Row] = { >>>> new RowTypeInfo( >>>> createTypeInformation[Array[Long]], >>>> createTypeInformation[Array[Long]] >>>> ) >>>> } >>>> } >>>> >>>> // Usage >>>> ... >>>> val membershipsByGroupId = >>>> membershipsNotDeletedTable >>>> .groupBy($"group_id") >>>> .aggregate( >>>> membershipsIDsAgg( >>>> $"user_id", >>>> $"owner" >>>> ) as ("subscriber_ids", "owner_ids") >>>> ) >>>> .select($"group_id", $"subscriber_ids", $"owner_ids") >>>> ... >>>> >>>> -- >>>> >>>> Rex Fenley | Software Engineer - Mobile and Backend >>>> >>>> >>>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> >>>> | FOLLOW US <https://twitter.com/remindhq> | LIKE US >>>> <https://www.facebook.com/remindhq> >>>> >>> >>> >>> -- >>> >>> Rex Fenley | Software Engineer - Mobile and Backend >>> >>> >>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> >>> | FOLLOW US <https://twitter.com/remindhq> | LIKE US >>> <https://www.facebook.com/remindhq> >>> >> >> >> -- >> >> Rex Fenley | Software Engineer - Mobile and Backend >> >> >> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> >> | FOLLOW US <https://twitter.com/remindhq> | LIKE US >> <https://www.facebook.com/remindhq> >> > -- Rex Fenley | Software Engineer - Mobile and Backend Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>