[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-12-21 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-166338012
  
Thanks for working on this issue!
I'll close this PR due to inactivity and PR #1473.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-12-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1098




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-11-24 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1098#discussion_r45749750
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 ---
@@ -717,23 +717,188 @@ class DataStream[T](javaStream: JavaStream[T]) {
* every element of the DataStream the result of .toString
* is written.
*
+   * @param path
+   * the path pointing to the location the text file is written to
+   *
+   * @param millis
+   * the file update frequency
+   *
+   * @return the closed DataStream
*/
   def writeAsText(path: String, millis: Long = 0): DataStreamSink[T] =
 javaStream.writeAsText(path, millis)
 
   /**
+   * Writes a DataStream to the file specified by path in text format. For
+   * every element of the DataStream the result of .toString
+   * is written.
+   *
+   * @param path
+   * the path pointing to the location the text file is written to
+   *
+   * @param writeMode
+   * Controls the behavior for existing files. Options are
+   * NO_OVERWRITE and OVERWRITE.
+   *
+   * @return the closed DataStream
+   *
+   */
+  def writeAsText(
+   path: String,
+   writeMode: FileSystem.WriteMode): DataStreamSink[T] = {
+if (writeMode != null) {
+  javaStream.writeAsText(path, writeMode)
+} else {
+  javaStream.writeAsText(path)
+}
+  }
+
+  /**
* Writes a DataStream to the file specified by path in text format. The
* writing is performed periodically, in every millis milliseconds. For
* every element of the DataStream the result of .toString
* is written.
*
+   * @param path
+   * the path pointing to the location the text file is written to
+   *
+   * @param writeMode
+   * Controls the behavior for existing files. Options are
+   * NO_OVERWRITE and OVERWRITE.
+   *
+   * @param millis
+   * the file update frequency
+   *
+   * @return the closed DataStream
+   *
+   */
+  def writeAsText(
+   path: String,
+   writeMode: FileSystem.WriteMode,
+   millis: Long): DataStreamSink[T] = {
+if (writeMode != null) {
+  javaStream.writeAsText(path, writeMode, millis)
+} else {
+  javaStream.writeAsText(path, millis)
+}
+  }
+
+  /**
+   * Writes a DataStream to the file specified by path in csv format. The
+   * writing is performed periodically, in every millis milliseconds. For
+   * every element of the DataStream the result of .toString
+   * is written.
+   *
+   * @param path
+   * the path pointing to the location the text file is written to
+   *
+   * @param millis
+   * the file update frequency
+   *
+   * @return the closed DataStream
+   */
+  def writeAsCsv(
+  path: String,
+  millis: Long = 0): DataStreamSink[T] = {
+require(javaStream.getType.isTupleType, "CSV output can only be used with Tuple DataSets.")
+val of = new ScalaCsvOutputFormat[Product](
+  new Path(path),
+  ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
+  ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER)
+javaStream.write(of.asInstanceOf[OutputFormat[T]], millis)
--- End diff --

Could you call `writeAsCsv(String, WriteMode, Long, String, String)` here 
with the respective parameter values instead? That way we have only one place 
where we have to apply changes.
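The "one most generic method, thin delegating overloads" shape Till suggests can be sketched outside Flink like this. The `WriteMode` enum and `CsvWriter` class below are illustrative stand-ins, not the actual Flink API:

```java
public class CsvWriter {
    // Stand-in for org.apache.flink.core.fs.FileSystem.WriteMode.
    enum WriteMode { NO_OVERWRITE, OVERWRITE }

    // The single most generic method: every other overload delegates here,
    // so a behavior change has to be applied in exactly one place.
    static String writeAsCsv(String path, WriteMode mode, long millis,
                             String rowDelimiter, String fieldDelimiter) {
        if (path == null) {
            throw new IllegalArgumentException("path must not be null");
        }
        return String.join("|", path, mode.name(), Long.toString(millis),
                rowDelimiter, fieldDelimiter);
    }

    // Thin delegating overloads that only supply defaults.
    static String writeAsCsv(String path) {
        return writeAsCsv(path, WriteMode.NO_OVERWRITE, 0L, "\n", ",");
    }

    static String writeAsCsv(String path, long millis) {
        return writeAsCsv(path, WriteMode.NO_OVERWRITE, millis, "\n", ",");
    }

    static String writeAsCsv(String path, WriteMode mode) {
        return writeAsCsv(path, mode, 0L, "\n", ",");
    }
}
```

With this shape, changing how delimiters or the write mode are handled touches only the five-argument method, which is exactly the maintenance concern raised in the comment.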




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-11-24 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1098#discussion_r45750373
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
 ---
@@ -17,27 +17,86 @@
 
 package org.apache.flink.streaming.scala.api;
 
+import junit.framework.Assert;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
-public class TextOutputFormatITCase extends StreamingProgramTestBase {
+import java.io.File;
+
+import static org.junit.Assert.assertTrue;
+
public class TextOutputFormatITCase extends StreamingMultipleProgramsTestBase {
 
protected String resultPath;
 
-   @Override
-   protected void preSubmit() throws Exception {
-   resultPath = getTempDirPath("result");
+   public AbstractTestBase fileInfo = new AbstractTestBase(new Configuration()) {
+   @Override
+   public void startCluster() throws Exception {
+   super.startCluster();
+   }
+   };
+
+   @Before
+   public void createFile() throws Exception {
+   File f = fileInfo.createAndRegisterTempFile("result");
+   resultPath = f.toURI().toString();
+   }
+
+   @Test
+   public void testPath() throws Exception {
+   OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath);
+   }
+
+   @Test
+   public void testPathMillis() throws Exception {
+   OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath, 1);
+   }
+
+   @Test
+   public void testPathWriteMode() throws Exception {
+   OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE);
+   }
+
+   @Test
+   public void testPathWriteModeMillis() throws Exception {
+   OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1);
+   }
+
+   @Test
+   public void failtestPathWriteMode() throws Exception {
+   OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath);
+   try {
+   OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE);
+   } catch (Exception e) {
+   assertTrue(e.getCause().getMessage().indexOf("File already exists") != -1);
+   return;
+   }
+   Assert.fail();
}
 
-   @Override
-   protected void testProgram() throws Exception {
+   @Test
+   public void failtestPathWriteModeMillis() throws Exception {
--- End diff --

camel case




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-11-24 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-159308288
  
Hi @HuangWHWHW, sorry for the late review. Things piled up in the last 
weeks. Do you want to finish this PR?

I had some comments, but I think the PR is in good shape right now. 
After addressing my comments it should be ready to merge.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-11-24 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1098#discussion_r45751049
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
 ---
@@ -17,32 +17,107 @@
 
 package org.apache.flink.streaming.scala.api;
 
+import junit.framework.Assert;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
-public class CsvOutputFormatITCase extends StreamingProgramTestBase {
+import java.io.File;
+import static org.junit.Assert.assertTrue;
+
+
public class CsvOutputFormatITCase extends StreamingMultipleProgramsTestBase {
 
protected String resultPath;
 
-   @Override
-   protected void preSubmit() throws Exception {
-   resultPath = getTempDirPath("result");
+   public AbstractTestBase fileInfo = new AbstractTestBase(new Configuration()) {
+   @Override
+   public void startCluster() throws Exception {
+   super.startCluster();
+   }
+   };
+
+   @Before
+   public void createFile() throws Exception {
+   File f = fileInfo.createAndRegisterTempFile("result");
+   resultPath = f.toURI().toString();
}
 
-   @Override
-   protected void testProgram() throws Exception {
+   @Test
+   public void testPath() throws Exception {
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath);
}
 
-   @Override
-   protected void postSubmit() throws Exception {
-   //Strip the parentheses from the expected text like output
+   @Test
+   public void testPathMillis() throws Exception {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, 1);
+   }
+
+   @Test
+   public void testPathWriteMode() throws Exception {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE);
+   }
+
+   @Test
+   public void testPathWriteModeMillis() throws Exception {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1);
+   }
+
+   @Test
+   public void testPathWriteModeMillisDelimiter() throws Exception {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1, "\n", ",");
+   }
+
+   @Test
+   public void failtestPathWriteMode() throws Exception {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath);
+   try {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE);
+   } catch (Exception e) {
+   assertTrue(e.getCause().getMessage().indexOf("File already exists") != -1);
+   return;
+   }
+   Assert.fail();
+   }
+
+   @Test
+   public void failtestPathWriteModeMillis() throws Exception {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath);
+   try {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1);
+   } catch (Exception e) {
+   assertTrue(e.getCause().getMessage().indexOf("File already exists") != -1);
+   return;
+   }
+   Assert.fail();
+   }
+
+   @Test
+   public void failtestPathWriteModeMillisDelimiter() throws Exception {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath);
+   try {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1, "\n", ",");
--- End diff --

It would be good to test the method with values other than the default 
parameter values. That way you can see that these values are correctly forwarded.
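The point about non-default values can be made concrete: a test that only ever passes the default delimiters cannot distinguish a correct forwarder from one that silently drops its arguments. A minimal sketch (not Flink code, the class and method names are hypothetical):

```java
public class ForwardingDemo {
    // Correct: uses the delimiters it was given.
    static String format(String rowDelim, String fieldDelim) {
        return "word" + fieldDelim + "1" + rowDelim;
    }

    // Buggy: ignores its arguments and falls back to the defaults.
    static String buggyFormat(String rowDelim, String fieldDelim) {
        return "word" + "," + "1" + "\n";
    }
}
```

Called with the defaults `("\n", ",")`, both methods return the same string, so a defaults-only test passes either way; only a call such as `format("|", ";")` exposes the bug.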



[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-11-24 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1098#discussion_r45751119
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
 ---
@@ -17,32 +17,107 @@
 
 package org.apache.flink.streaming.scala.api;
 
+import junit.framework.Assert;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
-public class CsvOutputFormatITCase extends StreamingProgramTestBase {
+import java.io.File;
+import static org.junit.Assert.assertTrue;
+
+
public class CsvOutputFormatITCase extends StreamingMultipleProgramsTestBase {
 
protected String resultPath;
 
-   @Override
-   protected void preSubmit() throws Exception {
-   resultPath = getTempDirPath("result");
+   public AbstractTestBase fileInfo = new AbstractTestBase(new Configuration()) {
+   @Override
+   public void startCluster() throws Exception {
+   super.startCluster();
+   }
+   };
+
+   @Before
+   public void createFile() throws Exception {
+   File f = fileInfo.createAndRegisterTempFile("result");
+   resultPath = f.toURI().toString();
}
 
-   @Override
-   protected void testProgram() throws Exception {
+   @Test
+   public void testPath() throws Exception {
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath);
}
 
-   @Override
-   protected void postSubmit() throws Exception {
-   //Strip the parentheses from the expected text like output
+   @Test
+   public void testPathMillis() throws Exception {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, 1);
+   }
+
+   @Test
+   public void testPathWriteMode() throws Exception {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE);
+   }
+
+   @Test
+   public void testPathWriteModeMillis() throws Exception {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1);
+   }
+
+   @Test
+   public void testPathWriteModeMillisDelimiter() throws Exception {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1, "\n", ",");
+   }
+
+   @Test
+   public void failtestPathWriteMode() throws Exception {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath);
+   try {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE);
+   } catch (Exception e) {
+   assertTrue(e.getCause().getMessage().indexOf("File already exists") != -1);
+   return;
+   }
+   Assert.fail();
+   }
+
+   @Test
+   public void failtestPathWriteModeMillis() throws Exception {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath);
+   try {
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1);
--- End diff --

How do you test that the `millis` parameter is respected?
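One way to answer this question without timing a running job is to verify that `millis` at least reaches the sink unchanged, using a recording stub. A sketch with hypothetical names (a real Flink test could do the same with a mock `OutputFormat`):

```java
import java.util.ArrayList;
import java.util.List;

public class MillisForwardingDemo {
    // Hypothetical stub standing in for the underlying Java DataStream sink:
    // it records the update frequency that actually reaches it.
    static final List<Long> observedMillis = new ArrayList<>();

    static void javaWrite(String path, long millis) {
        observedMillis.add(millis);
    }

    // The Scala-style wrapper under test: it must forward millis unchanged.
    static void writeAsCsv(String path, long millis) {
        javaWrite(path, millis);
    }
}
```

This checks forwarding only; whether the sink actually flushes every `millis` milliseconds would still need an integration test that observes file modification times.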




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-11-24 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1098#discussion_r45749794
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 ---
@@ -717,23 +717,188 @@ class DataStream[T](javaStream: JavaStream[T]) {
* every element of the DataStream the result of .toString
* is written.
*
+   * @param path
+   * the path pointing to the location the text file is written to
+   *
+   * @param millis
+   * the file update frequency
+   *
+   * @return the closed DataStream
*/
   def writeAsText(path: String, millis: Long = 0): DataStreamSink[T] =
 javaStream.writeAsText(path, millis)
 
   /**
+   * Writes a DataStream to the file specified by path in text format. For
+   * every element of the DataStream the result of .toString
+   * is written.
+   *
+   * @param path
+   * the path pointing to the location the text file is written to
+   *
+   * @param writeMode
+   * Controls the behavior for existing files. Options are
+   * NO_OVERWRITE and OVERWRITE.
+   *
+   * @return the closed DataStream
+   *
+   */
+  def writeAsText(
+   path: String,
+   writeMode: FileSystem.WriteMode): DataStreamSink[T] = {
+if (writeMode != null) {
+  javaStream.writeAsText(path, writeMode)
+} else {
+  javaStream.writeAsText(path)
+}
+  }
+
+  /**
* Writes a DataStream to the file specified by path in text format. The
* writing is performed periodically, in every millis milliseconds. For
* every element of the DataStream the result of .toString
* is written.
*
+   * @param path
+   * the path pointing to the location the text file is written to
+   *
+   * @param writeMode
+   * Controls the behavior for existing files. Options are
+   * NO_OVERWRITE and OVERWRITE.
+   *
+   * @param millis
+   * the file update frequency
+   *
+   * @return the closed DataStream
+   *
+   */
+  def writeAsText(
+   path: String,
+   writeMode: FileSystem.WriteMode,
+   millis: Long): DataStreamSink[T] = {
+if (writeMode != null) {
+  javaStream.writeAsText(path, writeMode, millis)
+} else {
+  javaStream.writeAsText(path, millis)
+}
+  }
+
+  /**
+   * Writes a DataStream to the file specified by path in csv format. The
+   * writing is performed periodically, in every millis milliseconds. For
+   * every element of the DataStream the result of .toString
+   * is written.
+   *
+   * @param path
+   * the path pointing to the location the text file is written to
+   *
+   * @param millis
+   * the file update frequency
+   *
+   * @return the closed DataStream
+   */
+  def writeAsCsv(
+  path: String,
+  millis: Long = 0): DataStreamSink[T] = {
+require(javaStream.getType.isTupleType, "CSV output can only be used with Tuple DataSets.")
+val of = new ScalaCsvOutputFormat[Product](
+  new Path(path),
+  ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
+  ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER)
+javaStream.write(of.asInstanceOf[OutputFormat[T]], millis)
+  }
+
+  /**
+   * Writes a DataStream to the file specified by path in csv format. For
+   * every element of the DataStream the result of .toString
+   * is written.
+   *
+   * @param path
+   * the path pointing to the location the text file is written to
+   *
+   * @param writeMode
+   * Controls the behavior for existing files. Options are
+   * NO_OVERWRITE and OVERWRITE.
+   *
+   * @return the closed DataStream
+   */
+  def writeAsCsv(
+  path: String,
+  writeMode: FileSystem.WriteMode): DataStreamSink[T] = {
+require(javaStream.getType.isTupleType, "CSV output can only be used with Tuple DataSets.")
+val of = new ScalaCsvOutputFormat[Product](
+  new Path(path),
+  ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
+  ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER)
+if (writeMode != null) {
+  of.setWriteMode(writeMode)
+}
+javaStream.write(of.asInstanceOf[OutputFormat[T]], 0L)
--- End diff --

Same here: Call the most generic method here.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-21 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-141898429
  
@tillrohrmann 
Thanks!
To save time, I usually run `mvn verify` only in the sub-module that I 
changed.
So I previously missed the DataStream.java in the other module.

I can see the CI details now. Is the following CI error related to my 
change?
```
Tests in error: 
org.apache.flink.yarn.YARNSessionFIFOITCase.testJavaAPI(org.apache.flink.yarn.YARNSessionFIFOITCase)
  Run 1: YARNSessionFIFOITCase.testJavaAPI:642 » Runtime Unable to get Cluster status f...
  Run 2: YARNSessionFIFOITCase.checkForProhibitedLogContents:94->YarnTestBase.ensureNoProhibitedStringInLogFiles:294 Found a file /home/travis/build/apache/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-1_0/application_1442651724242_0009/container_1442651724242_0009_01_01/jobmanager-main.log with a prohibited string: [Exception, Started SelectChannelConnector@0.0.0.0:8081]
```
I'll rerun the CI, and I'll report this error if it turns out not to be 
caused by my PR.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-21 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-141902080
  
This test shouldn't be related to your changes. Currently, our YARN tests
are a bit unstable, but Robert is working on it.

On Mon, Sep 21, 2015 at 9:35 AM, HuangWHWHW 
wrote:

> @tillrohrmann 
> Thanks!
> For saving the time, I usually run mvn verify in the sub-module where I
> changed.
> So, I forgot the DataStream.java in the other module before.
>
> And I can see the CI detail now, and is the CI error following related to
> me?
> Tests in error:
>
> 
org.apache.flink.yarn.YARNSessionFIFOITCase.testJavaAPI(org.apache.flink.yarn.YARNSessionFIFOITCase)
> Run 1: YARNSessionFIFOITCase.testJavaAPI:642 » Runtime Unable to get
> Cluster status f...
> Run 2:
> 
YARNSessionFIFOITCase.checkForProhibitedLogContents:94->YarnTestBase.ensureNoProhibitedStringInLogFiles:294
> Found a file
> 
/home/travis/build/apache/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-1_0/application_1442651724242_0009/container_1442651724242_0009_01_01/jobmanager-main.log
> with a prohibited string: [Exception, Started
> SelectChannelConnector@0.0.0.0:8081]
>
> I'll try to rerun the CI and may report this error if this is not due to
> my PR.
>
> —
> Reply to this email directly or view it on GitHub
> .
>





[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-21 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-141897668
  
All checkstyle errors can also be found by simply executing `mvn verify`
on your local machine. That way you can directly see what's going wrong.

On Sat, Sep 19, 2015 at 9:03 AM, HuangWHWHW 
wrote:

> Many thanks!
> Fixed it.
>
> —
> Reply to this email directly or view it on GitHub
> .
>





[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-19 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-141631769
  
Many thanks!
Fixed it.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-19 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-141629844
  
There is a checkstyle error:

```
[INFO] There is 1 error reported by Checkstyle 6.2 with /tools/maven/checkstyle.xml ruleset.
[ERROR] src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java[992] (regexp) RegexpSinglelineJava: Line has leading space characters; indentation should be performed with tabs only.
```




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-19 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-141629775
  
Hi, could anyone tell me why the last CI failed?
Thanks:)




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-18 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-141371306
  
I updated the PR:
1. Updated the class and method documentation.
2. Changed the test to extend `StreamingMultipleProgramsTestBase` and split the tests.
3. Added the fail tests for `NO_OVERWRITE`.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-16 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-140877442
  
Hi @HuangWHWHW, your PR #1131 has been merged.
Can you update this PR and remove those changes?
Thanks!




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-16 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-140934136
  
Yes, I will fix the comments :)




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-15 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-140396374
  
@fhueske 
@chiwanpark 
Hello!
I opened a new PR: https://github.com/apache/flink/pull/1131 to fix the 
problem with the Table API tests.
Is that approach valid?
I just want the tests to pass while I finish this PR.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-13 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-139933029
  
`I still cannot understand what is the purpose of adding javaSet to the 
Table API. We can get the DataSet by using toDataSet method.`

@fhueske 
@chiwanpark 
Hi, it is not as complicated as it looks.
This is just about the compile error in the Table API tests, caused by adding 
the `def writeAsText( filePath: String, writeMode: FileSystem.WriteMode = 
null): DataSink[T]` method in `DataStream.scala`.
To see it for yourself, add the method as an experiment and you will get 
some errors in those Table API tests.
Adding the `javaSet` is just a way for me to pass these tests, and I'm not 
sure this change is suitable.
If it is not, what should I do?
Refactor the Table API in a new JIRA, or here?




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-11 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1098#discussion_r39254440
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
 ---
@@ -36,6 +37,10 @@ protected void preSubmit() throws Exception {
@Override
protected void testProgram() throws Exception {
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, 
resultPath);
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, 
resultPath, 1);
--- End diff --

If it is expected to fail, could you tell me how to make the test pass?
I just have no idea about this.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-11 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-139494655
  
The semantics of `NO_OVERWRITE` are correct and pretty clear, IMO. 
If we changed `NO_OVERWRITE` to fail even when there is no file, its 
semantics would change to `NO_WRITE` and it would always fail. 
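As a rough illustration of that contract, here is a Flink-free sketch (the `WriteMode` enum and `create` helper below are stand-ins for illustration, not Flink's actual classes): `OVERWRITE` replaces an existing file, while `NO_OVERWRITE` fails only if the target already exists and otherwise writes normally.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.charset.StandardCharsets;

// Illustrative stand-ins, not Flink's API.
class WriteModeDemo {
    enum WriteMode { OVERWRITE, NO_OVERWRITE }

    static void create(File target, WriteMode mode) throws IOException {
        // NO_OVERWRITE only refuses to write when the target already exists.
        if (target.exists() && mode == WriteMode.NO_OVERWRITE) {
            throw new IOException("File already exists: " + target);
        }
        Files.write(target.toPath(), "data".getBytes(StandardCharsets.UTF_8));
    }

    // Returns true iff the modes behave as described in the comment above.
    static boolean semanticsHold() {
        try {
            File tmp = File.createTempFile("writemode", ".txt");
            create(tmp, WriteMode.OVERWRITE); // existing file: OVERWRITE succeeds
            boolean noOverwriteFailed;
            try {
                create(tmp, WriteMode.NO_OVERWRITE); // existing file: must fail
                noOverwriteFailed = false;
            } catch (IOException expected) {
                noOverwriteFailed = true;
            }
            tmp.delete();
            create(tmp, WriteMode.NO_OVERWRITE); // no file yet: writes normally
            boolean freshWriteWorked = tmp.exists();
            tmp.delete();
            return noOverwriteFailed && freshWriteWorked;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(semanticsHold());
    }
}
```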




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-11 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-139499573
  
What's the purpose of adding `javaSet` to the Table API? 
This looks like a separate issue to me and should not be part of this PR if 
that is the case.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-11 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-139499368
  
There is no append. `OVERWRITE` simply overwrites existing files and 
`NO_OVERWRITE` fails if the output file exists.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-11 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-139500923
  
`What's the purpose of adding javaSet to the Table API? 
This looks like a separate issue to me and should not be part of this PR if 
that is the case.`

It is because of the following:

![image](https://cloud.githubusercontent.com/assets/13193847/9811803/11922a28-58ac-11e5-9f3f-4451f7db3b02.png)





[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1098#discussion_r39253553
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
 ---
@@ -36,6 +37,10 @@ protected void preSubmit() throws Exception {
@Override
protected void testProgram() throws Exception {
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, 
resultPath);
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, 
resultPath, 1);
--- End diff --

It is expected that the test fails if you set the overwrite mode to 
`NO_OVERWRITE` because you start multiple programs that all write to the same 
location.

I debugged the issue and found that the `ForkableFlinkMiniCluster` sets the 
overwrite mode to OVERWRITE. That explains the behavior. However, this also 
means that you cannot test the correctness of your methods by setting the 
overwrite mode to OVERWRITE because this is the default behavior.

I would change the `CsvOutputFormatITCase` to extend 
`StreamingMultipleProgramsTestBase` and run each program in a dedicated test 
method. 
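The config-driven default described above can be sketched in plain Java (the key name `filesystem.overwrite` and the map-based config are assumptions for illustration; Flink's actual constants live in `ConfigConstants`): a test harness that flips one flag silently gives every job OVERWRITE semantics.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: the default write mode is derived from a config flag,
// so a mini cluster that sets the flag to true makes OVERWRITE the default.
class DefaultWriteModeDemo {
    static String defaultWriteMode(Map<String, String> config) {
        boolean overwrite =
            Boolean.parseBoolean(config.getOrDefault("filesystem.overwrite", "false"));
        return overwrite ? "OVERWRITE" : "NO_OVERWRITE";
    }

    public static void main(String[] args) {
        Map<String, String> testClusterConfig = new HashMap<>();
        // What the test cluster does, per the comment above:
        testClusterConfig.put("filesystem.overwrite", "true");
        System.out.println(defaultWriteMode(new HashMap<>()));   // production default
        System.out.println(defaultWriteMode(testClusterConfig)); // test-cluster default
    }
}
```

This is why setting `OVERWRITE` explicitly in a test proves nothing when the harness already defaults to it.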




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-11 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-139496123
  
Doesn't `NO_OVERWRITE` mean "append"?




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-11 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-139507621
  
Basically, we should avoid modifying the current Table API tests. This PR 
covers the streaming API only.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-11 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-139507227
  
I still cannot understand what is the purpose of adding `javaSet` to the 
Table API. We can get the `DataSet` by using `toDataSet` method.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-11 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-139513146
  
As you know, it seems to be caused by the ambiguity between a `Table` created 
from a `DataSet` and one created from a `DataStream`. We need to refactor the 
Table API.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-10 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-139233867
  
@tillrohrmann 
Hi, I hit a problem in this failed CI.
It is just a Scala programming issue.
Both `DataStream.scala` and `DataSet.scala` now have the same method:
`def writeAsText(
  filePath: String,
  writeMode: FileSystem.WriteMode = null): DataSink[T]`
So in some failed tests (e.g. 
`org.apache.flink.api.scala.table.test.AsITCase`), `ds.writeAsText(resultPath, 
WriteMode.OVERWRITE)` cannot resolve to the correct `writeAsText` method.
Could you tell me how to fix it?
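One way to avoid the clash is the overload-based shape the Java API already uses, rather than a nullable default argument. A hedged, Flink-free sketch (method bodies and the returned strings are purely illustrative, not Flink's signatures):

```java
// Illustrative stand-ins for the overloaded sink methods; not Flink code.
class OverloadDemo {
    static String writeAsText(String filePath) {
        return "sink(" + filePath + ")";
    }

    static String writeAsText(String filePath, String writeMode) {
        if (writeMode == null) {
            return writeAsText(filePath); // fall back to the default behavior
        }
        return "sink(" + filePath + ", " + writeMode + ")";
    }

    public static void main(String[] args) {
        System.out.println(writeAsText("out"));
        System.out.println(writeAsText("out", "OVERWRITE"));
    }
}
```

With explicit overloads, each call site resolves to exactly one signature, which is what the single default-argument method failed to guarantee.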




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1098#discussion_r39175337
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 ---
@@ -727,13 +727,118 @@ class DataStream[T](javaStream: JavaStream[T]) {
* every element of the DataStream the result of .toString
* is written.
*
+   * @param writeMode
+   * Control the behavior for existing files. Options are
+   * NO_OVERWRITE and OVERWRITE.
+   *
+   */
+  def writeAsText(
--- End diff --

This method does not have a `millis` parameter. Please fix the 
documentation.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1098#discussion_r39174305
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 ---
@@ -1004,10 +989,38 @@ public ExecutionConfig getExecutionConfig() {
@SuppressWarnings("unchecked")
public  DataStreamSink writeAsCsv(String path, 
WriteMode writeMode,
long millis) {
+   return writeAsCsv(path, writeMode, millis, 
CsvOutputFormat.DEFAULT_LINE_DELIMITER, 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+   }
+
+   /**
+* Writes a DataStream to the file specified by path in csv format. The
+* writing is performed periodically, in every millis milliseconds.
+*
+* 
+* For every field of an element of the DataStream the result of {@link 
Object#toString()}
+* is written. This method can only be used on data streams of tuples.
+*
+* @param path
+*the path pointing to the location the text file is 
written to
+* @param writeMode
+*Controls the behavior for existing files. Options are
+*NO_OVERWRITE and OVERWRITE.
+* @param millis
--- End diff --

Please add both delimiter parameters to the JavaDocs.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-10 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-139242079
  
I added `javaSet` to the end of the Table APIs.
I am not sure whether it is suitable.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1098#discussion_r39175801
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 ---
@@ -727,13 +727,118 @@ class DataStream[T](javaStream: JavaStream[T]) {
* every element of the DataStream the result of .toString
* is written.
*
+   * @param writeMode
--- End diff --

please add the `path` parameter as well.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1098#discussion_r39175887
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 ---
@@ -727,13 +727,118 @@ class DataStream[T](javaStream: JavaStream[T]) {
* every element of the DataStream the result of .toString
* is written.
*
+   * @param writeMode
+   * Control the behavior for existing files. Options are
+   * NO_OVERWRITE and OVERWRITE.
+   *
+   */
+  def writeAsText(
+  path: String,
+  writeMode: FileSystem.WriteMode): DataStreamSink[T] = {
+if (writeMode != null) {
+  javaStream.writeAsText(path, writeMode)
+} else {
+  javaStream.writeAsText(path)
+}
+  }
+
+  /**
+   * Writes a DataStream to the file specified by path in text format. The
+   * writing is performed periodically, in every millis milliseconds. For
+   * every element of the DataStream the result of .toString
+   * is written.
+   *
+   * @param writeMode
+   * Control the behavior for existing files. Options are
+   * NO_OVERWRITE and OVERWRITE.
+   *
+   */
+  def writeAsText(
+  path: String,
+  writeMode: FileSystem.WriteMode,
+  millis: Long): DataStreamSink[T] = {
+if (writeMode != null) {
+  javaStream.writeAsText(path, writeMode, millis)
+} else {
+  javaStream.writeAsText(path, millis)
+}
+  }
+
+  /**
+   * Writes a DataStream to the file specified by path in text format. The
--- End diff --

`writeAsCsv` does not write `in text format` and also does not use the 
`.toString` method.
Please fix the documentation.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-10 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-139299973
  
Please check the documentation of the newly added functions.
- The text corresponds to the function (CSV and text output, not mentioning 
parameters which are not offered by the method, ...)
- All method parameters are described

Can you also double check that the test which overwrites files is working 
and if it is, why the write mode is set to OVERWRITE (should be NO_OVERWRITE)?
Thanks




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1098#discussion_r39177248
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
 ---
@@ -36,6 +37,10 @@ protected void preSubmit() throws Exception {
@Override
protected void testProgram() throws Exception {
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, 
resultPath);
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, 
resultPath, 1);
--- End diff --

Is this actually working? 
I thought by default files are not overwritten. So writing to the same file 
again, should not work and the second job should fail.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-10 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-139443340
  
Hi, I think there is an issue here:
* if you use `NO_OVERWRITE` on an existing file, you get the error "File 
already exists"
* if you use `NO_OVERWRITE` and the file does not exist, it behaves just the 
same as `OVERWRITE`
So currently `NO_OVERWRITE` does not seem to make sense to me.
Maybe we need to discuss and fix it in a new JIRA?




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-10 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-139436457
  
@fhueske 
Hi, if I change to `NO_OVERWRITE` in `CsvOutputFormatITCase.java`, the tests 
fail with the following errors:
`Caused by: java.io.IOException: File already 
exists:file:/C:/Users/H00292~1/AppData/Local/Temp/org.apache.flink.streaming.scala.api.CsvOutputFormatITCase-result/1
at 
org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:247)
at 
org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:263)
at 
org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:247)
at 
org.apache.flink.api.scala.operators.ScalaCsvOutputFormat.open(ScalaCsvOutputFormat.java:161)
at 
org.apache.flink.streaming.api.functions.sink.FileSinkFunction.open(FileSinkFunction.java:58)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:33)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:232)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:167)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
at java.lang.Thread.run(Thread.java:722)`

This is due to the existing-file check in `LocalFileSystem.java`:

    if (exists(f) && !overwrite) {
        throw new IOException("File already exists:" + f);
    }




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-10 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1098#discussion_r39237069
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
 ---
@@ -36,6 +37,10 @@ protected void preSubmit() throws Exception {
@Override
protected void testProgram() throws Exception {
OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, 
resultPath);
+   OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, 
resultPath, 1);
--- End diff --

A default `writeMode` already exists.
`FileOutputFormat.java` checks whether files may be overwritten when 
initializing its defaults:

    private static final void initDefaultsFromConfiguration(Configuration configuration) {
        final boolean overwrite = configuration.getBoolean(
            ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY,
            ConfigConstants.DEFAULT_FILESYSTEM_OVERWRITE);

        DEFAULT_WRITE_MODE = overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE;

        final boolean alwaysCreateDirectory = configuration.getBoolean(
            ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
            ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY);

        DEFAULT_OUTPUT_DIRECTORY_MODE = alwaysCreateDirectory
            ? OutputDirectoryMode.ALWAYS : OutputDirectoryMode.PARONLY;
    }




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-08 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-138449988
  
I made a fix to keep the Java and Scala APIs consistent.
As a result, we can call the `writeAsText` and `writeAsCsv` methods with:
* path
* path, write frequency
* path, writemode
* path, writemode, write frequency

And I added tests for these APIs.
I also noticed that in the Java API, `writeAsCsv` cannot configure the 
delimiter for a row or a field, while the Scala API can.
I'm not sure whether to remove the feature from the Scala API or to add it to 
the Java API.
So I added a new `writeAsCsv` method to the Java DataStream that allows 
configuring the delimiter.
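As a rough sketch of the delimiter-configurable behavior being discussed (the helper below is an illustration only; it is not Flink's `CsvOutputFormat` API, and the default delimiters are assumptions):

```java
// Illustrative stand-in for a CSV line writer with configurable delimiters.
class CsvLineDemo {
    static String toCsvLine(Object[] fields, String fieldDelimiter, String lineDelimiter) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                sb.append(fieldDelimiter); // separates fields within a record
            }
            sb.append(fields[i]);
        }
        return sb.append(lineDelimiter).toString(); // terminates the record
    }

    public static void main(String[] args) {
        Object[] record = {"word", 3};
        System.out.println(toCsvLine(record, ",", "\n"));  // comma-separated
        System.out.println(toCsvLine(record, "\t", "\n")); // tab-separated
    }
}
```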




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-07 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-138249409
  
I mean that the APIs are not consistent.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-07 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-138250808
  
Ah, I get you.
Maybe one `writeAsText` method is not enough.
Should I add more `writeAsText` methods, as in the Java DataStream?
And is there the same issue in `writeAsCsv()`?




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-07 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-138243780
  
It's not only about the order but also about the supported signatures. It's 
best to take a look at the Java `DataStream` implementation to see what's 
supported.

The error is `error 
file=/home/travis/build/apache/flink/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 message=File line length exceeds 100 characters line=721`. If you run `mvn 
clean verify` locally, you will spot these issues without Travis.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-07 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-138251605
  
1. Yes
2. You should check


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-07 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-138247906
  
@tillrohrmann 
Hi,
Thank you for the CI info!
As I see it, `writeAsText` in the Scala DataStream calls the same method in 
the Java DataStream (am I misunderstanding anything?).
So the Scala DataStream just wraps the Java DataStream API.
Do you mean that there are issues in other APIs (e.g. `writeAsCsv()`)?




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-07 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-138236351
  
@tillrohrmann 
Hi,
I see. 
The issue about the order is just in the Scala example 
SocketTextStreamWordCount.scala, as follows:

    if (fileOutput) {
      counts.writeAsText(outputPath, 1)
    } else {
      counts print
    }

So I changed the order to make it compile.
As for the tests, I'll try, though I'm not good at Scala programming.
BTW: can you tell me why the CI failed? It is still blocked in China.





[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-07 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-138249745
  
In the Java DataStream API you can call the `writeAsText` method with:

* path
* path, write frequency
* path, writemode
* path, writemode, write frequency

In Scala this should also be supported.
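The four call shapes above can be sketched as plain Java overloads (illustrative stand-ins only; the real Flink methods take a `WriteMode` and return a `DataStreamSink`):

```java
// Illustrative stand-ins for the four writeAsText signatures; not Flink code.
class WriteAsTextOverloads {
    static String writeAsText(String path) {
        return path;
    }
    static String writeAsText(String path, long millis) {
        return path + " every " + millis + "ms";
    }
    static String writeAsText(String path, String writeMode) {
        return path + " mode=" + writeMode;
    }
    static String writeAsText(String path, String writeMode, long millis) {
        return path + " mode=" + writeMode + " every " + millis + "ms";
    }

    public static void main(String[] args) {
        System.out.println(writeAsText("out.txt", "OVERWRITE", 100L));
    }
}
```

Mirroring this overload set in the Scala `DataStream` keeps the two APIs consistent.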




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-07 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-138230399
  
Thanks for your contribution @HuangWHWHW. However, your changes are not 
compatible with the Java DataStream API. Both APIs should support the same 
functionality with the same order of parameters. Furthermore, this PR is 
lacking tests.




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-07 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1098#issuecomment-138267803
  
@tillrohrmann 
Thank you for the comments!
I will apply the fix and add tests for this.
BTW: can you take a look at this PR: 
https://github.com/apache/flink/pull/1030?
The CI passed; is there any new comment, or could it be merged?
Very sorry for taking up your time. 




[GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...

2015-09-06 Thread HuangWHWHW
GitHub user HuangWHWHW opened a pull request:

https://github.com/apache/flink/pull/1098

[FLINK-2622][streaming]add WriteMode for writeAsText in DataStream.scala

Adds a writeMode parameter to writeAsText with a default setting of "null".

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HuangWHWHW/flink FLINK-2622-new

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1098.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1098


commit 9d260190f016d39b2e7e24afb908b9af6fc1fa6f
Author: HuangWHWHW <404823...@qq.com>
Date:   2015-09-07T04:15:18Z

[FLINK-2622][streaming]add WriteMode for writeAsText in DataStream.scala



