[GitHub] flink pull request: New operator state interfaces
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/747#issuecomment-113389891 That's a good point Stephan, I fixed it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: New operator state interfaces
Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/747#discussion_r32804068 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.state; + +import static org.junit.Assert.assertEquals; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider; +import org.apache.flink.runtime.state.PartitionedStateHandle; +import org.apache.flink.shaded.com.google.common.collect.ImmutableMap; --- End diff -- fixed
[GitHub] flink pull request: New operator state interfaces
Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/747#discussion_r32804056 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java --- @@ -64,6 +73,22 @@ public long getTimestamp() { return states; } + /** +* Returns the task state included in the checkpoint for a given JobVertexID if it exists or +* null if no state is included for that id. +* +* @param jobVertexID +* @return +*/ + public StateForTask getState(JobVertexID jobVertexID) + { + if(vertexToState.containsKey(jobVertexID)) { + return vertexToState.get(jobVertexID); --- End diff -- done
[GitHub] flink pull request: [contrib] Storm compatibility
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/764#issuecomment-113381132 Congrats @mjsax for getting your first big contribution in! I talked to a few users here in the valley who were showing interest in this feature.
[jira] [Commented] (FLINK-2235) Local Flink cluster allocates too much memory
[ https://issues.apache.org/jira/browse/FLINK-2235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14593008#comment-14593008 ] Robert Metzger commented on FLINK-2235: --- If I recall correctly, the community decided to drop Java 1.6 support after the 0.9 release. Maybe it is time to bring up that discussion again and close this issue as invalid ;) > Local Flink cluster allocates too much memory > - > > Key: FLINK-2235 > URL: https://issues.apache.org/jira/browse/FLINK-2235 > Project: Flink > Issue Type: Bug > Components: Local Runtime, TaskManager >Affects Versions: 0.9 > Environment: Oracle JDK: 1.6.0_65-b14-462 > Eclipse >Reporter: Maximilian Michels >Priority: Minor > > When executing a Flink job locally, the task manager gets initialized with an > insane amount of memory. After a quick look in the code it seems that the > call to {{EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()}} > returns a wrong estimate of the heap memory size. > Moreover, the same user switched to Oracle JDK 1.8 and that made the error > disappear. So I'm guessing this is some Java 1.6 quirk. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
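For readers unfamiliar with the mechanism under discussion: a free-heap estimate of this kind is typically computed from the `Runtime` memory counters after forcing a GC. The sketch below is an illustrative assumption about the general approach, not Flink's actual `EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()` implementation:

```java
public class FreeHeapEstimate {
    /**
     * Rough estimate of the heap memory still available to the JVM.
     * Triggers a GC first (the "defrag" part), then subtracts the
     * currently used memory from the configured maximum heap size.
     * On some older JVMs (e.g. certain JDK 1.6 builds) these counters
     * can be unreliable, which would inflate the estimate.
     */
    public static long estimateFreeHeap() {
        System.gc();
        Runtime r = Runtime.getRuntime();
        long used = r.totalMemory() - r.freeMemory();
        return r.maxMemory() - used;
    }
}
```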
[GitHub] flink pull request: [FLINK-2124] [streaming] Fix behavior of FromE...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/848#discussion_r32801136 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java --- @@ -17,37 +17,81 @@ package org.apache.flink.streaming.api.functions.source; -import java.util.Arrays; -import java.util.Collection; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; +import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.Iterator; public class FromElementsFunction implements SourceFunction { private static final long serialVersionUID = 1L; - private Iterable iterable; + private final TypeSerializer serializer; + private final byte[] elements; private volatile boolean isRunning = true; - public FromElementsFunction(T... elements) { - this.iterable = Arrays.asList(elements); - } + public FromElementsFunction(TypeSerializer serializer, final T... 
elements) { + this(serializer, new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + int index = 0; + + @Override + public boolean hasNext() { + return index < elements.length; + } + + @Override + public T next() { + return elements[index++]; + } - public FromElementsFunction(Collection elements) { - this.iterable = elements; + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }); } - public FromElementsFunction(Iterable elements) { - this.iterable = elements; + public FromElementsFunction(TypeSerializer serializer, Iterable elements) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputViewDataOutputStreamWrapper wrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos)); + + try { + for (T element : elements) { + serializer.serialize(element, wrapper); + } + } catch (IOException e) { + // ByteArrayOutputStream doesn't throw IOExceptions when written to --- End diff -- I think not forwarding the exception here is a bad idea.
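To make the reviewer's point concrete, here is a hedged sketch of forwarding the exception instead of swallowing it. The element type and serialization call are simplified (plain `String` via `DataOutputStream.writeUTF`, an assumption for illustration; the real code uses Flink's `TypeSerializer`):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

public class SerializeElements {
    /**
     * Serializes all elements into a byte array. Instead of silently
     * swallowing an IOException (unlikely with ByteArrayOutputStream
     * itself, but possible from a serializer), it is rethrown wrapped
     * in a RuntimeException so the failure is not hidden from the caller.
     */
    public static byte[] serialize(List<String> elements) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        try {
            for (String element : elements) {
                out.writeUTF(element);
            }
            out.flush();
        } catch (IOException e) {
            throw new RuntimeException("Could not serialize elements", e);
        }
        return baos.toByteArray();
    }
}
```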
[jira] [Commented] (FLINK-2124) FromElementsFunction is not really Serializable
[ https://issues.apache.org/jira/browse/FLINK-2124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14592987#comment-14592987 ] ASF GitHub Bot commented on FLINK-2124: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/848#discussion_r32801136 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java --- @@ -17,37 +17,81 @@ package org.apache.flink.streaming.api.functions.source; -import java.util.Arrays; -import java.util.Collection; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; +import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.Iterator; public class FromElementsFunction implements SourceFunction { private static final long serialVersionUID = 1L; - private Iterable iterable; + private final TypeSerializer serializer; + private final byte[] elements; private volatile boolean isRunning = true; - public FromElementsFunction(T... elements) { - this.iterable = Arrays.asList(elements); - } + public FromElementsFunction(TypeSerializer serializer, final T... 
elements) { + this(serializer, new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + int index = 0; + + @Override + public boolean hasNext() { + return index < elements.length; + } + + @Override + public T next() { + return elements[index++]; + } - public FromElementsFunction(Collection elements) { - this.iterable = elements; + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }); } - public FromElementsFunction(Iterable elements) { - this.iterable = elements; + public FromElementsFunction(TypeSerializer serializer, Iterable elements) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputViewDataOutputStreamWrapper wrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos)); + + try { + for (T element : elements) { + serializer.serialize(element, wrapper); + } + } catch (IOException e) { + // ByteArrayOutputStream doesn't throw IOExceptions when written to --- End diff -- I think not forwarding the exception here is a bad idea. > FromElementsFunction is not really Serializable > --- > > Key: FLINK-2124 > URL: https://issues.apache.org/jira/browse/FLINK-2124 > Project: Flink > Issue Type: Bug > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Johannes Reifferscheid > > The function stores an Iterable of T. T is not necessarily Serializable and an Iterable is also not necessarily Serializable.
[GitHub] flink pull request: New operator state interfaces
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/747#issuecomment-113362527 Let's keep the current interfaces `Checkpointed` and `AsynchronouslyCheckpointed` to avoid fully breaking current programs. They are actually used in examples that have been published.
[GitHub] flink pull request: Modified the equals method in all the Value ty...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/850#issuecomment-113303962 I would not expect `equals()` to throw an exception, so this is a good fix in my opinion.
[GitHub] flink pull request: New operator state interfaces
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/747#discussion_r32784958 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.state; + +import static org.junit.Assert.assertEquals; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider; +import org.apache.flink.runtime.state.PartitionedStateHandle; +import org.apache.flink.shaded.com.google.common.collect.ImmutableMap; --- End diff -- Import of shaded guava
[GitHub] flink pull request: New operator state interfaces
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/747#discussion_r32784821 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.state; + +import java.io.Serializable; +import java.util.Map; + +import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.StateCheckpointer; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.shaded.com.google.common.collect.ImmutableMap; --- End diff -- Import of shaded guava
[GitHub] flink pull request: New operator state interfaces
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/747#discussion_r32784302 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java --- @@ -64,6 +73,22 @@ public long getTimestamp() { return states; } + /** +* Returns the task state included in the checkpoint for a given JobVertexID if it exists or +* null if no state is included for that id. +* +* @param jobVertexID +* @return +*/ + public StateForTask getState(JobVertexID jobVertexID) + { + if(vertexToState.containsKey(jobVertexID)) { + return vertexToState.get(jobVertexID); --- End diff -- I think you can save one map lookup by calling get() immediately. It will return null on a missing key.
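The suggested simplification can be sketched as below. The `JobVertexID` and `StateForTask` stand-in classes are illustrative placeholders, not Flink's real types:

```java
import java.util.HashMap;
import java.util.Map;

public class CheckpointLookup {
    // Stand-ins for Flink's JobVertexID and StateForTask, for illustration only.
    public static class JobVertexID {}
    public static class StateForTask {}

    private final Map<JobVertexID, StateForTask> vertexToState = new HashMap<>();

    /**
     * Returns the task state for the given JobVertexID, or null if none exists.
     * A single get() suffices: Map.get() already returns null for a missing key,
     * so a containsKey() pre-check would perform a redundant second lookup.
     */
    public StateForTask getState(JobVertexID jobVertexID) {
        return vertexToState.get(jobVertexID);
    }
}
```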
[GitHub] flink pull request: check in if clause in LongValue.java
Github user sahitya-pavurala commented on the pull request: https://github.com/apache/flink/pull/849#issuecomment-113299510 I think that `instanceof` should be changed to `getClass` in the first place. `getClass` makes more sense since it's an `equals` method and we want both types to be identical (not allowing subtypes).
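A minimal sketch of what a `getClass()`-based `equals()` looks like, using a hypothetical `LongValueLike` wrapper (not Flink's actual `LongValue`). Note that unlike `instanceof`, `getClass()` requires an explicit null check, but it rejects subtypes and so keeps `equals()` symmetric:

```java
public class LongValueLike {
    private final long value;

    public LongValueLike(long value) {
        this.value = value;
    }

    @Override
    public boolean equals(Object obj) {
        // getClass() would throw an NPE on null, so null is ruled out first;
        // the class comparison then rejects both unrelated types and subtypes.
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        return ((LongValueLike) obj).value == this.value;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(value);
    }
}
```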
[GitHub] flink pull request: check in if clause in LongValue.java
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/849#issuecomment-113298367 I think that the correct thing is to drop the not-null check, because the `instanceof` keyword already checks that the argument is not null. The current code hence checks twice that the value is not null.
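A tiny illustration (not from the PR) of why the null check is redundant: the Java Language Specification defines `instanceof` to evaluate to `false` whenever its left operand is `null`:

```java
public class InstanceofNullDemo {
    // "null instanceof T" is always false per the JLS, so an explicit
    // "obj != null" test before an instanceof check adds nothing.
    public static boolean isLongValue(Object obj) {
        return obj instanceof Long;
    }

    public static void main(String[] args) {
        System.out.println(isLongValue(null)); // false: instanceof handles null itself
        System.out.println(isLongValue(42L));  // true
    }
}
```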
[GitHub] flink pull request: check in if clause in LongValue.java
Github user sahitya-pavurala commented on the pull request: https://github.com/apache/flink/pull/849#issuecomment-113270413 Okay, thanks.
[GitHub] flink pull request: check in if clause in LongValue.java
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/849#issuecomment-113270184 Hi @sahitya-pavurala , if the PR is merged we will automatically close the PR for you.
[jira] [Updated] (FLINK-2237) Add hash-based Aggregation
[ https://issues.apache.org/jira/browse/FLINK-2237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-2237: -- Fix Version/s: (was: pre-apache) > Add hash-based Aggregation > -- > > Key: FLINK-2237 > URL: https://issues.apache.org/jira/browse/FLINK-2237 > Project: Flink > Issue Type: New Feature >Reporter: Rafiullah Momand >Priority: Minor > > Aggregation functions at the moment are implemented in a sort-based way. > How can we implement hash based Aggregation for Flink?
[jira] [Updated] (FLINK-2237) Add hash-based Aggregation
[ https://issues.apache.org/jira/browse/FLINK-2237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-2237: -- Labels: (was: github-import) > Add hash-based Aggregation > -- > > Key: FLINK-2237 > URL: https://issues.apache.org/jira/browse/FLINK-2237 > Project: Flink > Issue Type: New Feature >Reporter: Rafiullah Momand >Priority: Minor > > Aggregation functions at the moment are implemented in a sort-based way. > How can we implement hash based Aggregation for Flink?
[jira] [Commented] (FLINK-2237) Add hash-based Aggregation
[ https://issues.apache.org/jira/browse/FLINK-2237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14592256#comment-14592256 ] Stephan Ewen commented on FLINK-2237: - All of that being said, be aware of what you are implementing here: This may be one of the toughest open issues in Flink. Since the performance of the hash aggregator pretty much determines the performance of the job, we really need to make every CPU cycle count in that implementation. > Add hash-based Aggregation > -- > > Key: FLINK-2237 > URL: https://issues.apache.org/jira/browse/FLINK-2237 > Project: Flink > Issue Type: New Feature >Reporter: Rafiullah Momand >Priority: Minor > Labels: github-import > Fix For: pre-apache > > > Aggregation functions at the moment are implemented in a sort-based way. > How can we implement hash based Aggregation for Flink?
[GitHub] flink pull request: Modified the equals method in all the Value ty...
Github user sahitya-pavurala commented on the pull request: https://github.com/apache/flink/pull/850#issuecomment-113235155 If there is someone out there relying on the NPE, I think it should be thrown.
[GitHub] flink pull request: check in if clause in LongValue.java
Github user sahitya-pavurala commented on the pull request: https://github.com/apache/flink/pull/849#issuecomment-113232609 I am relatively new to GitHub; I can't close the PR until you merge the code, right?
[jira] [Commented] (FLINK-1520) Read edges and vertices from CSV files
[ https://issues.apache.org/jira/browse/FLINK-1520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14592090#comment-14592090 ] ASF GitHub Bot commented on FLINK-1520: --- Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-113217602 Ah! And I just remembered! Maybe it makes sense to update the examples to use `fromCSV` when creating the Graph instead of `getEdgesDataSet`. > Read edges and vertices from CSV files > -- > > Key: FLINK-1520 > URL: https://issues.apache.org/jira/browse/FLINK-1520 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Vasia Kalavri >Assignee: Shivani Ghatge >Priority: Minor > Labels: easyfix, newbie > > Add methods to create Vertex and Edge Datasets directly from CSV file inputs.
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-113217602 Ah! And I just remembered! Maybe it makes sense to update the examples to use `fromCSV` when creating the Graph instead of `getEdgesDataSet`.
[jira] [Commented] (FLINK-1520) Read edges and vertices from CSV files
[ https://issues.apache.org/jira/browse/FLINK-1520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14592072#comment-14592072 ] ASF GitHub Bot commented on FLINK-1520: --- Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-113213190 This looks very nice! Someone deserves a virtual :ice_cream: ! There are some tests missing: - test `fromCSV` with a Mapper - you just test `types`, `ignoreFirstLines` and `ignoreComments`; let's at least add tests for the `lineDelimiter*` and the `fieldDelimiter*` methods. I'm sure they work, but tests are written to guarantee that the functionality will also be there (at the same quality) in the future (i.e. some exotic code addition will not break it) :) I saw an outdated Vasia comment on an unused import; always hit mvn verify before pushing - it would have caught that :D > Read edges and vertices from CSV files > -- > > Key: FLINK-1520 > URL: https://issues.apache.org/jira/browse/FLINK-1520 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Vasia Kalavri >Assignee: Shivani Ghatge >Priority: Minor > Labels: easyfix, newbie > > Add methods to create Vertex and Edge Datasets directly from CSV file inputs.
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-113213190 This looks very nice! Someone deserves a virtual :ice_cream: ! There are some tests missing: - test `fromCSV` with a Mapper - you just test `types`, `ignoreFirstLines` and `ignoreComments`; let's at least add tests for the `lineDelimiter*` and the `fieldDelimiter*` methods. I'm sure they work, but tests are written to guarantee that the functionality will also be there (at the same quality) in the future (i.e. some exotic code addition will not break it) :) I saw an outdated Vasia comment on an unused import; always hit mvn verify before pushing - it would have caught that :D
[jira] [Created] (FLINK-2242) Deprecate RemoteCollector
Ufuk Celebi created FLINK-2242: -- Summary: Deprecate RemoteCollector Key: FLINK-2242 URL: https://issues.apache.org/jira/browse/FLINK-2242 Project: Flink Issue Type: Improvement Components: Java API Affects Versions: 0.9, master Reporter: Ufuk Celebi Priority: Minor {{collect}} replaces {{RemoteCollector}}.
[GitHub] flink pull request: Version bumped to 0.10-SNAPSHOT
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/851
[GitHub] flink pull request: Version bumped to 0.10-SNAPSHOT
Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/851#issuecomment-113181805 Thanks, merging.
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user shghatge commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-113165614 Updated PR
[jira] [Commented] (FLINK-1520) Read edges and vertices from CSV files
[ https://issues.apache.org/jira/browse/FLINK-1520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591827#comment-14591827 ] ASF GitHub Bot commented on FLINK-1520: --- Github user shghatge commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-113165614 Updated PR > Read edges and vertices from CSV files > -- > > Key: FLINK-1520 > URL: https://issues.apache.org/jira/browse/FLINK-1520 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Vasia Kalavri >Assignee: Shivani Ghatge >Priority: Minor > Labels: easyfix, newbie > > Add methods to create Vertex and Edge Datasets directly from CSV file inputs.
[GitHub] flink pull request: [FLINK-2093][gelly] Added difference Method
Github user shghatge commented on the pull request: https://github.com/apache/flink/pull/818#issuecomment-113150859 Updated the docs accordingly.
[jira] [Commented] (FLINK-2093) Add a difference method to Gelly's Graph class
[ https://issues.apache.org/jira/browse/FLINK-2093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591753#comment-14591753 ] ASF GitHub Bot commented on FLINK-2093: --- Github user shghatge commented on the pull request: https://github.com/apache/flink/pull/818#issuecomment-113150859 Updated the docs accordingly. > Add a difference method to Gelly's Graph class > -- > > Key: FLINK-2093 > URL: https://issues.apache.org/jira/browse/FLINK-2093 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 0.9 >Reporter: Andra Lungu >Assignee: Shivani Ghatge >Priority: Minor > > This method will compute the difference between two graphs, returning a new > graph containing the vertices and edges that the current graph and the input > graph don't have in common.
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591751#comment-14591751 ] ASF GitHub Bot commented on FLINK-1962: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/808#issuecomment-113150313 Yes, the formatting would indeed have to be changed to conform to the pre-existing Flink Scala style. > Add Gelly Scala API > --- > > Key: FLINK-1962 > URL: https://issues.apache.org/jira/browse/FLINK-1962 > Project: Flink > Issue Type: Task > Components: Gelly, Scala API >Affects Versions: 0.9 >Reporter: Vasia Kalavri >Assignee: PJ Van Aeken >
[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/808#issuecomment-113150313 Yes, the formatting would indeed have to be changed to conform to the pre-existing Flink Scala style.
[GitHub] flink pull request: [FLINK-2093][gelly] Added difference Method
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/818#issuecomment-113149078 Hi @shghatge , Don't forget to remove the definition for the public removeVertices(DataSet) from the documentation. Up for discussion: should we keep the name removeVertices for the private, helper method or should we call it something else, like removeVerticesAndEdges... Names are not my strongest point, but I guess you got the idea :) Personally, I am fine with the current name!
[jira] [Commented] (FLINK-2093) Add a difference method to Gelly's Graph class
[ https://issues.apache.org/jira/browse/FLINK-2093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591745#comment-14591745 ] ASF GitHub Bot commented on FLINK-2093: --- Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/818#issuecomment-113149078 Hi @shghatge , Don't forget to remove the definition for the public removeVertices(DataSet) from the documentation. Up for discussion: should we keep the name removeVertices for the private, helper method or should we call it something else, like removeVerticesAndEdges... Names are not my strongest point, but I guess you got the idea :) Personally, I am fine with the current name! > Add a difference method to Gelly's Graph class > -- > > Key: FLINK-2093 > URL: https://issues.apache.org/jira/browse/FLINK-2093 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 0.9 >Reporter: Andra Lungu >Assignee: Shivani Ghatge >Priority: Minor > > This method will compute the difference between two graphs, returning a new > graph containing the vertices and edges that the current graph and the input > graph don't have in common.
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591741#comment-14591741 ] ASF GitHub Bot commented on FLINK-1962: --- Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/808#issuecomment-113147981 Hi. I'm very excited about Gelly's Scala API. I'm reading the changes from this PR. I found a problem with re-formatting: unlike Flink's Java code, Flink's Scala code uses 2 spaces as indent, but it looks like the IDE re-formatted all the code to use 4 spaces. We should preserve the previous indent setting to preserve the modification history. > Add Gelly Scala API > --- > > Key: FLINK-1962 > URL: https://issues.apache.org/jira/browse/FLINK-1962 > Project: Flink > Issue Type: Task > Components: Gelly, Scala API >Affects Versions: 0.9 >Reporter: Vasia Kalavri >Assignee: PJ Van Aeken >
[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/808#issuecomment-113147981 Hi. I'm very excited about Gelly's Scala API. I'm reading the changes from this PR. I found a problem with re-formatting: unlike Flink's Java code, Flink's Scala code uses 2 spaces as indent, but it looks like the IDE re-formatted all the code to use 4 spaces. We should preserve the previous indent setting to preserve the modification history.
[GitHub] flink pull request: [FLINK-2093][gelly] Added difference Method
Github user shghatge commented on the pull request: https://github.com/apache/flink/pull/818#issuecomment-113147138 Updated the PR by changing the removeVertices(DataSet) method access from public to private. It is only used as a helper function for the difference method.
[jira] [Commented] (FLINK-2093) Add a difference method to Gelly's Graph class
[ https://issues.apache.org/jira/browse/FLINK-2093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591735#comment-14591735 ] ASF GitHub Bot commented on FLINK-2093: --- Github user shghatge commented on the pull request: https://github.com/apache/flink/pull/818#issuecomment-113147138 Updated the PR by changing the removeVertices(DataSet) method access from public to private. It is only used as a helper function for the difference method. > Add a difference method to Gelly's Graph class > -- > > Key: FLINK-2093 > URL: https://issues.apache.org/jira/browse/FLINK-2093 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 0.9 >Reporter: Andra Lungu >Assignee: Shivani Ghatge >Priority: Minor > > This method will compute the difference between two graphs, returning a new > graph containing the vertices and edges that the current graph and the input > graph don't have in common.
[GitHub] flink pull request: Version bumped to 0.10-SNAPSHOT
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/851#issuecomment-113144626 +1 for merging. :)
[GitHub] flink pull request: Modified the equals method in all the Value ty...
Github user sahitya-pavurala commented on the pull request: https://github.com/apache/flink/pull/850#issuecomment-113121714 instanceof, being an operator, returns false for a null reference, but getClass(), being a method call, would throw a NullPointerException. Also, using getClass() makes more sense in an equals method, since we want both types to be identical (no subtypes).
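To make the trade-off above concrete, here is a minimal sketch of a getClass()-based equals with an explicit null check. The class name MyIntValue is made up for illustration; this is not the actual Flink IntValue implementation.

```java
// Hypothetical Value-like class illustrating the getClass() vs instanceof
// discussion; not Flink code.
class MyIntValue {
    private final int value;

    MyIntValue(int value) { this.value = value; }

    @Override
    public boolean equals(Object obj) {
        // getClass() comparison rejects subtypes, but requires an explicit
        // null check first: obj.getClass() on null would throw an NPE,
        // whereas (obj instanceof MyIntValue) simply evaluates to false.
        if (obj == null || obj.getClass() != this.getClass()) {
            return false;
        }
        return this.value == ((MyIntValue) obj).value;
    }

    @Override
    public int hashCode() { return value; }
}
```

With the null check in place, obj.equals(null) returns false as the equals contract requires, while the strict class comparison still excludes subtypes.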
[GitHub] flink pull request: Modified the equals method in all the Value ty...
Github user sahitya-pavurala commented on the pull request: https://github.com/apache/flink/pull/850#issuecomment-113120768 Well yes, my opinion is to use getClass() in place of instanceof. I didn't make the change yet; in that case I think we need to add a check for a null reference.
[GitHub] flink pull request: check in if clause in LongValue.java
Github user uce commented on the pull request: https://github.com/apache/flink/pull/849#issuecomment-113120481 Since both are booleans, this check is equivalent to `&&` w/o the short-circuit evaluation. So good to merge.
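The point above can be demonstrated in isolation: for boolean operands `&` yields the same result as `&&`, differing only in that `&` always evaluates its right-hand side. The demo class and counter below are made up for this sketch.

```java
// Hypothetical demo: `&` vs `&&` on booleans. The result is identical;
// only the evaluation of the right-hand side differs.
class NonShortCircuitDemo {
    static int evaluations = 0;

    static boolean rightSide() {
        evaluations++; // count how often the right-hand side runs
        return true;
    }

    // always calls rightSide(), even when a is false
    static boolean and(boolean a) { return a & rightSide(); }

    // skips rightSide() entirely when a is false
    static boolean shortAnd(boolean a) { return a && rightSide(); }
}
```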
[GitHub] flink pull request: Modified the equals method in all the Value ty...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/850#issuecomment-113119208 So, we have been handling `null` values inconsistently from what I can see. `LongValue` checks for `null` whereas the changed classes in this PR did not. The equals contract is to return `false` for `obj.equals(null)` but I think there are [fair arguments for throwing NPE](http://stackoverflow.com/questions/2887761/is-it-a-bad-idea-if-equalsnull-throws-nullpointerexception-instead) as well. I think it's good to have it consistent in all `Value` classes. I'm in favor of your change, but it is essentially API breaking. There could be someone out there relying on the NPE. ;-) Need more opinions on this, @tillrohrmann, @fhueske.
[GitHub] flink pull request: check in if clause in LongValue.java
Github user sahitya-pavurala commented on the pull request: https://github.com/apache/flink/pull/849#issuecomment-113110114 Okay, thanks. After you merge the code, there are a few more things to be changed in those classes; I will make the changes shortly.
[jira] [Commented] (FLINK-1520) Read edges and vertices from CSV files
[ https://issues.apache.org/jira/browse/FLINK-1520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591644#comment-14591644 ] ASF GitHub Bot commented on FLINK-1520: --- Github user shghatge commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-113107660 Hello @vasia I will follow the guidelines and add the tests that are suggested by you when making a commit. For the separate configuration methods issue, I was thinking more along the lines that if we want to configure the readers separately, then we could use the get methods for the CsvReaders and then configure them. But I will add the separate method now. Thanks for the detailed guidance. :) > Read edges and vertices from CSV files > -- > > Key: FLINK-1520 > URL: https://issues.apache.org/jira/browse/FLINK-1520 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Vasia Kalavri >Assignee: Shivani Ghatge >Priority: Minor > Labels: easyfix, newbie > > Add methods to create Vertex and Edge Datasets directly from CSV file inputs.
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user shghatge commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-113107660 Hello @vasia I will follow the guidelines and add the tests that are suggested by you when making a commit. For the separate configuration methods issue, I was thinking more along the lines that if we want to configure the readers separately, then we could use the get methods for the CsvReaders and then configure them. But I will add the separate method now. Thanks for the detailed guidance. :)
[GitHub] flink pull request: Version bumped to 0.10-SNAPSHOT
Github user uce commented on the pull request: https://github.com/apache/flink/pull/851#issuecomment-113106251 Looks good. Checked that all POMs are changed. +1
[GitHub] flink pull request: Version bumped to 0.10-SNAPSHOT
GitHub user mbalassi opened a pull request: https://github.com/apache/flink/pull/851 Version bumped to 0.10-SNAPSHOT The release-0.9 and master branches are diverging, it is time to make the distinction on the master. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mbalassi/flink 10-bump Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/851.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #851 commit 6280adffd557306d77418c6f6949c199e754bcbd Author: mbalassi Date: 2015-06-18T10:41:01Z Version bumped to 0.10-SNAPSHOT
[jira] [Commented] (FLINK-2220) Join on Pojo without hashCode() silently fails
[ https://issues.apache.org/jira/browse/FLINK-2220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591620#comment-14591620 ] Fabian Hueske commented on FLINK-2220: -- Every object has a {{hashCode()}} method. So we need to check via reflection if {{Object.hashCode()}} was overridden. But otherwise I think [~rmetzger] is right. It should be checked together with the other POJO conditions. I guess the same problem applies as well to GenericTypes that implement {{Comparable}}. > Join on Pojo without hashCode() silently fails > -- > > Key: FLINK-2220 > URL: https://issues.apache.org/jira/browse/FLINK-2220 > Project: Flink > Issue Type: Bug >Affects Versions: 0.9, 0.8.1 >Reporter: Marcus Leich > > I need to perform a join using a complete Pojo as join key. > With DOP > 1 this only works if the Pojo comes with a meaningful hashCode() > implementation, as otherwise equal objects will get hashed to different > partitions based on their memory address and not on the content. > I guess it's fine if users are required to implement hashCode() themselves, > but it would be nice if the documentation, or better yet Flink itself, could alert > users that this is a requirement, similar to how Comparable is required for > keys.
> Use the following code to reproduce the issue:
> {code}
> public class Pojo implements Comparable<Pojo> {
>     public byte[] data;
>
>     public Pojo() {
>     }
>
>     public Pojo(byte[] data) {
>         this.data = data;
>     }
>
>     @Override
>     public int compareTo(Pojo o) {
>         return UnsignedBytes.lexicographicalComparator().compare(data, o.data);
>     }
>
>     // uncomment me for making the join work
>     /* @Override
>     public int hashCode() {
>         return Arrays.hashCode(data);
>     } */
> }
>
> public void testJoin() throws Exception {
>     final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
>     env.setParallelism(4);
>     DataSet<Tuple2<Pojo, String>> left = env.fromElements(
>         new Tuple2<>(new Pojo(new byte[] {0, 24, 23, 1, 3}), "black"),
>         new Tuple2<>(new Pojo(new byte[] {0, 14, 13, 14, 13}), "red"),
>         new Tuple2<>(new Pojo(new byte[] {1}), "Spark"),
>         new Tuple2<>(new Pojo(new byte[] {2}), "good"),
>         new Tuple2<>(new Pojo(new byte[] {5}), "bug"));
>     DataSet<Tuple2<Pojo, String>> right = env.fromElements(
>         new Tuple2<>(new Pojo(new byte[] {0, 24, 23, 1, 3}), "white"),
>         new Tuple2<>(new Pojo(new byte[] {0, 14, 13, 14, 13}), "green"),
>         new Tuple2<>(new Pojo(new byte[] {1}), "Flink"),
>         new Tuple2<>(new Pojo(new byte[] {2}), "evil"),
>         new Tuple2<>(new Pojo(new byte[] {5}), "fix"));
>     // will not print anything unless Pojo has a real hashCode() implementation
>     left.join(right).where(0).equalTo(0).projectFirst(1).projectSecond(1).print();
> }
> {code}
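The reflection check Fabian suggests, namely testing whether a class overrides {{Object.hashCode()}}, can be sketched as follows. The helper and sample classes are hypothetical, not Flink's actual POJO-validation code.

```java
import java.util.Arrays;

// Sketch of the check: every class has a hashCode() method, so the
// question is which class *declares* the override that would be invoked.
class HashCodeCheck {
    static boolean overridesHashCode(Class<?> clazz) {
        try {
            // getMethod resolves the most specific public override
            return clazz.getMethod("hashCode").getDeclaringClass() != Object.class;
        } catch (NoSuchMethodException e) {
            throw new AssertionError("hashCode() always exists", e);
        }
    }
}

// inherits Object.hashCode() -> would hash by identity across partitions
class NoOverride {}

// content-based hashCode() -> safe to use as a join key
class WithOverride {
    final byte[] data;
    WithOverride(byte[] data) { this.data = data; }
    @Override public int hashCode() { return Arrays.hashCode(data); }
    @Override public boolean equals(Object o) {
        return o != null && o.getClass() == getClass()
                && Arrays.equals(data, ((WithOverride) o).data);
    }
}
```

Such a check could run alongside the other POJO-type conditions and warn when a key type would be hashed by memory address.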
[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm
[ https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591613#comment-14591613 ] Fabian Hueske commented on FLINK-2105: -- Hi, I'm commenting on your assumptions first: bq. In the case where one input side is broadcast and you try to perform a sort-merge outer join on the same side that was broadcast (or a full outer join), you don't know whether to emit (x, null) or whether there is maybe a matching key x on the right side on some other node. A Left-outer join can be executed using a repartition-repartition or forward-broadcast ship strategy (right-outer join is switched, full outer join only possible using repartition-repartition) bq. Similarly, the same problem occurs if you were to perform a sort-merge based cogroup, with one side being broadcast. CoGroup is always executed as repartition-repartition (as you observed later). bq. Is there even a case in the current implementation where you would broadcast one side and perform a sort-merge join as opposed to a hash join with the broadcast side as the build side? This scenario wouldn't make a lot of sense IMO; I don't think we found the option for that in the source code either. (E.g. there is no BROADCAST_SORT_MERGE in the JoinHint enum.) That can be beneficial if a sorted result can be reused. Broadcast-forward and HybridHash are more common, though. bq. there must be some component in the flink runtime which decides which partitioning makes sense for which operator and operator strategy. For example, if the optimizer chooses the left side shall be broadcast, then the MatchDriver should perform a HYBRIDHASH_BUILD_FIRST join, and so on bq. keeping track of which side was broadcast, repartitioned, sorted or grouped doesn't appear to be the responsibility of the Driver implementation or, in the case of the MatchDriver, the iterator implementations that perform the sort-merge or hash joins, correct? 
The optimizer takes care of choosing the execution strategies. It tracks the physical properties (sorting, partitioning) of intermediate results, partitions the data in an appropriate way, and chooses the local and driver strategies. All of that is already decided and fixed when the program is executed. The driver does not decide anything and only does what the optimizer told it to do. I would recommend not thinking about partitioning yet and focusing on the local join algorithm. You can safely assume that the data is partitioned in a suitable way and work on the local join algorithm. It is important that you cover your implementation with a lot of unit tests to make sure it works as expected. Once the local algorithm is done, it needs to be integrated into a Driver and all the optimizer integration needs to be done. > Implement Sort-Merge Outer Join algorithm > - > > Key: FLINK-2105 > URL: https://issues.apache.org/jira/browse/FLINK-2105 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Ricky Pogalz >Priority: Minor > Fix For: pre-apache > > > Flink does not natively support outer joins at the moment. > This issue proposes to implement a sort-merge outer join algorithm that can > cover left, right, and full outer joins. > The implementation can be based on the regular sort-merge join iterator > ({{ReusingMergeMatchIterator}} and {{NonReusingMergeMatchIterator}}, see also > {{MatchDriver}} class) > The Reusing and NonReusing variants differ in whether object instances are > reused or new objects are created. I would start with the NonReusing variant > which is safer from a user's point of view and should also be easier to > implement.
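The local sort-merge full outer join that the ticket asks for can be sketched over two pre-sorted inputs. This is a simplified illustration, assuming integer keys that are distinct on each side (the real algorithm must form cross-products over groups of equal keys, and works on record iterators inside a Driver); the class name and the "left,right" string output are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative single-pass merge of two sorted inputs; "null" marks the
// missing side of an outer-join result.
class SortMergeOuterJoinSketch {
    static List<String> fullOuterJoin(List<Integer> left, List<Integer> right) {
        List<String> joined = new ArrayList<>();
        int i = 0, j = 0;
        while (i < left.size() && j < right.size()) {
            int cmp = left.get(i).compareTo(right.get(j));
            if (cmp == 0) {
                joined.add(left.get(i++) + "," + right.get(j++)); // keys match
            } else if (cmp < 0) {
                joined.add(left.get(i++) + ",null"); // left key without partner
            } else {
                joined.add("null," + right.get(j++)); // right key without partner
            }
        }
        // drain remainders: these keys have no partner on the other side
        while (i < left.size()) joined.add(left.get(i++) + ",null");
        while (j < right.size()) joined.add("null," + right.get(j++));
        return joined;
    }
}
```

Left and right outer joins follow from the same loop by suppressing the "null,…" or "…,null" emissions respectively.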
[jira] [Commented] (FLINK-2232) StormWordCountLocalITCase fails
[ https://issues.apache.org/jira/browse/FLINK-2232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591586#comment-14591586 ] Márton Balassi commented on FLINK-2232: --- Merged a commit from [~mjsax] increasing the timeout. The complete fix is dependent on [1]; let us keep the issue open until that is merged. [1] https://github.com/apache/flink/pull/750 > StormWordCountLocalITCase fails > --- > > Key: FLINK-2232 > URL: https://issues.apache.org/jira/browse/FLINK-2232 > Project: Flink > Issue Type: Bug > Components: Streaming, Tests >Affects Versions: master >Reporter: Ufuk Celebi >Assignee: Matthias J. Sax >Priority: Minor > > https://travis-ci.org/apache/flink/jobs/66936476 > > {code} > StormWordCountLocalITCase>StreamingProgramTestBase.testJobWithoutObjectReuse:109->postSubmit:40->TestBaseUtils.compareResultsByLinesInMemory:256->TestBaseUtils.compareResultsByLinesInMemory:270 > Different number of lines in expected and obtained result. expected:<801> > but was:<0> > {code} > Can we disable the test until this is fixed?
[GitHub] flink pull request: [FLINK-2232] StormWordCountLocalITCase fails
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/845
[jira] [Commented] (FLINK-2232) StormWordCountLocalITCase fails
[ https://issues.apache.org/jira/browse/FLINK-2232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591582#comment-14591582 ] ASF GitHub Bot commented on FLINK-2232: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/845 > StormWordCountLocalITCase fails > --- > > Key: FLINK-2232 > URL: https://issues.apache.org/jira/browse/FLINK-2232 > Project: Flink > Issue Type: Bug > Components: Streaming, Tests >Affects Versions: master >Reporter: Ufuk Celebi >Assignee: Matthias J. Sax >Priority: Minor > > https://travis-ci.org/apache/flink/jobs/66936476 > > {code} > StormWordCountLocalITCase>StreamingProgramTestBase.testJobWithoutObjectReuse:109->postSubmit:40->TestBaseUtils.compareResultsByLinesInMemory:256->TestBaseUtils.compareResultsByLinesInMemory:270 > Different number of lines in expected and obtained result. expected:<801> > but was:<0> > {code} > Can we disable the test until this is fixed?
[jira] [Commented] (FLINK-2237) Add hash-based Aggregation
[ https://issues.apache.org/jira/browse/FLINK-2237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591580#comment-14591580 ] Fabian Hueske commented on FLINK-2237: -- Looking at the code of the other hash tables is a very good idea. Before starting with a Driver implementation, I would recommend to build the hash table stand-alone and add extensive unit tests to verify that it works correctly. I don't think that a two-pass hash algorithm for combining and reducing would be very beneficial performance-wise. As [~StephanEwen] said on the mailing list, a hash-based combiner is expected to give the best performance improvement. Such a hash-based combiner would work by incrementally updating the (pre-)aggregated result in a hash table in a single pass. There are basically two implementation designs for a combiner hash table that is incrementally updated. 1. A hash table for records with a fixed-length binary representation. If the length of an aggregated record does not change in the Combine operation (either because it is inherently fixed-length like a Tuple2 or because we know that the Combine function only updates fixed-length fields of a record), the hash table can update the record in place. 2. A hash table for records with variable-length binary representation. If the length of the binary representation of a record changes in the Combine operation, the hash table cannot be updated in place. Instead, new records need to be appended and the old ones invalidated. Periodically, the table needs to be compacted. The hash table for fixed-length records is certainly easier to realize and more efficient. Hence, I would recommend to work on that.
> Add hash-based Aggregation > -- > > Key: FLINK-2237 > URL: https://issues.apache.org/jira/browse/FLINK-2237 > Project: Flink > Issue Type: New Feature >Reporter: Rafiullah Momand >Priority: Minor > Labels: github-import > Fix For: pre-apache > > > Aggregation functions at the moment are implemented in a sort-based way. > How can we implement hash based Aggregation for Flink?
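The incrementally updated hash-based combiner Fabian describes can be sketched at a high level: each incoming (key, value) record updates a pre-aggregated entry in a single pass, instead of sorting first. A heap HashMap and a long-sum aggregate stand in for Flink's managed-memory hash table and Combine function; the class name is invented, and this is a conceptual sketch, not the proposed implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Single-pass, incrementally updated combiner sketch. A long sum is a
// fixed-length aggregate, i.e. design (1) from the comment above: the
// entry can be updated "in place" without growing.
class HashCombinerSketch {
    private final Map<String, Long> aggregates = new HashMap<>();

    void combine(String key, long value) {
        // merge the new value into the existing pre-aggregate, if any
        aggregates.merge(key, value, Long::sum);
    }

    long result(String key) {
        return aggregates.getOrDefault(key, 0L);
    }
}
```

A variable-length aggregate (design 2), e.g. one accumulating a string, could not be updated in place in a binary table and would need append-and-invalidate plus periodic compaction, as the comment explains.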
[GitHub] flink pull request: [FLINK-2232] StormWordCountLocalITCase fails
Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/845#issuecomment-113096386 Thanks for the temporary fix, I think the real solution is the one proposed in #750. Will merge this for now.
[jira] [Commented] (FLINK-2232) StormWordCountLocalITCase fails
[ https://issues.apache.org/jira/browse/FLINK-2232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591577#comment-14591577 ] ASF GitHub Bot commented on FLINK-2232: --- Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/845#issuecomment-113096386 Thanks for the temporary fix, I think the real solution is the one proposed in #750. Will merge this for now. > StormWordCountLocalITCase fails > --- > > Key: FLINK-2232 > URL: https://issues.apache.org/jira/browse/FLINK-2232 > Project: Flink > Issue Type: Bug > Components: Streaming, Tests >Affects Versions: master >Reporter: Ufuk Celebi >Assignee: Matthias J. Sax >Priority: Minor > > https://travis-ci.org/apache/flink/jobs/66936476 > > {code} > StormWordCountLocalITCase>StreamingProgramTestBase.testJobWithoutObjectReuse:109->postSubmit:40->TestBaseUtils.compareResultsByLinesInMemory:256->TestBaseUtils.compareResultsByLinesInMemory:270 > Different number of lines in expected and obtained result. expected:<801> > but was:<0> > {code} > Can we disable the test until this is fixed?
[GitHub] flink pull request: New operator state interfaces
Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/747#issuecomment-113095684 I vote for the second option, it is cleaner and more in line with the current behavior of the API.
[jira] [Updated] (FLINK-2240) Use BloomFilter to minimize probe side records which are spilled to disk in Hybrid-Hash-Join
[ https://issues.apache.org/jira/browse/FLINK-2240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-2240: - Assignee: Chengxiang Li > Use BloomFilter to minimize probe side records which are spilled to disk in > Hybrid-Hash-Join > > > Key: FLINK-2240 > URL: https://issues.apache.org/jira/browse/FLINK-2240 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Chengxiang Li >Assignee: Chengxiang Li >Priority: Minor > > In Hybrid-Hash-Join, while small table does not fit into memory, part of the > small table data would be spilled to disk, and the counterpart partition of > big table data would be spilled to disk in probe phase as well. If we build a > BloomFilter while spill small table to disk during build phase, and use it to > filter the big table records which tend to be spilled to disk, this may > greatly reduce the spilled big table file size, and saved the disk IO cost > for writing and further reading.
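The idea in FLINK-2240 can be illustrated with a toy Bloom filter: while spilling a build-side partition, record its keys in the filter; during the probe phase, a probe-side record whose key is definitely absent need not be spilled at all. The class name, bit-array size, and hash functions below are arbitrary choices for this sketch, not Flink's implementation.

```java
import java.util.BitSet;

// Toy Bloom filter over int keys. mightContain() == false means the key is
// definitely not in the spilled build partition, so the matching probe
// record can be dropped instead of written to disk; true may be a false
// positive, in which case the record is spilled as before.
class BloomFilterSketch {
    private static final int BITS = 1024;
    private final BitSet bits = new BitSet(BITS);

    private int h1(int key) { return Math.floorMod(key * 0x9E3779B1, BITS); }
    private int h2(int key) { return Math.floorMod(key * 0x85EBCA6B + 1, BITS); }

    void add(int key) {
        bits.set(h1(key));
        bits.set(h2(key));
    }

    boolean mightContain(int key) {
        return bits.get(h1(key)) && bits.get(h2(key));
    }
}
```

Because the filter never reports a present key as absent, filtering with it is safe: it only shrinks the spilled probe-side file, trading a small in-memory bit array for reduced disk IO.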
[jira] [Commented] (FLINK-2241) Use BloomFilter to minmize probe side records which are spilled to disk in Hybrid-Hash-Join
[ https://issues.apache.org/jira/browse/FLINK-2241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591552#comment-14591552 ] Chengxiang Li commented on FLINK-2241: -- Oh, thanks, [~till.rohrmann], I must have double-clicked my mouse. > Use BloomFilter to minmize probe side records which are spilled to disk in > Hybrid-Hash-Join > --- > > Key: FLINK-2241 > URL: https://issues.apache.org/jira/browse/FLINK-2241 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Chengxiang Li >Priority: Minor > > In Hybrid-Hash-Join, while small table does not fit into memory, part of the > small table data would be spilled to disk, and the counterpart partition of > big table data would be spilled to disk in probe phase as well. If we build a > BloomFilter while spill small table to disk during build phase, and use it to > filter the big table records which tend to be spilled to disk, this may > greatly reduce the spilled big table file size, and saved the disk IO cost > for writing and further reading.
[jira] [Updated] (FLINK-2240) Use BloomFilter to minimize probe side records which are spilled to disk in Hybrid-Hash-Join
[ https://issues.apache.org/jira/browse/FLINK-2240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li updated FLINK-2240: - Summary: Use BloomFilter to minimize probe side records which are spilled to disk in Hybrid-Hash-Join (was: Use BloomFilter to minmize build side records which spilled to disk in Hybrid-Hash-Join) > Use BloomFilter to minimize probe side records which are spilled to disk in > Hybrid-Hash-Join > > > Key: FLINK-2240 > URL: https://issues.apache.org/jira/browse/FLINK-2240 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Chengxiang Li >Priority: Minor > > In Hybrid-Hash-Join, while small table does not fit into memory, part of the > small table data would be spilled to disk, and the counterpart partition of > big table data would be spilled to disk in probe phase as well. If we build a > BloomFilter while spill small table to disk during build phase, and use it to > filter the big table records which tend to be spilled to disk, this may > greatly reduce the spilled big table file size, and saved the disk IO cost > for writing and further reading.
[jira] [Updated] (FLINK-2124) FromElementsFunction is not really Serializable
[ https://issues.apache.org/jira/browse/FLINK-2124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-2124: - Assignee: Johannes Reifferscheid (was: Till Rohrmann) > FromElementsFunction is not really Serializable > --- > > Key: FLINK-2124 > URL: https://issues.apache.org/jira/browse/FLINK-2124 > Project: Flink > Issue Type: Bug > Components: Streaming > Reporter: Aljoscha Krettek > Assignee: Johannes Reifferscheid > > The function stores an Iterable of T. Neither T nor the Iterable is necessarily Serializable.
[jira] [Updated] (FLINK-2124) FromElementsFunction is not really Serializable
[ https://issues.apache.org/jira/browse/FLINK-2124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-2124: - Assignee: Johannes Reifferscheid > Key: FLINK-2124 > URL: https://issues.apache.org/jira/browse/FLINK-2124
[jira] [Closed] (FLINK-2241) Use BloomFilter to minmize probe side records which are spilled to disk in Hybrid-Hash-Join
[ https://issues.apache.org/jira/browse/FLINK-2241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-2241. Resolution: Duplicate Seems to be a duplicate of FLINK-2240. > Key: FLINK-2241 > URL: https://issues.apache.org/jira/browse/FLINK-2241
[jira] [Updated] (FLINK-2241) Use BloomFilter to minmize probe side records which are spilled to disk in Hybrid-Hash-Join
[ https://issues.apache.org/jira/browse/FLINK-2241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chengxiang Li updated FLINK-2241: - Summary: Use BloomFilter to minmize probe side records which are spilled to disk in Hybrid-Hash-Join (was: Use BloomFilter to minmize build side records which spilled to disk in Hybrid-Hash-Join) > Key: FLINK-2241 > URL: https://issues.apache.org/jira/browse/FLINK-2241
[jira] [Created] (FLINK-2241) Use BloomFilter to minmize build side records which spilled to disk in Hybrid-Hash-Join
Chengxiang Li created FLINK-2241: Summary: Use BloomFilter to minmize build side records which spilled to disk in Hybrid-Hash-Join Key: FLINK-2241 URL: https://issues.apache.org/jira/browse/FLINK-2241 Project: Flink Issue Type: Improvement Components: Core Reporter: Chengxiang Li Priority: Minor
[jira] [Created] (FLINK-2240) Use BloomFilter to minmize build side records which spilled to disk in Hybrid-Hash-Join
Chengxiang Li created FLINK-2240: Summary: Use BloomFilter to minmize build side records which spilled to disk in Hybrid-Hash-Join Key: FLINK-2240 URL: https://issues.apache.org/jira/browse/FLINK-2240 Project: Flink Issue Type: Improvement Components: Core Reporter: Chengxiang Li Priority: Minor
[jira] [Commented] (FLINK-2124) FromElementsFunction is not really Serializable
[ https://issues.apache.org/jira/browse/FLINK-2124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591510#comment-14591510 ] ASF GitHub Bot commented on FLINK-2124: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/848#issuecomment-113081149 LGTM. +1 for merging once Travis has given green light.
[jira] [Created] (FLINK-2239) print() on DataSet: stream results and print incrementally
Maximilian Michels created FLINK-2239: - Summary: print() on DataSet: stream results and print incrementally Key: FLINK-2239 URL: https://issues.apache.org/jira/browse/FLINK-2239 Project: Flink Issue Type: Improvement Components: Distributed Runtime Affects Versions: 0.9 Reporter: Maximilian Michels Fix For: 0.10 Users find it counter-intuitive that {{print()}} on a DataSet internally calls {{collect()}} and fully materializes the set. This leads to out-of-memory errors on the client. It also leaves users with the impression that Flink cannot handle large amounts of data and fails frequently. Improving this situation requires some major architectural changes in Flink. The easiest solution would probably be to transfer the data from the job manager to the client via the {{BlobManager}}. Alternatively, the client could connect directly to the task managers and fetch the results.
[GitHub] flink pull request: check in if clause in LongValue.java
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/849#issuecomment-113079519 Thanks for the PR. As long as you keep the PR branch in your GH repository, things are fine.
[jira] [Commented] (FLINK-2124) FromElementsFunction is not really Serializable
[ https://issues.apache.org/jira/browse/FLINK-2124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591494#comment-14591494 ] ASF GitHub Bot commented on FLINK-2124: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/848#issuecomment-113079060 Thanks for your contribution @jreiffers. The PR looks really good :-) There is only a problem with the `.*` import. I was thinking about adding a test case where we create a `FromElementsFunction` with non serializable data elements. What do you think?
[jira] [Created] (FLINK-2238) Scala ExecutionEnvironment.fromCollection does not work with Sets
Fabian Hueske created FLINK-2238: Summary: Scala ExecutionEnvironment.fromCollection does not work with Sets Key: FLINK-2238 URL: https://issues.apache.org/jira/browse/FLINK-2238 Project: Flink Issue Type: Improvement Components: Scala API Affects Versions: 0.9 Reporter: Fabian Hueske Priority: Minor ExecutionEnvironment.fromCollection() of Scala's DataSet API only works with Seq[T] and Iterator[T]. This excludes Set[T], which is a common collection type.
[jira] [Commented] (FLINK-2124) FromElementsFunction is not really Serializable
[ https://issues.apache.org/jira/browse/FLINK-2124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591488#comment-14591488 ] ASF GitHub Bot commented on FLINK-2124: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/848#discussion_r32709133 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java --- @@ -17,37 +17,71 @@ package org.apache.flink.streaming.api.functions.source; -import java.util.Arrays; -import java.util.Collection; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; +import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; + +import java.io.*; --- End diff -- The checkstyle plugin complains about the `.*` import here. You have to import the different streams explicitly.
[GitHub] flink pull request: New operator state interfaces
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/747#issuecomment-113078373 Alright, I think I fixed the issues; the only thing that remains is to add the partitioning setting to the API. State partitioning should be a property of the operator, so it should be set afterwards like parallelism, for example: stream.map(Mapper).setStatePartitioner(...) This is quite tricky, however, as the state partitioner should affect the partitioning scheme of the input streams (otherwise it makes no sense). I see two approaches here: 1. Simply overwrite the partitioning without warning. 2. Only overwrite it when no partitioning is defined (forward); otherwise throw an exception stating that the partitioning cannot be different from the state partitioning.
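The second approach above can be sketched as follows. All class, enum, and method names here are invented for illustration; this is not Flink's actual API. The state partitioner replaces the input partitioning only while it is still the default forward partitioning, and an explicitly chosen partitioning fails fast instead of being silently overwritten.

```java
// Hypothetical sketch of approach 2: setting a state partitioner overwrites
// the input partitioning only if it is still the default (forward); any
// explicitly chosen partitioning raises an error rather than being replaced.
public class StatePartitioningSketch {

    enum Partitioning { FORWARD, SHUFFLE, STATE_PARTITIONED }

    static class OperatorConfig {
        private Partitioning inputPartitioning = Partitioning.FORWARD;

        // user explicitly chooses a partitioning for the input stream
        void setShuffle() {
            this.inputPartitioning = Partitioning.SHUFFLE;
        }

        // only overwrite the default; anything else is a conflict
        void setStatePartitioner() {
            if (inputPartitioning != Partitioning.FORWARD) {
                throw new IllegalStateException(
                        "Input partitioning cannot be different from the state partitioning");
            }
            this.inputPartitioning = Partitioning.STATE_PARTITIONED;
        }

        Partitioning getInputPartitioning() {
            return inputPartitioning;
        }
    }

    public static void main(String[] args) {
        OperatorConfig ok = new OperatorConfig();
        ok.setStatePartitioner();                      // default forward: overwritten
        System.out.println(ok.getInputPartitioning());

        OperatorConfig conflicting = new OperatorConfig();
        conflicting.setShuffle();                      // explicit partitioning
        try {
            conflicting.setStatePartitioner();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast makes the conflict visible to the user, whereas silently overwriting (approach 1) could hide a partitioning the user chose deliberately.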
[jira] [Commented] (FLINK-2124) FromElementsFunction is not really Serializable
[ https://issues.apache.org/jira/browse/FLINK-2124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591485#comment-14591485 ] ASF GitHub Bot commented on FLINK-2124: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/848#discussion_r32709026 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java --- @@ -17,37 +17,71 @@ package org.apache.flink.streaming.api.functions.source; -import java.util.Arrays; -import java.util.Collection; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; +import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; + +import java.io.*; import java.util.Iterator; public class FromElementsFunction implements SourceFunction { private static final long serialVersionUID = 1L; - private Iterable iterable; + private final TypeSerializer serializer; + private final byte[] elements; private volatile boolean isRunning = true; - public FromElementsFunction(T... elements) { - this.iterable = Arrays.asList(elements); - } + public FromElementsFunction(TypeSerializer serializer, final T... 
elements) { + this(serializer, new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + int index = 0; + + @Override + public boolean hasNext() { + return index < elements.length; + } - public FromElementsFunction(Collection elements) { - this.iterable = elements; + @Override + public T next() { + return elements[index++]; + } + }; + } + }); } - public FromElementsFunction(Iterable elements) { - this.iterable = elements; + public FromElementsFunction(TypeSerializer serializer, Iterable elements) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputViewDataOutputStreamWrapper wrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos)); + + try { + for (T element : elements) + serializer.serialize(element, wrapper); --- End diff -- Implicit coding conventions are to put curly brackets also around one line `for` statements. > FromElementsFunction is not really Serializable > --- > > Key: FLINK-2124 > URL: https://issues.apache.org/jira/browse/FLINK-2124 > Project: Flink > Issue Type: Bug > Components: Streaming >Reporter: Aljoscha Krettek > > The function stores an Iterable of T. T is not necessarily Serializable and > and Iterable is also not necessarily Serializable. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2124] [streaming] Fix behavior of FromE...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/848#discussion_r32708806 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java --- @@ -17,37 +17,71 @@ package org.apache.flink.streaming.api.functions.source; -import java.util.Arrays; -import java.util.Collection; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; +import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; + +import java.io.*; import java.util.Iterator; public class FromElementsFunction implements SourceFunction { private static final long serialVersionUID = 1L; - private Iterable iterable; + private final TypeSerializer serializer; + private final byte[] elements; private volatile boolean isRunning = true; - public FromElementsFunction(T... elements) { - this.iterable = Arrays.asList(elements); - } + public FromElementsFunction(TypeSerializer serializer, final T... 
elements) { + this(serializer, new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + int index = 0; + + @Override + public boolean hasNext() { + return index < elements.length; + } - public FromElementsFunction(Collection elements) { - this.iterable = elements; + @Override + public T next() { + return elements[index++]; + } + }; + } + }); } - public FromElementsFunction(Iterable elements) { - this.iterable = elements; + public FromElementsFunction(TypeSerializer serializer, Iterable elements) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputViewDataOutputStreamWrapper wrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos)); + + try { + for (T element : elements) + serializer.serialize(element, wrapper); + } catch (IOException e) { + // ByteArrayOutputStream doesn't throw IOExceptions when written to + } + // closing the DataOutputStream would just flush the ByteArrayOutputStream, which in turn doesn't do anything. + + this.serializer = serializer; + this.elements = baos.toByteArray(); } @Override public void run(SourceContext ctx) throws Exception { - Iterator it = iterable.iterator(); + T value = serializer.createInstance(); + ByteArrayInputStream bais = new ByteArrayInputStream(elements); + DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais)); - while (isRunning && it.hasNext()) { - ctx.collect(it.next()); + while (isRunning && bais.available() > 0) { + value = serializer.deserialize(value, input); + ctx.collect(value); } + // closing the DataOutputStream would just close the ByteArrayInputStream, which doesn't do anything --- End diff -- I guess you meant the `DataInputStream`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
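The pattern discussed in the diffs above — eagerly serializing the elements into a byte[] at construction time so the source function stays serializable even when the element type is not — can be sketched roughly as follows. `SimpleSerializer` is an invented stand-in for Flink's `TypeSerializer`, and the helper names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified sketch of the FromElementsFunction fix: instead of holding an
// Iterable<T> (whose element type may not be Serializable), the elements are
// serialized into a byte[] up front and deserialized again when the source runs.
public class FromElementsSketch {

    interface SimpleSerializer<T> {
        void serialize(T value, DataOutputStream out) throws IOException;
        T deserialize(DataInputStream in) throws IOException;
    }

    static final SimpleSerializer<String> STRING_SERIALIZER = new SimpleSerializer<String>() {
        public void serialize(String value, DataOutputStream out) throws IOException {
            out.writeUTF(value);
        }
        public String deserialize(DataInputStream in) throws IOException {
            return in.readUTF();
        }
    };

    // "constructor side": serialize all elements into a byte[]
    static <T> byte[] toBytes(SimpleSerializer<T> serializer, Iterable<T> elements) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        for (T element : elements) {   // braces even around one-liners, per the review comment
            serializer.serialize(element, out);
        }
        return baos.toByteArray();
    }

    // "run() side": deserialize until the underlying byte stream is exhausted
    static <T> List<T> fromBytes(SimpleSerializer<T> serializer, byte[] bytes) throws IOException {
        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
        DataInputStream in = new DataInputStream(bais);
        List<T> result = new ArrayList<T>();
        while (bais.available() > 0) {
            result.add(serializer.deserialize(in));
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = toBytes(STRING_SERIALIZER, Arrays.asList("a", "b", "c"));
        System.out.println(fromBytes(STRING_SERIALIZER, data)); // [a, b, c]
    }
}
```

Note that only the byte[] and the serializer need to be serializable fields; the elements themselves never travel through Java serialization, which is exactly why the fix works for non-Serializable element types.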
[jira] [Commented] (FLINK-2173) Python uses different tmp file than Flink
[ https://issues.apache.org/jira/browse/FLINK-2173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591369#comment-14591369 ] Georg Hildebrand commented on FLINK-2173: - Hey, I am a data scientist and tried out Flink's Python API. Directly after calling the env I got this error: IOError: [Errno 2] No such file or directory: '/tmp/flink_data/output' > Python uses different tmp file than Flink > - > > Key: FLINK-2173 > URL: https://issues.apache.org/jira/browse/FLINK-2173 > Project: Flink > Issue Type: Bug > Components: Python API > Environment: Debian Linux > Reporter: Matthias J. Sax > Priority: Critical > > Flink gets the temp file path using System.getProperty("java.io.tmpdir") > while Python uses the tempfile.gettempdir() method. However, the two can > differ. > On my system Flink uses "/tmp" while Python uses "/tmp/users/1000" ("1000" is > my Linux user id). > This issue leads (at least) to failing tests.