[jira] [Created] (FLINK-1828) Impossible to output data to an HBase table
Flavio Pompermaier created FLINK-1828: - Summary: Impossible to output data to an HBase table Key: FLINK-1828 URL: https://issues.apache.org/jira/browse/FLINK-1828 Project: Flink Issue Type: Bug Components: Hadoop Compatibility Affects Versions: 0.9 Reporter: Flavio Pompermaier Fix For: 0.9 Right now it is not possible to use HBase TableOutputFormat as output format because Configurable.setConf is not called in the configure() method of the HadoopOutputFormatBase -- This message was sent by Atlassian JIRA (v6.3.4#6332)
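The root cause lends itself to a short, self-contained sketch (the interfaces below are simplified stand-ins, not the actual Hadoop or Flink classes): when the wrapped OutputFormat implements Configurable, configure() must forward the Configuration via setConf(); otherwise formats like HBase's TableOutputFormat never receive their table configuration.

```java
// Self-contained sketch of the missing behavior (simplified stand-ins for the
// Hadoop interfaces; not the actual Flink or Hadoop classes).
public class HadoopOutputFormatSketch {

    interface Configuration {}

    interface OutputFormat {}

    // Hadoop's Configurable contract: objects that want a Configuration pushed in.
    interface Configurable {
        void setConf(Configuration conf);
    }

    // Stand-in for HBase's TableOutputFormat, which implements Configurable.
    static class TableOutputFormatStub implements OutputFormat, Configurable {
        Configuration conf; // remains null unless setConf is called

        @Override
        public void setConf(Configuration conf) {
            this.conf = conf;
        }
    }

    private final OutputFormat wrapped;
    private final Configuration configuration;

    HadoopOutputFormatSketch(OutputFormat wrapped, Configuration configuration) {
        this.wrapped = wrapped;
        this.configuration = configuration;
    }

    // The fix the report asks for: configure() propagates the Configuration
    // to Configurable output formats instead of doing nothing.
    void configure() {
        if (wrapped instanceof Configurable) {
            ((Configurable) wrapped).setConf(configuration);
        }
    }

    public static void main(String[] args) {
        TableOutputFormatStub format = new TableOutputFormatStub();
        Configuration conf = new Configuration() {};
        new HadoopOutputFormatSketch(format, conf).configure();
        System.out.println(format.conf == conf); // prints "true"
    }
}
```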
[GitHub] flink pull request: Reworking of Iteration Synchronization, Accumu...
Github user markus-h commented on the pull request: https://github.com/apache/flink/pull/36#issuecomment-89529411 This change is continued in https://github.com/apache/flink/pull/570 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-951) Reworking of Iteration Synchronization, Accumulators and Aggregators
[ https://issues.apache.org/jira/browse/FLINK-951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14395628#comment-14395628 ] ASF GitHub Bot commented on FLINK-951: -- GitHub user markus-h opened a pull request: https://github.com/apache/flink/pull/570 [FLINK-951] Reworking of Iteration Synchronization, Accumulators and Aggregators Iteration synchronization through JobManager Unification of Accumulators and Aggregators (removal of former Aggregators) Adjusted testcases accordingly I redid the work of my very old pull request https://github.com/apache/flink/pull/36 A more detailed description can be found in jira https://issues.apache.org/jira/browse/FLINK-951 I came across some unexpected behaviour with akka that made a small hack necessary. Perhaps somebody with more experience in akka can find a better solution. See IterationHeadPactTask line 392. You can merge this pull request into a Git repository by running: $ git pull https://github.com/markus-h/incubator-flink iterationsAndAccumulatorsRework2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/570.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #570 commit 5492487892ff99f10fccdb075404dedaa3371ff7 Author: Markus Holzemer markus.holze...@gmx.de Date: 2015-04-02T15:56:19Z Iteration synchronization through JobManager Unification of Accumulators and Aggregators (removal of former Aggregators) Adjusted testcases accordingly Reworking of Iteration Synchronization, Accumulators and Aggregators Key: FLINK-951 URL: https://issues.apache.org/jira/browse/FLINK-951 Project: Flink Issue Type: Improvement Components: Iterations, Optimizer Affects Versions: 0.6-incubating Reporter: Markus Holzemer Assignee: Markus Holzemer Labels: refactoring Original Estimate: 168h Remaining Estimate: 168h I just realized that there is no real Jira issue for the task I am currently working on. I am currently reworking a few things regarding Iteration Synchronization, Accumulators and Aggregators. Currently the synchronization at the end of one superstep is done through channel events. That makes it hard to track the current status of iterations. That is why I am changing this synchronization to use RPC calls with the JobManager, so that the JobManager manages the current status of all iterations. Currently we use Accumulators outside of iterations and Aggregators inside of iterations. Both have a similar function, but slightly different interfaces and handling. I want to unify these two concepts. I propose that we stick to Accumulators only in the future. Aggregators are therefore removed, and Accumulators are extended to cover the use cases Aggregators were used for before. The switch to RPC for iterations makes it possible to also send the current Accumulator values at the end of each superstep, so that the JobManager (and thereby the web interface) will be able to print intermediate accumulation results.
[GitHub] flink pull request: Reworking of Iteration Synchronization, Accumu...
Github user markus-h closed the pull request at: https://github.com/apache/flink/pull/36
[GitHub] flink pull request: [FLINK-951] Reworking of Iteration Synchroniza...
GitHub user markus-h opened a pull request: https://github.com/apache/flink/pull/570 [FLINK-951] Reworking of Iteration Synchronization, Accumulators and Aggregators Iteration synchronization through JobManager Unification of Accumulators and Aggregators (removal of former Aggregators) Adjusted testcases accordingly I redid the work of my very old pull request https://github.com/apache/flink/pull/36 A more detailed description can be found in jira https://issues.apache.org/jira/browse/FLINK-951 I came across some unexpected behaviour with akka that made a small hack necessary. Perhaps somebody with more experience in akka can find a better solution. See IterationHeadPactTask line 392. You can merge this pull request into a Git repository by running: $ git pull https://github.com/markus-h/incubator-flink iterationsAndAccumulatorsRework2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/570.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #570 commit 5492487892ff99f10fccdb075404dedaa3371ff7 Author: Markus Holzemer markus.holze...@gmx.de Date: 2015-04-02T15:56:19Z Iteration synchronization through JobManager Unification of Accumulators and Aggregators (removal of former Aggregators) Adjusted testcases accordingly
[jira] [Commented] (FLINK-1828) Impossible to output data to an HBase table
[ https://issues.apache.org/jira/browse/FLINK-1828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14395694#comment-14395694 ] ASF GitHub Bot commented on FLINK-1828: --- GitHub user fpompermaier opened a pull request: https://github.com/apache/flink/pull/571 Fixed Configurable HadoopOutputFormat (FLINK-1828) See https://issues.apache.org/jira/browse/FLINK-1828 You can merge this pull request into a Git repository by running: $ git pull https://github.com/fpompermaier/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/571.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #571 commit 83655bf2773a871c0fb88481be51c6d61ee98881 Author: fpompermaier f.pomperma...@gmail.com Date: 2015-04-04T10:57:36Z Fixed Configurable Hadoop output format initialization, added a simple HBase sink test and upgraded HBase dependencies (from 0.98.6 to 0.98.11) commit 85dbacf46c6f97f6033a4247cdd60ded87b93641 Author: fpompermaier f.pomperma...@gmail.com Date: 2015-04-04T10:57:36Z Fixed Configurable Hadoop output format initialization, added a simple HBase sink test and upgraded HBase dependencies (from 0.98.6 to 0.98.11) commit da39bd2da2ab6ae03ff90b4434e167b8278d2df2 Author: fpompermaier f.pomperma...@gmail.com Date: 2015-04-04T11:11:55Z Merge branch 'master' of https://github.com/fpompermaier/flink.git Conflicts: flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
[GitHub] flink pull request: Fixed Configurable HadoopOutputFormat (FLINK-1...
GitHub user fpompermaier opened a pull request: https://github.com/apache/flink/pull/571 Fixed Configurable HadoopOutputFormat (FLINK-1828) See https://issues.apache.org/jira/browse/FLINK-1828 You can merge this pull request into a Git repository by running: $ git pull https://github.com/fpompermaier/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/571.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #571 commit 83655bf2773a871c0fb88481be51c6d61ee98881 Author: fpompermaier f.pomperma...@gmail.com Date: 2015-04-04T10:57:36Z Fixed Configurable Hadoop output format initialization, added a simple HBase sink test and upgraded HBase dependencies (from 0.98.6 to 0.98.11) commit 85dbacf46c6f97f6033a4247cdd60ded87b93641 Author: fpompermaier f.pomperma...@gmail.com Date: 2015-04-04T10:57:36Z Fixed Configurable Hadoop output format initialization, added a simple HBase sink test and upgraded HBase dependencies (from 0.98.6 to 0.98.11) commit da39bd2da2ab6ae03ff90b4434e167b8278d2df2 Author: fpompermaier f.pomperma...@gmail.com Date: 2015-04-04T11:11:55Z Merge branch 'master' of https://github.com/fpompermaier/flink.git Conflicts: flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
[GitHub] flink pull request: Fix issue where Windows paths were not recogni...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/491#issuecomment-89548376 I checked what [valid absolute paths on Windows](https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247%28v=vs.85%29.aspx#paths) look like. Summary: A path is relative if it does not begin with any of the following: - A disk designator **with a backslash**, for example `C:\` or `d:\`. - A single backslash, for example, `\directory` or `\file.txt`. This is also referred to as an absolute path. - A UNC name of any format, which always starts with two backslash characters (`\\`). **Note:** A path with a disk designator but without a backslash (such as `c:test.tmp` or `D:tmp/test.tmp`) is relative.
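The three rules summarized in that comment can be sketched as a small predicate (a standalone illustration, not Flink's actual path-handling code; class and method names are hypothetical):

```java
// Sketch of the Windows absolute-path rules quoted above:
// absolute iff the path starts with a UNC prefix (\\), a single backslash,
// or a drive letter followed by ":\"; a bare drive designator is relative.
public class WindowsPathCheck {

    static boolean isAbsolute(String path) {
        if (path == null || path.isEmpty()) {
            return false;
        }
        if (path.startsWith("\\\\")) {
            return true; // UNC name, e.g. \\server\share
        }
        if (path.startsWith("\\")) {
            return true; // single backslash, e.g. \directory or \file.txt
        }
        // Disk designator counts only WITH the backslash, e.g. C:\tmp;
        // c:test.tmp (no backslash after the colon) is drive-relative.
        return path.length() >= 3
                && Character.isLetter(path.charAt(0))
                && path.charAt(1) == ':'
                && path.charAt(2) == '\\';
    }

    public static void main(String[] args) {
        System.out.println(isAbsolute("C:\\tmp\\test.tmp"));     // prints "true"
        System.out.println(isAbsolute("\\\\server\\share\\f"));  // prints "true"
        System.out.println(isAbsolute("\\directory\\file.txt")); // prints "true"
        System.out.println(isAbsolute("c:test.tmp"));            // prints "false"
        System.out.println(isAbsolute("tmp\\test.tmp"));         // prints "false"
    }
}
```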
[jira] [Updated] (FLINK-951) Reworking of Iteration Synchronization, Accumulators and Aggregators
[ https://issues.apache.org/jira/browse/FLINK-951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Markus Holzemer updated FLINK-951: -- Affects Version/s: (was: 0.6-incubating) 0.9
[jira] [Created] (FLINK-1827) Move test classes in test folders and fix scope of test dependencies
Flavio Pompermaier created FLINK-1827: - Summary: Move test classes in test folders and fix scope of test dependencies Key: FLINK-1827 URL: https://issues.apache.org/jira/browse/FLINK-1827 Project: Flink Issue Type: Improvement Components: Build System Affects Versions: 0.9 Reporter: Flavio Pompermaier Priority: Minor Right now it is not possible to skip compilation of test classes (-Dmaven.test.skip=true) because some projects (e.g. flink-test-utils) require test classes in non-test sources (e.g. scalatest_${scala.binary.version}). Test classes should be moved to src/test/java (if Java) or src/test/scala (if Scala), and test dependencies should use scope=test.
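As a sketch of the proposed change (coordinates and the version property are illustrative assumptions, not the project's actual POM), a test-only dependency would be declared with {{scope=test}} so it stays off the compile classpath and -Dmaven.test.skip=true can skip test compilation entirely:

{code:xml}
<dependency>
  <groupId>org.scalatest</groupId>
  <artifactId>scalatest_${scala.binary.version}</artifactId>
  <version>${scalatest.version}</version>
  <scope>test</scope>
</dependency>
{code}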
[jira] [Created] (FLINK-1829) Conflicting Jackson version in the Flink POMs
Alexander Alexandrov created FLINK-1829: --- Summary: Conflicting Jackson version in the Flink POMs Key: FLINK-1829 URL: https://issues.apache.org/jira/browse/FLINK-1829 Project: Flink Issue Type: Bug Components: Build System Affects Versions: 0.9 Reporter: Alexander Alexandrov Fix For: 0.9 The current POM setup transitively includes multiple conflicting versions of the Jackson library via
* {{com.amazonaws:aws-java-sdk}} (v. 2.1.1)
* {{org.apache.avro:avro}} (v. 1.9.13)
* {{org.apache.hbase:hbase-client}} (v. 1.8.8)
When running jobs against a Flink local runtime embedded with:
{code:xml}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>
{code}
I get the following error:
{noformat}
15-04-04 15:52:04 ERROR exception during creation
akka.actor.ActorInitializationException: exception during creation
	at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
	at akka.actor.ActorCell.create(ActorCell.scala:596)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
	at akka.util.Reflect$.instantiate(Reflect.scala:66)
	at akka.actor.ArgsReflectConstructor.produce(Props.scala:352)
	at akka.actor.Props.newActor(Props.scala:252)
	at akka.actor.ActorCell.newActor(ActorCell.scala:552)
	at akka.actor.ActorCell.create(ActorCell.scala:578)
	... 9 more
Caused by: java.lang.NoSuchMethodError: com.fasterxml.jackson.core.JsonFactory.requiresPropertyOrdering()Z
	at com.fasterxml.jackson.databind.ObjectMapper.<init>(ObjectMapper.java:445)
	at com.fasterxml.jackson.databind.ObjectMapper.<init>(ObjectMapper.java:366)
	at org.apache.flink.runtime.taskmanager.TaskManager.<init>(TaskManager.scala:134)
	... 18 more
{noformat}
Fixing the Jackson version on the client side, e.g., with the following snippet
{code:xml}
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-core</artifactId>
  <version>2.2.1</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.2.1</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-annotations</artifactId>
  <version>2.2.1</version>
  <scope>provided</scope>
</dependency>
{code}
solves the problem, but I guess it would be better if we could stick with one version in the build artifacts.
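One conventional way to make transitive Jackson versions converge across modules (a sketch of a standard Maven remedy, not the project's actual fix; version 2.2.1 is taken from the workaround above) is a {{dependencyManagement}} entry in the parent POM:

{code:xml}
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.2.1</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.2.1</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.2.1</version>
    </dependency>
  </dependencies>
</dependencyManagement>
{code}

Managed versions override whatever version transitive dependencies would otherwise pull in, so every module resolves the same Jackson artifacts.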
[jira] [Commented] (FLINK-1828) Impossible to output data to an HBase table
[ https://issues.apache.org/jira/browse/FLINK-1828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14395709#comment-14395709 ] ASF GitHub Bot commented on FLINK-1828: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/571#discussion_r27768581
--- Diff: flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java ---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+
+@SuppressWarnings("serial")
+public class HBaseWriteExample {
+
+	// *
+	// PROGRAM
+	// *
+
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataSet<String> text = getTextDataSet(env);
+
+		DataSet<Tuple2<String, Integer>> counts =
+			// split up the lines in pairs (2-tuples) containing: (word,1)
+			text.flatMap(new Tokenizer())
+			// group by the tuple field 0 and sum up tuple field 1
+			.groupBy(0)
+			.sum(1);
+
+		// emit result
+//		if(fileOutput) {
--- End diff --
the `if` statement should be completely removed.
[GitHub] flink pull request: Fixed Configurable HadoopOutputFormat (FLINK-1...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/571#discussion_r27768581
--- Diff: flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java ---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+
+@SuppressWarnings("serial")
+public class HBaseWriteExample {
+
+	// *
+	// PROGRAM
+	// *
+
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataSet<String> text = getTextDataSet(env);
+
+		DataSet<Tuple2<String, Integer>> counts =
+			// split up the lines in pairs (2-tuples) containing: (word,1)
+			text.flatMap(new Tokenizer())
+			// group by the tuple field 0 and sum up tuple field 1
+			.groupBy(0)
+			.sum(1);
+
+		// emit result
+//		if(fileOutput) {
--- End diff --
the `if` statement should be completely removed.
[GitHub] flink pull request: Fixed Configurable HadoopOutputFormat (FLINK-1...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/571#discussion_r27768585
--- Diff: flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java ---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+
+@SuppressWarnings("serial")
+public class HBaseWriteExample {
+
+	// *
+	// PROGRAM
+	// *
+
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataSet<String> text = getTextDataSet(env);
+
+		DataSet<Tuple2<String, Integer>> counts =
+			// split up the lines in pairs (2-tuples) containing: (word,1)
+			text.flatMap(new Tokenizer())
+			// group by the tuple field 0 and sum up tuple field 1
+			.groupBy(0)
+			.sum(1);
+
+		// emit result
+//		if(fileOutput) {
+		Job job = Job.getInstance();
+		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
+		// TODO is mapred.output.dir really useful?
+		job.getConfiguration().set("mapred.output.dir", "/tmp/test");
+		counts.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>>() {
+			private final byte[] CF_SOME = Bytes.toBytes("test-column");
+			private final byte[] Q_SOME = Bytes.toBytes("value");
+			private transient Tuple2<Text, Mutation> reuse;
+
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				super.open(parameters);
+				reuse = new Tuple2<Text, Mutation>();
+			}
+
+			@Override
+			public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
+				reuse.f0 = new Text(t.f0);
+				Put put = new Put(t.f0.getBytes());
+				put.add(CF_SOME, Q_SOME, Bytes.toBytes(t.f1));
+				reuse.f1 = put;
+				return reuse;
+			}
+		}).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
+//		} else {
--- End diff --
`else` branch not necessary
[jira] [Commented] (FLINK-1828) Impossible to output data to an HBase table
[ https://issues.apache.org/jira/browse/FLINK-1828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14395710#comment-14395710 ] ASF GitHub Bot commented on FLINK-1828: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/571#discussion_r27768585
--- Diff: flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java ---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+
+@SuppressWarnings("serial")
+public class HBaseWriteExample {
+
+	// *
+	// PROGRAM
+	// *
+
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataSet<String> text = getTextDataSet(env);
+
+		DataSet<Tuple2<String, Integer>> counts =
+			// split up the lines in pairs (2-tuples) containing: (word,1)
+			text.flatMap(new Tokenizer())
+			// group by the tuple field 0 and sum up tuple field 1
+			.groupBy(0)
+			.sum(1);
+
+		// emit result
+//		if(fileOutput) {
+		Job job = Job.getInstance();
+		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
+		// TODO is mapred.output.dir really useful?
+		job.getConfiguration().set("mapred.output.dir", "/tmp/test");
+		counts.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>>() {
+			private final byte[] CF_SOME = Bytes.toBytes("test-column");
+			private final byte[] Q_SOME = Bytes.toBytes("value");
+			private transient Tuple2<Text, Mutation> reuse;
+
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				super.open(parameters);
+				reuse = new Tuple2<Text, Mutation>();
+			}
+
+			@Override
+			public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
+				reuse.f0 = new Text(t.f0);
+				Put put = new Put(t.f0.getBytes());
+				put.add(CF_SOME, Q_SOME, Bytes.toBytes(t.f1));
+				reuse.f1 = put;
+				return reuse;
+			}
+
[GitHub] flink pull request: Fixed Configurable HadoopOutputFormat (FLINK-1...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/571#issuecomment-89559472 Looks good to me, except for some comments that should be removed. Regarding the `mapred.output.dir` parameter I am not sure whether this is generally expected for all Hadoop OutputFormats or only required for file-based OutputFormats. I would keep it for now and open a JIRA to investigate the issue and fix it if necessary.
[GitHub] flink pull request: Fixed Configurable HadoopOutputFormat (FLINK-1...
Github user fpompermaier commented on the pull request: https://github.com/apache/flink/pull/571#issuecomment-89570344 Removed comments and commented code as suggested by Fabian. Do I also have to create a JIRA ticket about the mapred.output.dir parameter? I think that it can be defaulted to the Flink temp directory or flinkTempDir/hadoop/job-id
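Flavio's suggested default could be sketched as follows; the helper class and method names below are hypothetical illustrations, not Flink API:

```java
import java.io.File;

public class DefaultOutputDir {

    // Hypothetical helper: derive a per-job default for "mapred.output.dir"
    // under the JVM temp directory, following the suggested
    // flinkTempDir/hadoop/job-id layout.
    static String defaultOutputDir(String jobId) {
        String tmp = System.getProperty("java.io.tmpdir");
        return tmp + File.separator + "hadoop" + File.separator + jobId;
    }

    public static void main(String[] args) {
        System.out.println(defaultOutputDir("job-42"));
    }
}
```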
[jira] [Commented] (FLINK-1828) Impossible to output data to an HBase table
[ https://issues.apache.org/jira/browse/FLINK-1828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14395714#comment-14395714 ] ASF GitHub Bot commented on FLINK-1828: --- Github user fpompermaier commented on the pull request: https://github.com/apache/flink/pull/571#issuecomment-89570344 Removed comments and commented code as suggested by Fabian. Do I have also to create a JIRA ticket about mapred.output.dir parameter? I think that it can be defaulted to the Flink temp directory or flinkTempDir/hadoop/job-id Impossible to output data to an HBase table --- Key: FLINK-1828 URL: https://issues.apache.org/jira/browse/FLINK-1828 Project: Flink Issue Type: Bug Components: Hadoop Compatibility Affects Versions: 0.9 Reporter: Flavio Pompermaier Labels: hadoop, hbase Fix For: 0.9 Right now it is not possible to use HBase TableOutputFormat as output format because Configurable.setConf is not called in the configure() method of the HadoopOutputFormatBase -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1828) Impossible to output data to an HBase table
[ https://issues.apache.org/jira/browse/FLINK-1828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14395707#comment-14395707 ] ASF GitHub Bot commented on FLINK-1828: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/571#discussion_r27768571 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java --- @@ -66,6 +67,8 @@ public HadoopOutputFormatBase(org.apache.hadoop.mapreduce.OutputFormat<K, V> map @Override public void configure(Configuration parameters) { // nothing to do --- End diff -- The comment should be removed.
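The fix under review boils down to one pattern: when the wrapped Hadoop OutputFormat also implements Configurable (as HBase's TableOutputFormat does), configure() must hand it the job configuration instead of doing nothing. A minimal self-contained sketch of that pattern; the classes below are simplified stand-ins, not the actual Flink or Hadoop types:

```java
public class ConfigureSketch {
    // Simplified stand-in for org.apache.hadoop.conf.Configuration.
    static class Configuration {}

    // Simplified stand-in for org.apache.hadoop.conf.Configurable.
    interface Configurable {
        void setConf(Configuration conf);
        Configuration getConf();
    }

    // Stand-in for an OutputFormat that, like HBase's TableOutputFormat,
    // only becomes usable once setConf() has been called on it.
    static class TableOutputFormatLike implements Configurable {
        private Configuration conf;
        public void setConf(Configuration conf) { this.conf = conf; }
        public Configuration getConf() { return conf; }
    }

    // Stand-in for HadoopOutputFormatBase: configure() used to be a no-op;
    // the fix is to propagate the configuration to Configurable formats.
    static class HadoopOutputFormatBaseLike {
        final Object mapreduceOutputFormat;
        final Configuration configuration = new Configuration();

        HadoopOutputFormatBaseLike(Object format) {
            this.mapreduceOutputFormat = format;
        }

        void configure() {
            if (mapreduceOutputFormat instanceof Configurable) {
                ((Configurable) mapreduceOutputFormat).setConf(configuration);
            }
        }
    }

    public static void main(String[] args) {
        TableOutputFormatLike fmt = new TableOutputFormatLike();
        HadoopOutputFormatBaseLike wrapper = new HadoopOutputFormatBaseLike(fmt);
        wrapper.configure();
        System.out.println("conf propagated: " + (fmt.getConf() != null)); // conf propagated: true
    }
}
```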
[jira] [Assigned] (FLINK-1670) Collect method for streaming
[ https://issues.apache.org/jira/browse/FLINK-1670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay reassigned FLINK-1670: -- Assignee: Gabor Gevay Collect method for streaming Key: FLINK-1670 URL: https://issues.apache.org/jira/browse/FLINK-1670 Project: Flink Issue Type: New Feature Components: Streaming Affects Versions: 0.9 Reporter: Márton Balassi Assignee: Gabor Gevay Priority: Minor A convenience method for streaming back the results of a job to the client. As the client itself is a bottleneck anyway an easy solution would be to provide a socket sink with degree of parallelism 1, from which a client utility can read.
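The socket-based approach described in FLINK-1670 can be sketched with plain Java sockets; this is only an illustration of the idea (a parallelism-1 sink writing to a socket that a client utility reads back), not Flink's eventual API, and the class and method names are invented for this sketch:

```java
import java.io.*;
import java.net.*;
import java.util.*;

public class SocketCollectSketch {

    // Runs a stand-in "socket sink" (degree of parallelism 1) in a background
    // thread and reads its records back on the client side.
    static List<String> collect() throws Exception {
        ServerSocket server = new ServerSocket(0); // ephemeral port
        Thread sink = new Thread(() -> {
            try (Socket s = server.accept();
                 PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
                // Stand-in for the job's results being streamed out.
                for (String record : new String[] {"a,1", "b,2", "c,3"}) {
                    out.println(record);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        sink.start();

        // "Client utility" side: read the streamed-back results until EOF.
        List<String> results = new ArrayList<>();
        try (Socket s = new Socket("localhost", server.getLocalPort());
             BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                results.add(line);
            }
        }
        sink.join();
        server.close();
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(collect()); // [a,1, b,2, c,3]
    }
}
```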
[jira] [Commented] (FLINK-1826) Remove the code never be executed in function getNumPages
[ https://issues.apache.org/jira/browse/FLINK-1826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14396033#comment-14396033 ] ASF GitHub Bot commented on FLINK-1826: --- Github user matadorhong commented on the pull request: https://github.com/apache/flink/pull/569#issuecomment-89706960 OK. I accept it. I will close the PR. Thanks for your answer. Remove the code never be executed in function getNumPages - Key: FLINK-1826 URL: https://issues.apache.org/jira/browse/FLINK-1826 Project: Flink Issue Type: Improvement Components: Local Runtime Affects Versions: master Reporter: Sibao Hong Assignee: Sibao Hong Priority: Minor Fix For: master Remove the code that can never be executed in function getNumPages, because the input numBytes has already been validated by its caller, DefaultMemoryManager(long memorySize, int numberOfSlots, int pageSize).
[GitHub] flink pull request: [FLINK-1826]Remove the redundant codes never b...
Github user matadorhong commented on the pull request: https://github.com/apache/flink/pull/569#issuecomment-89706960 OK. I accept it. I will close the PR. Thanks for your answer.
[jira] [Commented] (FLINK-1826) Remove the code never be executed in function getNumPages
[ https://issues.apache.org/jira/browse/FLINK-1826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14396034#comment-14396034 ] ASF GitHub Bot commented on FLINK-1826: --- Github user matadorhong closed the pull request at: https://github.com/apache/flink/pull/569
[GitHub] flink pull request: [FLINK-1826]Remove the redundant codes never b...
Github user matadorhong closed the pull request at: https://github.com/apache/flink/pull/569
[GitHub] flink pull request: [FLINK-1825]Fix the constructor comments with ...
Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/568#issuecomment-89618942 LGTM, merging.
[GitHub] flink pull request: [FLINK-1826]Remove the redundant codes never b...
Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/569#issuecomment-89617988 Hey, I personally like that the check is performed there even though it is currently redundant. It is true that the `DefaultMemoryManager` is for internal usage, but I think it is generally better to be safe than sorry with these checks.
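The trade-off discussed in FLINK-1826 concerns a defensive check that is currently unreachable because the caller already validates its input. A minimal sketch of the pattern being debated; the class, constant, and message strings below are simplified illustrations, not the actual DefaultMemoryManager code:

```java
public class NumPagesSketch {
    static final int DEFAULT_PAGE_SIZE = 32 * 1024; // illustrative page size

    // Sketch of the check under discussion: the caller (the memory manager's
    // constructor) already validates numBytes, so the guards below are
    // currently redundant, but they protect against future callers.
    static int getNumPages(long numBytes) {
        if (numBytes < 0) {
            throw new IllegalArgumentException("The number of bytes must not be negative.");
        }
        long numPages = numBytes / DEFAULT_PAGE_SIZE;
        if (numPages > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("The given number of bytes corresponds to more than Integer.MAX_VALUE pages.");
        }
        return (int) numPages;
    }

    public static void main(String[] args) {
        System.out.println(getNumPages(64 * 1024)); // 2
    }
}
```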
[GitHub] flink pull request: [FLINK-1824] [streaming] Support added for mis...
Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/567#issuecomment-89619837 LGTM, merging.
[jira] [Commented] (FLINK-1824) Allow missing types for DataStreams
[ https://issues.apache.org/jira/browse/FLINK-1824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14395828#comment-14395828 ] ASF GitHub Bot commented on FLINK-1824: --- Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/567#issuecomment-89619837 LGTM, merging. Allow missing types for DataStreams --- Key: FLINK-1824 URL: https://issues.apache.org/jira/browse/FLINK-1824 Project: Flink Issue Type: Improvement Components: Streaming Reporter: Gyula Fora Assignee: Gyula Fora The streaming api currently only uses the methods of the type extractor which don't allow missing types.
[jira] [Commented] (FLINK-1808) Omit sending checkpoint barriers when the execution graph is not running
[ https://issues.apache.org/jira/browse/FLINK-1808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14395830#comment-14395830 ] ASF GitHub Bot commented on FLINK-1808: --- Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/551#issuecomment-89620475 Thanks for the fix, merging. Omit sending checkpoint barriers when the execution graph is not running Key: FLINK-1808 URL: https://issues.apache.org/jira/browse/FLINK-1808 Project: Flink Issue Type: Improvement Components: Streaming Reporter: Paris Carbone Assignee: Paris Carbone Currently the StreamCheckpointCoordinator sends barrier requests even when the executionGraph is in FAILING or RESTARTING status which results in unneeded potential communication and space overhead until the job restarts again. It should therefore simply omit sending barriers requests when the execution graph is not in a RUNNING state.
[GitHub] flink pull request: [FLINK-1808] Send barrier requests only when t...
Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/551#issuecomment-89620475 Thanks for the fix, merging.
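The FLINK-1808 change amounts to a state guard before emitting checkpoint barriers. A simplified sketch of that guard; the enum and method below are stand-ins for the actual StreamCheckpointCoordinator logic, not the real Flink types:

```java
public class BarrierGuardSketch {
    // Stand-in for the execution graph's job status.
    enum JobStatus { CREATED, RUNNING, FAILING, RESTARTING, FINISHED }

    // Guard under discussion: only trigger checkpoint barriers while the
    // execution graph is RUNNING, avoiding wasted communication while the
    // job is FAILING or RESTARTING.
    static boolean shouldSendBarriers(JobStatus status) {
        return status == JobStatus.RUNNING;
    }

    public static void main(String[] args) {
        for (JobStatus s : JobStatus.values()) {
            System.out.println(s + " -> " + shouldSendBarriers(s));
        }
    }
}
```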
[jira] [Commented] (FLINK-1800) Add a Beta badge in the documentation to components in flink-staging
[ https://issues.apache.org/jira/browse/FLINK-1800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14395829#comment-14395829 ] ASF GitHub Bot commented on FLINK-1800: --- Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/555#issuecomment-89620103 +1 for Ufuk's suggestion. Add a Beta badge in the documentation to components in flink-staging -- Key: FLINK-1800 URL: https://issues.apache.org/jira/browse/FLINK-1800 Project: Flink Issue Type: Task Components: Documentation Reporter: Robert Metzger Assignee: Robert Metzger Priority: Minor As per mailing list discussion: http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-a-quot-Beta-quot-badge-in-the-documentation-to-components-in-flink-staging-td4801.html