Hi Kurt,

How do you make the finishBundle method return the combined tuples?
I saw that there is a method "List<String> getOutputs()", but it is
never called.

I wrote an implementation based on the example that you suggested.
The MapBundleFunctionImpl has the method finishBundle, which iterates
over all the combined tuples and returns them. However, my application
does not receive any more tuples after the transform method.
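
To make the question concrete, here is a minimal plain-Java sketch of the combiner pattern under discussion. It has no Flink dependency. CountingBundle, its Consumer<String> collector and the maxBundleSize parameter are hypothetical stand-ins for illustration, not the Blink classes. The sketch assumes that finishBundle pushes the combined tuples to a downstream collector rather than returning them to the caller, which is one possible reason a returned list would never reach the next operator.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical bundle function: pre-aggregates a count per key before emitting. */
class CountingBundle {
    private final int maxBundleSize;           // flush after this many input records
    private final Consumer<String> collector;  // stand-in for the downstream output
    private final Map<String, Integer> buffer = new LinkedHashMap<>();
    private int seen = 0;

    CountingBundle(int maxBundleSize, Consumer<String> collector) {
        this.maxBundleSize = maxBundleSize;
        this.collector = collector;
    }

    /** Combines the incoming key into the buffer; flushes when the bundle is full. */
    void processElement(String key) {
        buffer.merge(key, 1, Integer::sum);
        seen++;
        if (seen >= maxBundleSize) {
            finishBundle();
        }
    }

    /** Emits every combined tuple downstream, then resets the bundle. */
    void finishBundle() {
        for (Map.Entry<String, Integer> e : buffer.entrySet()) {
            collector.accept(e.getKey() + "=" + e.getValue());
        }
        buffer.clear();
        seen = 0;
    }
}

class CountingBundleDemo {
    public static void main(String[] args) {
        List<String> downstream = new ArrayList<>();
        CountingBundle bundle = new CountingBundle(3, downstream::add);
        for (String key : new String[] {"a", "b", "a", "c"}) {
            bundle.processElement(key);
        }
        bundle.finishBundle(); // flush the partial bundle, as an operator would on close
        System.out.println(downstream); // prints [a=2, b=1, c=1]
    }
}
```

In this sketch nothing reaches the downstream list unless finishBundle calls the collector, so tuples that are only returned from the method would be dropped.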


*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com*

On Tue, Apr 16, 2019 at 3:10 AM Kurt Young <ykt...@gmail.com> wrote:

> I think you can simply copy the source code into your project if the
> Maven dependency cannot be used.
> Best,
> Kurt
> On Mon, Apr 15, 2019 at 9:47 PM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>> Hi again Kurt,
>> Could you please help me with the pom.xml file? I have included all the
>> Table ecosystem dependencies and flink-table-runtime-blink as well. However,
>> the class org.apache.flink.table.runtime.context.ExecutionContext is still
>> not found. I guess I am missing some dependency, but I do not know which one.
>> This is my pom.xml file:
>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>> http://maven.apache.org/xsd/maven-4.0.0.xsd">
>> <modelVersion>4.0.0</modelVersion>
>> <groupId>org.sense.flink</groupId>
>> <artifactId>explore-flink</artifactId>
>> <version>0.0.1-SNAPSHOT</version>
>> <packaging>jar</packaging>
>> <name>explore-flink</name>
>> <url>http://maven.apache.org</url>
>> <properties>
>> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>> <jdk.version>1.8</jdk.version>
>> <scala.binary.version>2.11</scala.binary.version>
>> <!-- <flink.version>1.8.0</flink.version> -->
>> <flink.version>1.9-SNAPSHOT</flink.version>
>> <junit.version>4.12</junit.version>
>> </properties>
>> <dependencies>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-java</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-clients_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-metrics-dropwizard</artifactId>
>> <version>${flink.version}</version>
>> <scope>provided</scope>
>> </dependency>
>> <!-- Table ecosystem -->
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-table-runtime-blink</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.fusesource.mqtt-client</groupId>
>> <artifactId>mqtt-client</artifactId>
>> <version>1.15</version>
>> <!-- <scope>provided</scope> -->
>> </dependency>
>> <dependency>
>> <groupId>org.slf4j</groupId>
>> <artifactId>slf4j-api</artifactId>
>> <version>1.7.26</version>
>> </dependency>
>> <dependency>
>> <groupId>org.slf4j</groupId>
>> <artifactId>slf4j-log4j12</artifactId>
>> <version>1.7.26</version>
>> </dependency>
>> <dependency>
>> <groupId>junit</groupId>
>> <artifactId>junit</artifactId>
>> <version>${junit.version}</version>
>> </dependency>
>> </dependencies>
>> <build>
>> <finalName>explore-flink</finalName>
>> <plugins>
>> <!-- download source code in Eclipse, best practice -->
>> <plugin>
>> <groupId>org.apache.maven.plugins</groupId>
>> <artifactId>maven-eclipse-plugin</artifactId>
>> <version>2.10</version>
>> <configuration>
>> <downloadSources>true</downloadSources>
>> <downloadJavadocs>false</downloadJavadocs>
>> </configuration>
>> </plugin>
>> <!-- Set a compiler level -->
>> <plugin>
>> <groupId>org.apache.maven.plugins</groupId>
>> <artifactId>maven-compiler-plugin</artifactId>
>> <version>3.8.0</version>
>> <configuration>
>> <source>${jdk.version}</source>
>> <target>${jdk.version}</target>
>> </configuration>
>> </plugin>
>> <!-- Maven Shade Plugin -->
>> <plugin>
>> <groupId>org.apache.maven.plugins</groupId>
>> <artifactId>maven-shade-plugin</artifactId>
>> <version>3.2.0</version>
>> <!-- Run shade goal on package phase -->
>> <executions>
>> <execution>
>> <phase>package</phase>
>> <goals>
>> <goal>shade</goal>
>> </goals>
>> <configuration>
>> <artifactSet>
>> <excludes>
>> <exclude>org.apache.flink:*</exclude>
>> <!-- Also exclude very big transitive dependencies of Flink. WARNING:
>> You have to remove these excludes if your code relies on other versions
>> of these dependencies. -->
>> <exclude>org.slf4j:*</exclude>
>> <exclude>log4j:*</exclude>
>> <exclude>com.typesafe:config:*</exclude>
>> <exclude>junit:junit:*</exclude>
>> <exclude>com.codahale.metrics:*</exclude>
>> </excludes>
>> </artifactSet>
>> <filters>
>> <filter>
>> <artifact>org.apache.flink:*</artifact>
>> <excludes>
>> <!-- exclude shaded google but include shaded curator -->
>> <exclude>org/apache/flink/shaded/com/**</exclude>
>> <exclude>web-docs/**</exclude>
>> </excludes>
>> </filter>
>> <filter>
>> <!-- Do not copy the signatures in the META-INF folder. Otherwise,
>> this might cause SecurityExceptions when using the JAR. -->
>> <artifact>*:*</artifact>
>> <excludes>
>> <exclude>META-INF/*.SF</exclude>
>> <exclude>META-INF/*.DSA</exclude>
>> <exclude>META-INF/*.RSA</exclude>
>> </excludes>
>> </filter>
>> <filter>
>> <artifact>*:*</artifact>
>> <includes>
>> <include>org/apache/calcite/**</include>
>> <include>org/apache/flink/calcite/shaded/**</include>
>> <include>org/apache/flink/table/**</include>
>> <include>org.codehaus.commons.compiler.properties</include>
>> <include>org/codehaus/janino/**</include>
>> <include>org/codehaus/commons/**</include>
>> </includes>
>> </filter>
>> </filters>
>> <!-- If you want to use ./bin/flink run <quickstart jar> uncomment
>> the following lines. This will add a Main-Class entry to the manifest
>> file -->
>> <transformers>
>> <transformer
>> implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>> <mainClass>org.sense.flink.App</mainClass>
>> </transformer>
>> </transformers>
>> <createDependencyReducedPom>false</createDependencyReducedPom>
>> </configuration>
>> </execution>
>> </executions>
>> </plugin>
>> </plugins>
>> </build>
>> </project>
>> Thanks
>> On Mon, Apr 15, 2019 at 3:25 PM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>> Oh, yes, I just saw. I will use 1.9 then. Thanks.
>>> On Mon, Apr 15, 2019 at 3:23 PM Kurt Young <ykt...@gmail.com> wrote:
>>>> It's because the Blink code is not shipped with 1.8.0; it is currently
>>>> only available in 1.9-SNAPSHOT.
>>>> Best,
>>>> Kurt
>>>> On Mon, Apr 15, 2019 at 7:18 PM Felipe Gutierrez <
>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>> Hi,
>>>>> Which artifacts do I have to import in Maven in order to use the
>>>>> Blink API?
>>>>> I am using Flink 1.8.0 and I am trying to import the Blink code to use its
>>>>> ExecutionContext
>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/context/ExecutionContext.java>.
>>>>> I want to do this in order to implement my own operator like it is
>>>>> implemented here
>>>>> <https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java>.
>>>>> I guess if I import flink-table everything should come inside the same jar
>>>>> as it is done here
>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/pom.xml>.
>>>>> However, I cannot import "flink-table-runtime-blink". Eclipse says that it
>>>>> is a missing artifact.
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-planner_2.11</artifactId>
>>>>> <version>1.8.0</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>>>>> <version>1.8.0</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-streaming-scala_2.11</artifactId>
>>>>> <version>1.8.0</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-common</artifactId>
>>>>> <version>1.8.0</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table</artifactId>
>>>>> <version>1.8.0</version>
>>>>> <type>pom</type>
>>>>> <scope>provided</scope>
>>>>> </dependency>
>>>>> <dependency> <!-- THIS IS NOT POSSIBLE TO IMPORT -->
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-runtime-blink</artifactId>
>>>>> <version>1.8.0</version>
>>>>> </dependency>
>>>>> On Mon, Apr 15, 2019 at 9:49 AM Felipe Gutierrez <
>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>> Cool, thanks Kurt!
>>>>>> On Mon, Apr 15, 2019 at 6:06 AM Kurt Young <ykt...@gmail.com> wrote:
>>>>>>> Hi,
>>>>>>> You can check out the bundle operator used in Blink, which does
>>>>>>> something similar to what you mentioned:
>>>>>>> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>> On Fri, Apr 12, 2019 at 8:05 PM Felipe Gutierrez <
>>>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>>>> Hi,
>>>>>>>> I was trying to implement a better way to handle data skew using
>>>>>>>> Flink and I found this talk from #FlinkForward SF 2017: "Cliff
>>>>>>>> Resnick & Seth Wiesman - From Zero to Streaming
>>>>>>>> <https://youtu.be/mSLesPzWplA?t=835>" [1], which says that they
>>>>>>>> used OneInputStreamOperator [2]. Through it, they could implement the
>>>>>>>> equivalent of Hadoop's "combiner" (executing part of the reduce work
>>>>>>>> in the map phase, before shuffling).
>>>>>>>> I need some help here. Which operators in the Flink source code
>>>>>>>> can I look at to implement my own operator that deals with
>>>>>>>> data skew? Or does anyone have an example of a use case
>>>>>>>> similar to this?
>>>>>>>> [1] https://youtu.be/mSLesPzWplA?t=835
>>>>>>>> [2]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.html
>>>>>>>> Thanks!
>>>>>>>> Felipe
