This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bf2a9c7  Add Windowfunction interface to functions api (#3324)
bf2a9c7 is described below

commit bf2a9c7b84f1c749bfabdc21a2926b017caa61ad
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Mon Jan 14 09:54:49 2019 -0800

    Add Windowfunction interface to functions api (#3324)
    
    * Added WindowFunction interface and implementation
    
    * Fixed logic
    
    * Update comments
    
    * Took feedback into account
---
 .../pulsar/functions/api}/WindowContext.java       |  2 +-
 .../pulsar/functions/api}/WindowFunction.java      | 23 ++++----
 .../functions/windowing/WindowContextImpl.java     |  1 +
 .../windowing/WindowFunctionExecutor.java          | 64 ++++++++++++----------
 .../windowing/WindowFunctionExecutorTest.java      | 27 +++++----
 ...{WindowFunction.java => AddWindowFunction.java} |  2 +-
 ...dowFunction.java => ContextWindowFunction.java} | 14 +++--
 .../resources/example-window-function-config.yaml  |  2 +-
 .../org/apache/pulsar/functions/utils/Utils.java   | 34 ++++++++----
 9 files changed, 99 insertions(+), 70 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
similarity index 99%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
rename to pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
index 2f1f2e7..0abc87a 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.windowing;
+package org.apache.pulsar.functions.api;
 
 import org.slf4j.Logger;
 
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java
similarity index 65%
copy from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
copy to pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java
index ae01cec..6f2c421 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java
@@ -16,20 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.api.examples;
-
-import lombok.extern.slf4j.Slf4j;
+package org.apache.pulsar.functions.api;
 
 import java.util.Collection;
-import java.util.function.Function;
 
 /**
- * Example Function that acts on a window of tuples at a time rather than per tuple basis.
+ * This is the interface of the windowed function api. The process method is called
+ * for every triggered window.
  */
-@Slf4j
-public class WindowFunction implements Function <Collection<Integer>, Integer> {
-    @Override
-    public Integer apply(Collection<Integer> integers) {
-        return integers.stream().reduce(0, (x, y) -> x + y);
-    }
-}
+@FunctionalInterface
+public interface WindowFunction<I, O> {
+    /**
+     * Process the input.
+     * @return the output
+     */
+    O process(Collection<Record<I>> input, WindowContext context) throws Exception;
+}
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
index 41e8ebe..de00f52 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.windowing;
 
 import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.WindowContext;
 import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
index e288261..1945949 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -30,9 +30,7 @@ import java.util.stream.Collectors;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.*;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.common.functions.WindowConfig;
 import org.apache.pulsar.functions.windowing.evictors.CountEvictionPolicy;
@@ -51,22 +49,23 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> {
 
     private boolean initialized;
     protected WindowConfig windowConfig;
-    private WindowManager<I> windowManager;
+    private WindowManager<Record<I>> windowManager;
     private TimestampExtractor<I> timestampExtractor;
-    protected transient WaterMarkEventGenerator<I> waterMarkEventGenerator;
+    protected transient WaterMarkEventGenerator<Record<I>> waterMarkEventGenerator;
 
-    protected java.util.function.Function<Collection<I>, O> windowFunction;
+    protected java.util.function.Function<Collection<I>, O> bareWindowFunction;
+    protected WindowFunction<I, O> windowFunction;
 
     public void initialize(Context context) {
         this.windowConfig = this.getWindowConfigs(context);
-        this.windowFunction = intializeUserFunction(this.windowConfig);
+        initializeUserFunction(this.windowConfig);
         log.info("Window Config: {}", this.windowConfig);
         this.windowManager = this.getWindowManager(this.windowConfig, context);
         this.initialized = true;
         this.start();
     }
 
-    private java.util.function.Function<Collection<I>, O> intializeUserFunction(WindowConfig windowConfig) {
+    private void initializeUserFunction(WindowConfig windowConfig) {
         String actualWindowFunctionClassName = windowConfig.getActualWindowFunctionClassName();
         ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
         Object userClassObject = Reflections.createInstance(
@@ -76,10 +75,12 @@ public class WindowFunctionExecutor<I, O> implements 
Function<I, O> {
             Class<?>[] typeArgs = TypeResolver.resolveRawArguments(
                     java.util.function.Function.class, 
userClassObject.getClass());
             if (typeArgs[0].equals(Collection.class)) {
-                return (java.util.function.Function) userClassObject;
+                bareWindowFunction = (java.util.function.Function) 
userClassObject;
             } else {
                 throw new IllegalArgumentException("Window function must take 
a collection as input");
             }
+        } else if (userClassObject instanceof WindowFunction) {
+            windowFunction = (WindowFunction) userClassObject;
         } else {
             throw new IllegalArgumentException("Window function does not 
implement the correct interface");
         }
@@ -97,10 +98,10 @@ public class WindowFunctionExecutor<I, O> implements 
Function<I, O> {
         return windowConfig;
     }
 
-    private WindowManager<I> getWindowManager(WindowConfig windowConfig, 
Context context) {
+    private WindowManager<Record<I>> getWindowManager(WindowConfig 
windowConfig, Context context) {
 
-        WindowLifecycleListener<Event<I>> lifecycleListener = 
newWindowLifecycleListener(context);
-        WindowManager<I> manager = new WindowManager<>(lifecycleListener, new 
ConcurrentLinkedQueue<>());
+        WindowLifecycleListener<Event<Record<I>>> lifecycleListener = 
newWindowLifecycleListener(context);
+        WindowManager<Record<I>> manager = new 
WindowManager<>(lifecycleListener, new ConcurrentLinkedQueue<>());
 
         if (this.windowConfig.getTimestampExtractorClassName() != null) {
             this.timestampExtractor = getTimeStampExtractor(windowConfig);
@@ -115,8 +116,8 @@ public class WindowFunctionExecutor<I, O> implements 
Function<I, O> {
             }
         }
 
-        EvictionPolicy<I, ?> evictionPolicy = getEvictionPolicy(windowConfig);
-        TriggerPolicy<I, ?> triggerPolicy = getTriggerPolicy(windowConfig, 
manager,
+        EvictionPolicy<Record<I>, ?> evictionPolicy = 
getEvictionPolicy(windowConfig);
+        TriggerPolicy<Record<I>, ?> triggerPolicy = 
getTriggerPolicy(windowConfig, manager,
                 evictionPolicy, context);
         manager.setEvictionPolicy(evictionPolicy);
         manager.setTriggerPolicy(triggerPolicy);
@@ -162,8 +163,8 @@ public class WindowFunctionExecutor<I, O> implements 
Function<I, O> {
         return (TimestampExtractor<I>) result;
     }
 
-    private TriggerPolicy<I, ?> getTriggerPolicy(WindowConfig windowConfig, 
WindowManager<I> manager,
-                                                 EvictionPolicy<I, ?> 
evictionPolicy, Context context) {
+    private TriggerPolicy<Record<I>, ?> getTriggerPolicy(WindowConfig 
windowConfig, WindowManager<Record<I>> manager,
+                                                 EvictionPolicy<Record<I>, ?> 
evictionPolicy, Context context) {
         if (windowConfig.getSlidingIntervalCount() != null) {
             if (this.isEventTime()) {
                 return new WatermarkCountTriggerPolicy<>(
@@ -181,7 +182,7 @@ public class WindowFunctionExecutor<I, O> implements 
Function<I, O> {
         }
     }
 
-    private EvictionPolicy<I, ?> getEvictionPolicy(WindowConfig windowConfig) {
+    private EvictionPolicy<Record<I>, ?> getEvictionPolicy(WindowConfig 
windowConfig) {
         if (windowConfig.getWindowLengthCount() != null) {
             if (this.isEventTime()) {
                 return new 
WatermarkCountEvictionPolicy<>(windowConfig.getWindowLengthCount());
@@ -198,17 +199,17 @@ public class WindowFunctionExecutor<I, O> implements 
Function<I, O> {
         }
     }
 
-    protected WindowLifecycleListener<Event<I>> 
newWindowLifecycleListener(Context context) {
-        return new WindowLifecycleListener<Event<I>>() {
+    protected WindowLifecycleListener<Event<Record<I>>> 
newWindowLifecycleListener(Context context) {
+        return new WindowLifecycleListener<Event<Record<I>>>() {
             @Override
-            public void onExpiry(List<Event<I>> events) {
-                for (Event<I> event : events) {
+            public void onExpiry(List<Event<Record<I>>> events) {
+                for (Event<Record<I>> event : events) {
                     event.getRecord().ack();
                 }
             }
 
             @Override
-            public void onActivation(List<Event<I>> tuples, List<Event<I>> 
newTuples, List<Event<I>>
+            public void onActivation(List<Event<Record<I>>> tuples, 
List<Event<Record<I>>> newTuples, List<Event<Record<I>>>
                     expiredTuples, Long referenceTime) {
                 processWindow(
                         context,
@@ -220,7 +221,7 @@ public class WindowFunctionExecutor<I, O> implements 
Function<I, O> {
         };
     }
 
-    private void processWindow(Context context, List<I> tuples, List<I> 
newTuples, List<I>
+    private void processWindow(Context context, List<Record<I>> tuples, 
List<Record<I>> newTuples, List<Record<I>>
             expiredTuples, Long referenceTime) {
 
         O output = null;
@@ -273,12 +274,12 @@ public class WindowFunctionExecutor<I, O> implements 
Function<I, O> {
             initialize(context);
         }
 
-        Record<?> record = context.getCurrentRecord();
+        Record<I> record = (Record<I>)context.getCurrentRecord();
 
         if (isEventTime()) {
-            long ts = this.timestampExtractor.extractTimestamp(input);
+            long ts = 
this.timestampExtractor.extractTimestamp(record.getValue());
             if 
(this.waterMarkEventGenerator.track(record.getTopicName().get(), ts)) {
-                this.windowManager.add(input, ts, record);
+                this.windowManager.add(record, ts, record);
             } else {
                 if (this.windowConfig.getLateDataTopic() != null) {
                     context.publish(this.windowConfig.getLateDataTopic(), 
input);
@@ -290,12 +291,17 @@ public class WindowFunctionExecutor<I, O> implements 
Function<I, O> {
                 record.ack();
             }
         } else {
-            this.windowManager.add(input, System.currentTimeMillis(), record);
+            this.windowManager.add(record, System.currentTimeMillis(), record);
         }
         return null;
     }
 
-    public O process(Window<I> inputWindow, WindowContext context) throws Exception {
-        return this.windowFunction.apply(inputWindow.get());
+    public O process(Window<Record<I>> inputWindow, WindowContext context) throws Exception {
+        if (this.bareWindowFunction != null) {
+            Collection<I> newCollection = inputWindow.get().stream().map(Record::getValue).collect(Collectors.toList());
+            return this.bareWindowFunction.apply(newCollection);
+        } else {
+            return this.windowFunction.process(inputWindow.get(), context);
+        }
     }
 }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
index 21a78a2..88ecebe 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
@@ -24,9 +24,8 @@ import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Record;
 
 import org.apache.pulsar.common.functions.WindowConfig;
-import org.apache.pulsar.functions.utils.WindowConfigUtils;
+import org.apache.pulsar.functions.api.WindowContext;
 import org.mockito.Mockito;
-import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -51,10 +50,10 @@ public class WindowFunctionExecutorTest {
 
     private static class TestWindowFunctionExecutor extends 
WindowFunctionExecutor<Long, Long> {
 
-        List<Window<Long>> windows = new ArrayList<>();
+        List<Window<Record<Long>>> windows = new ArrayList<>();
 
         @Override
-        public Long process(Window<Long> inputWindow, WindowContext context) 
throws Exception {
+        public Long process(Window<Record<Long>> inputWindow, WindowContext 
context) throws Exception {
             windows.add(inputWindow);
             return null;
         }
@@ -150,22 +149,26 @@ public class WindowFunctionExecutorTest {
     public void testExecuteWithTs() throws Exception {
         long[] timestamps = {603, 605, 607, 618, 626, 636};
         for (long ts : timestamps) {
+            Record<?> record = Mockito.mock(Record.class);
+            
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+            Mockito.doReturn(record).when(context).getCurrentRecord();
+            Mockito.doReturn(ts).when(record).getValue();
             testWindowedPulsarFunction.process(ts, context);
         }
         testWindowedPulsarFunction.waterMarkEventGenerator.run();
         assertEquals(3, testWindowedPulsarFunction.windows.size());
-        Window<Long> first = testWindowedPulsarFunction.windows.get(0);
+        Window<Record<Long>> first = testWindowedPulsarFunction.windows.get(0);
         assertArrayEquals(
                 new long[]{603, 605, 607},
-                new long[]{first.get().get(0), first.get().get(1), 
first.get().get(2)});
+                new long[]{first.get().get(0).getValue(), 
first.get().get(1).getValue(), first.get().get(2).getValue()});
 
-        Window<Long> second = testWindowedPulsarFunction.windows.get(1);
+        Window<Record<Long>> second = 
testWindowedPulsarFunction.windows.get(1);
         assertArrayEquals(
                 new long[]{603, 605, 607, 618},
-                new long[]{second.get().get(0), second.get().get(1), 
second.get().get(2), second.get().get(3)});
+                new long[]{second.get().get(0).getValue(), 
second.get().get(1).getValue(), second.get().get(2).getValue(), 
second.get().get(3).getValue()});
 
-        Window<Long> third = testWindowedPulsarFunction.windows.get(2);
-        assertArrayEquals(new long[]{618, 626}, new long[]{third.get().get(0), 
third.get().get(1)});
+        Window<Record<Long>> third = testWindowedPulsarFunction.windows.get(2);
+        assertArrayEquals(new long[]{618, 626}, new 
long[]{third.get().get(0).getValue(), third.get().get(1).getValue()});
     }
 
     @Test
@@ -207,6 +210,10 @@ public class WindowFunctionExecutorTest {
 
         for (long ts : timestamps) {
             events.add(ts);
+            Record<?> record = Mockito.mock(Record.class);
+            
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+            Mockito.doReturn(record).when(context).getCurrentRecord();
+            Mockito.doReturn(ts).when(record).getValue();
             testWindowedPulsarFunction.process(ts, context);
 
             //Update the watermark to this timestamp
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AddWindowFunction.java
similarity index 93%
copy from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
copy to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AddWindowFunction.java
index ae01cec..d8f1864 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AddWindowFunction.java
@@ -27,7 +27,7 @@ import java.util.function.Function;
  * Example Function that acts on a window of tuples at a time rather than per tuple basis.
  */
 @Slf4j
-public class WindowFunction implements Function <Collection<Integer>, Integer> {
+public class AddWindowFunction implements Function <Collection<Integer>, Integer> {
     @Override
     public Integer apply(Collection<Integer> integers) {
         return integers.stream().reduce(0, (x, y) -> x + y);
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
similarity index 68%
rename from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
rename to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
index ae01cec..fe90d1e 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
@@ -19,17 +19,23 @@
 package org.apache.pulsar.functions.api.examples;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.WindowContext;
+import org.apache.pulsar.functions.api.WindowFunction;
 
 import java.util.Collection;
-import java.util.function.Function;
 
 /**
  * Example Function that acts on a window of tuples at a time rather than per tuple basis.
  */
 @Slf4j
-public class WindowFunction implements Function <Collection<Integer>, Integer> {
+public class ContextWindowFunction implements WindowFunction<Integer, Integer> {
     @Override
-    public Integer apply(Collection<Integer> integers) {
-        return integers.stream().reduce(0, (x, y) -> x + y);
+    public Integer process(Collection<Record<Integer>> integers, WindowContext context) {
+        Integer retval = 0;
+        for (Record<Integer> record : integers) {
+            retval += record.getValue();
+        }
+        return retval;
     }
 }
diff --git a/pulsar-functions/java-examples/src/main/resources/example-window-function-config.yaml b/pulsar-functions/java-examples/src/main/resources/example-window-function-config.yaml
index 3dc3279..e0faf6f 100644
--- a/pulsar-functions/java-examples/src/main/resources/example-window-function-config.yaml
+++ b/pulsar-functions/java-examples/src/main/resources/example-window-function-config.yaml
@@ -20,7 +20,7 @@
 tenant: "test"
 namespace: "test-namespace"
 name: "example"
-className: "org.apache.pulsar.functions.api.examples.WindowFunction"
+className: "org.apache.pulsar.functions.api.examples.AddWindowFunction"
 inputs: ["test_src"]
 userConfig:
   "PublishTopic": "test_result"
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index b50bcd5..6f2e76b 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -36,6 +36,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.WindowFunction;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.Source;
@@ -91,19 +92,28 @@ public class Utils {
         Class<?>[] typeArgs;
         // if window function
         if (isWindowConfigPresent) {
-            java.util.function.Function function = (java.util.function.Function) userClass;
-            if (function == null) {
-                throw new IllegalArgumentException(
-                        String.format("The Java util function class %s could not be instantiated", userClass));
-            }
-            typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
-            if (!typeArgs[0].equals(Collection.class)) {
-                throw new IllegalArgumentException("Window function must take a collection as input");
+            if (userClass instanceof WindowFunction) {
+                WindowFunction function = (WindowFunction) userClass;
+                if (function == null) {
+                    throw new IllegalArgumentException(
+                            String.format("The WindowFunction class %s could not be instantiated", userClass));
+                }
+                typeArgs = TypeResolver.resolveRawArguments(WindowFunction.class, function.getClass());
+            } else {
+                java.util.function.Function function = (java.util.function.Function) userClass;
+                if (function == null) {
+                    throw new IllegalArgumentException(
+                            String.format("The Java util function class %s could not be instantiated", userClass));
+                }
+                typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
+                if (!typeArgs[0].equals(Collection.class)) {
+                    throw new IllegalArgumentException("Window function must take a collection as input");
+                }
+                Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, function.getClass());
+                Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0];
+                Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0];
+                typeArgs[0] = (Class<?>) actualInputType;
             }
-            Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, function.getClass());
-            Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0];
-            Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0];
-            typeArgs[0] = (Class<?>) actualInputType;
         } else {
             if (userClass instanceof Function) {
                 Function pulsarFunction = (Function) userClass;

Reply via email to