[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-29 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/621#issuecomment-97401561
  
Our Travis builds are a bit unstable right now. I'm running a few last tests, and then I'll merge this. Thanks for sticking with this and working through my requests! :smile:


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/621


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/442


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-28 Thread Elbehery
Github user Elbehery commented on the pull request:

https://github.com/apache/flink/pull/621#issuecomment-97068257
  
@aljoscha Got it. The build now succeeds locally.


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-28 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/621#issuecomment-96953776
  
Unfortunately, Travis cuts off the log if it is too long, as it did here: 
https://travis-ci.org/aljoscha/flink/jobs/60177866 (that's from your pull 
request). You have to click the "Download log" button; then you can view the 
whole log, where you will see the checkstyle errors.

You can also see the errors if you run `mvn clean verify` on your local 
machine.

It seems your code still contains tabs (judging from the recently failed Travis 
builds).


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-27 Thread Elbehery
Github user Elbehery commented on the pull request:

https://github.com/apache/flink/pull/621#issuecomment-96809741
  
@aljoscha I have reformatted the code with IntelliJ, so I think the problem 
should be solved now.

I tried to read the Travis build log, but I could not find the cause of the 
problem. Could you please tell me how to find it, to save time in the future?


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-27 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/621#issuecomment-96565454
  
The tests are failing because you use spaces in your code for indentation. 
Could you please change all indentation to tabs to satisfy the style checker?
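The rule aljoscha describes (indent with tabs, never spaces) can be approximated in a few lines of plain Java. This is a minimal sketch, not Flink's actual checkstyle configuration: it flags any line whose leading whitespace contains a space, with one assumed exception for the ` *` continuation lines of block comments. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Minimal sketch of a "tabs only" indentation check (hypothetical helper,
 * not Flink's checkstyle rule).
 */
public class IndentationCheck {

    /** True if the line's leading whitespace contains a space character. */
    static boolean indentedWithSpaces(String line) {
        int i = 0;
        while (i < line.length() && (line.charAt(i) == '\t' || line.charAt(i) == ' ')) {
            i++;
        }
        if (i == line.length()) {
            return false; // blank or whitespace-only lines are ignored here
        }
        String indent = line.substring(0, i);
        // Assumed exception: optional tabs followed by exactly one space before '*',
        // i.e. the continuation lines of /** ... */ comments.
        if (line.charAt(i) == '*' && indent.indexOf(' ') == indent.length() - 1) {
            return false;
        }
        return indent.indexOf(' ') >= 0;
    }

    /** Returns the 1-based numbers of all lines indented with spaces. */
    static List<Integer> linesIndentedWithSpaces(String source) {
        List<Integer> offending = new ArrayList<>();
        String[] lines = source.split("\n", -1);
        for (int n = 0; n < lines.length; n++) {
            if (indentedWithSpaces(lines[n])) {
                offending.add(n + 1);
            }
        }
        return offending;
    }

    public static void main(String[] args) {
        String sample = "class A {\n\tint a;\n    int b;\n\t/**\n\t * doc\n\t */\n}";
        System.out.println(linesIndentedWithSpaces(sample)); // prints [3]
    }
}
```

Only line 3 is reported, because it is the only line indented with spaces; the tab-indented Javadoc lines pass thanks to the single-space exception.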


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-26 Thread Elbehery
Github user Elbehery commented on the pull request:

https://github.com/apache/flink/pull/621#issuecomment-96418152
  
@aljoscha I have checked the Travis log. The problem is in Flink Streaming, so 
why did my branch fail?


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-24 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-95846017
  
The problem is that I can't see it in the GitHub interface. On which branch 
are your changes? Could you please rebase them on top of the current master?


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-24 Thread Elbehery
Github user Elbehery commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-95866843
  
@aljoscha You were right. I could not find the code in my repo; somehow it 
was lost.

I have created a new PR, #621, which contains the same commit as #442.

Please let me know if there are any issues.


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-24 Thread Elbehery
GitHub user Elbehery opened a pull request:

https://github.com/apache/flink/pull/621

[FLINK-1615] [java api] SimpleTweetInputFormat

A replacement for PR #442 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Elbehery/flink tweet_into_pojo

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/621.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #621


commit 42a8a3898f3014685ed71d86c71ea6bea874a302
Author: elbehery <eng.m.beh...@live.com>
Date:   2015-04-24T09:12:37Z

[FLINK-1615] [java api] SimpleTweetInputFormat




[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-24 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/621#issuecomment-95926348
  
The build still fails because of missing license headers in the model 
package.

By the way, did you write the files in the model package yourself or were 
they generated?


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-24 Thread Elbehery
Github user Elbehery commented on the pull request:

https://github.com/apache/flink/pull/621#issuecomment-96041144
  
@aljoscha Yes, I wrote these POJOs myself, and I have tested the handler to 
retrieve distinct data, because the tweets have redundant fields.

I have added the license headers.


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-23 Thread Elbehery
Github user Elbehery commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-95603365
  
@aljoscha You should be able to see it from the PR. Anyway, this is my 
repository: https://github.com/Elbehery/flink


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-23 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-95570894
  
Where is your Git repository, so that I can check out your commit and merge 
it?



[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-23 Thread Elbehery
Github user Elbehery commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-95512206
  
I did rebase against master before creating the PR, but that was a long 
time ago. Could this be the cause of the conflicts?


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-23 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-95511047
  
This looks good to merge. Any objections?


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-12 Thread Elbehery
Github user Elbehery commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-92100736
  
@rmetzger This is still not merged. Any updates?


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-24 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-85686759
  
The failure is not your fault. It failed because your code was rebased 
onto a master version with failing tests.


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-23 Thread Elbehery
Github user Elbehery commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-84968643
  
@rmetzger I think it failed again, but I can't see the reason.


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-23 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-84912485
  
I triggered another travis build: 
https://travis-ci.org/rmetzger/flink/builds/55459787


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-19 Thread Elbehery
Github user Elbehery commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-83499608
  
@StephanEwen I checked and found that the RAT plugin is included in the 
parent pom.xml file. Shall I exclude the resources folder in flink-contrib, 
or add the plugin to flink-contrib and exclude the file from there?


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-19 Thread Elbehery
Github user Elbehery commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-83519001
  
@StephanEwen I have removed the license from the resource file, excluded 
it from RAT, and added the license file. I also rebased against 
upstream/master before committing.


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-17 Thread Elbehery
Github user Elbehery commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-82254957
  
I have revised the commit. All the files have the license header except a 
resource file that contains 4 tweets for testing purposes. Could this be the 
problem?


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-17 Thread Elbehery
Github user Elbehery commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-82261434
  
DONE


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-17 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-82260288
  
Yes, this can be the problem. Can you add a license header (with comments) 
to this file?


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-16 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-81837584
  
Have a look at the build server logs. They still complain about unapproved 
license headers.
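The complaint is that every file must carry a license header the Apache RAT plugin recognizes. The sketch below is a deliberately crude stand-in for that audit (RAT itself matches a range of license texts and is configured in the Maven build); it only looks for the opening words of the standard ASF source header near the top of a file. The helper name and the 20-line search window are assumptions. It also illustrates why the JSON sample file was the odd one out: JSON has no comment syntax, so a header cannot simply be prepended to it.

```java
/**
 * Crude stand-in for a license-header audit (hypothetical helper; the real
 * Apache RAT plugin is far more thorough).
 */
public class LicenseHeaderCheck {

    /** Opening words of the standard ASF source-file header. */
    static final String ASF_MARKER = "Licensed to the Apache Software Foundation (ASF)";

    /** True if the marker appears within the first maxLines lines. */
    static boolean hasAsfHeader(String content, int maxLines) {
        String[] lines = content.split("\n", -1);
        int limit = Math.min(maxLines, lines.length);
        for (int i = 0; i < limit; i++) {
            if (lines[i].contains(ASF_MARKER)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String javaSource = "/*\n"
                + " * Licensed to the Apache Software Foundation (ASF) under one\n"
                + " * or more contributor license agreements.\n"
                + " */\n"
                + "package org.apache.flink.contrib;\n";
        // JSON has no comment syntax, so a sample-data file cannot carry this header.
        String jsonSample = "{\"text\": \"sample tweet\"}\n";
        System.out.println(hasAsfHeader(javaSource, 20)); // prints true
        System.out.println(hasAsfHeader(jsonSample, 20)); // prints false
    }
}
```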


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-12 Thread Elbehery
Github user Elbehery commented on a diff in the pull request:

https://github.com/apache/flink/pull/442#discussion_r26284910
  
--- Diff: flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java ---
@@ -0,0 +1,67 @@
+package org.apache.flink.contrib.tweetinputformat.io;
+
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
+import org.codehaus.jackson.JsonParseException;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+
+public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implements ResultTypeQueryable<Tweet> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleTweetInputFormat.class);
+
+    @Override
+    public Tweet nextRecord(Tweet record) throws IOException {
+        Boolean result = false;
+
+        do {
+            try {
+                record.reset(0);
+                record = super.nextRecord(record);
+                result = true;
+
+            } catch (JsonParseException e) {
+                result = false;
+
+            }
+        } while (!result);
+
+        return record;
+    }
+
+    @Override
+    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+
+
+        InputStreamReader jsonReader = new InputStreamReader(new ByteArrayInputStream(bytes));
+        jsonReader.skip(offset);
+
+        JSONParser parser = new JSONParser();
--- End diff --

I know. In the beginning I tried to create them as instance fields in the 
class, but I received an error because the fields are not serializable. I 
tried to declare them as transient, but then Flink threw a "Can not submit Job" 
exception. If you have suggestions, please let me know.
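The serialization error described here can be reproduced with plain Java serialization: a `Serializable` class that holds a non-serializable object in an ordinary instance field cannot be written out, because Java serialization walks every non-transient field. The sketch assumes this is the mechanism behind the error, since Flink input formats are serialized when a job is shipped to the cluster. `NonSerializableParser` is a hypothetical stand-in for json-simple's `JSONParser`, and the other names are illustrative only.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializationDemo {

    /** Hypothetical stand-in for a third-party parser that is not Serializable. */
    static class NonSerializableParser {
        String parse(String raw) {
            return raw.trim();
        }
    }

    /** Keeps the parser in an ordinary (non-transient) instance field. */
    static class FormatWithPlainField implements Serializable {
        private static final long serialVersionUID = 1L;
        private final NonSerializableParser parser = new NonSerializableParser();
    }

    /** Returns false if Java serialization rejects some field of obj. */
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false; // e.getMessage() names the offending class
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new FormatWithPlainField())); // prints false
    }
}
```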


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-12 Thread Elbehery
Github user Elbehery commented on a diff in the pull request:

https://github.com/apache/flink/pull/442#discussion_r26288077
  
--- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java ---
@@ -0,0 +1,62 @@
+package org.apache.flink.contrib;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+
+public class SimpleTweetInputFormatTest {
+
+    private Tweet tweet;
+
+    private SimpleTweetInputFormat simpleTweetInputFormat;
+
+    private FileInputSplit fileInputSplit;
+
+    protected Configuration config;
+
+    protected File tempFile;
+
+
+    @Before
+    public void testSetUp() {
+
+
+        simpleTweetInputFormat = new SimpleTweetInputFormat();
+
+        File jsonFile = new File("../flink-contrib/src/main/resources/HashTagTweetSample.json");
+
+        fileInputSplit = new FileInputSplit(0, new Path(jsonFile.getPath()), 0, jsonFile.length(), new String[] {"localhost"});
+    }
+
+    @Test
+    public void testTweetInput() throws Exception {
+
+
+        simpleTweetInputFormat.open(fileInputSplit);
+        while (!simpleTweetInputFormat.reachedEnd()) {
--- End diff --

Changed. I tried to use a switch instead of multiple if-else statements 
because it is more efficient, but Flink is compiled with Java 1.6, which does 
not accept String in switch statements.
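The constraint here is that `switch` on a `String` was only added in Java 7, so Java 6 code has to fall back to an `equals()`-based `if`/`else` chain. The sketch below shows both forms side by side for a hypothetical dispatch on tweet JSON keys; the class name and return labels are made up for illustration, and the file as a whole needs Java 7+ to compile because of the second method.

```java
public class FieldDispatch {

    /** Java 6 compatible: compare the key with equals() in an if-else chain. */
    static String classifyJava6(String key) {
        if ("text".equals(key)) {
            return "tweet body";
        } else if ("hashtags".equals(key)) {
            return "entity list";
        } else if ("id".equals(key) || "id_str".equals(key)) {
            return "identifier";
        } else {
            return "ignored";
        }
    }

    /** Java 7+: the same dispatch written as a switch on String. */
    static String classifyJava7(String key) {
        switch (key) {
            case "text":
                return "tweet body";
            case "hashtags":
                return "entity list";
            case "id":
            case "id_str":
                return "identifier";
            default:
                return "ignored";
        }
    }

    public static void main(String[] args) {
        System.out.println(classifyJava6("hashtags")); // prints entity list
        System.out.println(classifyJava7("hashtags")); // prints entity list
    }
}
```

One behavioral difference worth knowing: `"text".equals(key)` simply returns `false` for a `null` key, whereas `switch (key)` throws a `NullPointerException`.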


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-12 Thread Elbehery
Github user Elbehery commented on a diff in the pull request:

https://github.com/apache/flink/pull/442#discussion_r26285363
  
--- Diff: flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java ---
@@ -0,0 +1,67 @@
+package org.apache.flink.contrib.tweetinputformat.io;
+
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
+import org.codehaus.jackson.JsonParseException;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+
+public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implements ResultTypeQueryable<Tweet> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleTweetInputFormat.class);
+
+    @Override
+    public Tweet nextRecord(Tweet record) throws IOException {
+        Boolean result = false;
+
+        do {
+            try {
+                record.reset(0);
+                record = super.nextRecord(record);
+                result = true;
+
+            } catch (JsonParseException e) {
+                result = false;
+
+            }
+        } while (!result);
+
+        return record;
+    }
+
+    @Override
+    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+
+
+        InputStreamReader jsonReader = new InputStreamReader(new ByteArrayInputStream(bytes));
+        jsonReader.skip(offset);
+
+        JSONParser parser = new JSONParser();
+        TweetHandler handler = new TweetHandler();
+
+        try {
+
+            handler.reuse = reuse;
+            parser.parse(jsonReader, handler, false);
+        } catch (ParseException e) {
+
+            LOG.debug("Class " + SimpleTweetInputFormat.class + " " + e.getMessage());
--- End diff --

Edited


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-12 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/442#discussion_r26285316
  
--- Diff: flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java ---
@@ -0,0 +1,67 @@
+package org.apache.flink.contrib.tweetinputformat.io;
+
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
+import org.codehaus.jackson.JsonParseException;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+
+public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implements ResultTypeQueryable<Tweet> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleTweetInputFormat.class);
+
+    @Override
+    public Tweet nextRecord(Tweet record) throws IOException {
+        Boolean result = false;
+
+        do {
+            try {
+                record.reset(0);
+                record = super.nextRecord(record);
+                result = true;
+
+            } catch (JsonParseException e) {
+                result = false;
+
+            }
+        } while (!result);
+
+        return record;
+    }
+
+    @Override
+    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+
+
+        InputStreamReader jsonReader = new InputStreamReader(new ByteArrayInputStream(bytes));
+        jsonReader.skip(offset);
+
+        JSONParser parser = new JSONParser();
--- End diff --

Putting the parser into a `transient` field and initializing it in the 
`open()` method is the way to go.

Can you give me the full stack trace for the "Can not submit Job" exception?
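The suggested pattern can be sketched in plain Java, again with a hypothetical `NonSerializableParser` standing in for `JSONParser`. A `transient` field is skipped during serialization, so the copy that arrives on the receiving side has a `null` parser until `open()` re-creates it. In Flink's file-based input formats the corresponding hook is `open(FileInputSplit)`, and an override there would presumably call `super.open(split)` before creating the parser; all class and method names below are illustrative only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    /** Hypothetical stand-in for a parser class that is not Serializable. */
    static class NonSerializableParser {
        String parse(String raw) {
            return raw.trim();
        }
    }

    /** Sketch of a format that keeps its parser in a transient field. */
    static class TweetFormatSketch implements Serializable {
        private static final long serialVersionUID = 1L;

        // Skipped by serialization; null until open() runs on the receiving side.
        private transient NonSerializableParser parser;

        /** Plays the role of open(): create per-instance, non-serializable helpers. */
        void open() {
            parser = new NonSerializableParser();
        }

        boolean isOpen() {
            return parser != null;
        }

        String readRecord(String raw) {
            return parser.parse(raw); // NullPointerException if open() was not called
        }
    }

    /** Serializes and deserializes obj, simulating shipping it to a worker. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        TweetFormatSketch local = new TweetFormatSketch();
        local.open();
        TweetFormatSketch shipped = roundTrip(local);
        System.out.println(shipped.isOpen()); // prints false
        shipped.open();                       // re-create the parser after shipping
        System.out.println(shipped.readRecord("  {\"id\": 1}  ")); // prints {"id": 1}
    }
}
```

Note that a `transient` field initialized only at its declaration behaves the same way: the initializer does not run during deserialization, so the field is `null` on the shipped copy unless something like `open()` sets it again.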


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-12 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/442#discussion_r26289003
  
--- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java ---
@@ -0,0 +1,76 @@
+package org.apache.flink.contrib;
--- End diff --

Missing license header.


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-12 Thread Elbehery
Github user Elbehery commented on a diff in the pull request:

https://github.com/apache/flink/pull/442#discussion_r26300742
  
--- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java ---
@@ -0,0 +1,76 @@
+package org.apache.flink.contrib;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+
+public class SimpleTweetInputFormatTest {
+
+    private Tweet tweet;
+
+    private SimpleTweetInputFormat simpleTweetInputFormat;
+
+    private FileInputSplit fileInputSplit;
+
+    protected Configuration config;
+
+    protected File tempFile;
+
+
+    @Before
+    public void testSetUp() {
+
+
+        simpleTweetInputFormat = new SimpleTweetInputFormat();
+
+        File jsonFile = new File("../flink-contrib/src/main/resources/HashTagTweetSample.json");
+
+        fileInputSplit = new FileInputSplit(0, new Path(jsonFile.getPath()), 0, jsonFile.length(), new String[] {"localhost"});
+    }
+
+    @Test
+    public void testTweetInput() throws Exception {
+
+
+        simpleTweetInputFormat.open(fileInputSplit);
+        List<String> result;
+
+        while (!simpleTweetInputFormat.reachedEnd()) {
+            tweet = new Tweet();
+            tweet = simpleTweetInputFormat.nextRecord(tweet);
+
+            if(tweet != null){
--- End diff --

DONE


[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-12 Thread Elbehery
Github user Elbehery commented on a diff in the pull request:

https://github.com/apache/flink/pull/442#discussion_r26299783
  
--- Diff: 
flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java
 ---
@@ -0,0 +1,76 @@
+package org.apache.flink.contrib;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
+import 
org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+
+public class SimpleTweetInputFormatTest {
+
+private Tweet tweet;
+
+private SimpleTweetInputFormat simpleTweetInputFormat;
+
+private FileInputSplit fileInputSplit;
+
+protected Configuration config;
+
+protected File tempFile;
+
+
+@Before
+public void testSetUp() {
+
+
+simpleTweetInputFormat = new SimpleTweetInputFormat();
+
+File jsonFile = new File("../flink-contrib/src/main/resources/HashTagTweetSample.json");
+
+fileInputSplit = new FileInputSplit(0, new Path(jsonFile.getPath()), 0, jsonFile.length(), new String[] {"localhost"});
+}
+
+@Test
+public void testTweetInput() throws Exception {
+
+
+simpleTweetInputFormat.open(fileInputSplit);
+List<String> result;
+
+while (!simpleTweetInputFormat.reachedEnd()) {
+tweet = new Tweet();
+tweet = simpleTweetInputFormat.nextRecord(tweet);
+
+if(tweet != null){
--- End diff --

I think this behavior is intended: checking the DelimitedInputFormat unit test, I found the following. I will edit my test case to fix the problem.

![selection_019](https://cloud.githubusercontent.com/assets/2375289/6618391/d3366786-c8c2-11e4-8854-2a38a8da4da3.png)
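To make the behavior concrete: as far as this thread suggests, `DelimitedInputFormat.nextRecord` returns `null` once the split holds no further record, and only then does `reachedEnd()` turn true, so the last pass of a `while (!reachedEnd())` loop legitimately sees a `null`. The sketch below mimics that contract with a hypothetical in-memory reader (`InMemoryDelimitedReader` is not a Flink class); the exact Flink semantics should be verified against Flink's own `DelimitedInputFormat` tests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-in for the DelimitedInputFormat read contract (not a Flink API):
// reachedEnd() becomes true only after nextRecord() has failed to find another record.
final class InMemoryDelimitedReader {
    private final String[] records;
    private int pos = 0;
    private boolean end = false;

    InMemoryDelimitedReader(String data, String delimiter) {
        // Split on the literal delimiter and drop empty tokens.
        List<String> nonEmpty = new ArrayList<>();
        for (String s : data.split(Pattern.quote(delimiter))) {
            if (!s.isEmpty()) {
                nonEmpty.add(s);
            }
        }
        this.records = nonEmpty.toArray(new String[0]);
    }

    boolean reachedEnd() {
        return end;
    }

    String nextRecord() {
        if (pos < records.length) {
            return records[pos++];
        }
        end = true; // nothing left: flag the end and signal it with null
        return null;
    }

    // Typical consumer loop: the final iteration sees a null and simply ignores it.
    static List<String> readAll(InMemoryDelimitedReader reader) {
        List<String> out = new ArrayList<>();
        while (!reader.reachedEnd()) {
            String record = reader.nextRecord();
            if (record != null) {
                out.add(record);
            }
        }
        return out;
    }
}
```

With two records, `reachedEnd()` is still false after both have been read; only the third `nextRecord()` call returns `null` and flips the flag.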
 




[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-12 Thread Elbehery
Github user Elbehery commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-78480294
  
@rmetzger This is the standard tweet format as defined by Twitter; see the [official Twitter documentation](https://dev.twitter.com/overview/api/tweets). My parser retrieves all of the tweet's fields except the Bounding Box object and the retweeted object.




[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-12 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/442#discussion_r26300021
  
--- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java ---
@@ -0,0 +1,76 @@
+package org.apache.flink.contrib;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+
+public class SimpleTweetInputFormatTest {
+
+private Tweet tweet;
+
+private SimpleTweetInputFormat simpleTweetInputFormat;
+
+private FileInputSplit fileInputSplit;
+
+protected Configuration config;
+
+protected File tempFile;
+
+
+@Before
+public void testSetUp() {
+
+
+simpleTweetInputFormat = new SimpleTweetInputFormat();
+
+File jsonFile = new File("../flink-contrib/src/main/resources/HashTagTweetSample.json");
+
+fileInputSplit = new FileInputSplit(0, new Path(jsonFile.getPath()), 0, jsonFile.length(), new String[] {"localhost"});
+}
+
+@Test
+public void testTweetInput() throws Exception {
+
+
+simpleTweetInputFormat.open(fileInputSplit);
+List<String> result;
+
+while (!simpleTweetInputFormat.reachedEnd()) {
+tweet = new Tweet();
+tweet = simpleTweetInputFormat.nextRecord(tweet);
+
+if(tweet != null){
--- End diff --

Cool.
Then I'd recommend adding a similar check to your test.




[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-12 Thread Elbehery
Github user Elbehery commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-78480464
  
@StephanEwen I have added the license.




[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-12 Thread Elbehery
Github user Elbehery commented on a diff in the pull request:

https://github.com/apache/flink/pull/442#discussion_r26296323
  
--- Diff: flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java ---
@@ -0,0 +1,67 @@
+package org.apache.flink.contrib.tweetinputformat.io;
+
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
+import org.codehaus.jackson.JsonParseException;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+
+public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implements ResultTypeQueryable<Tweet> {
+
+private static final Logger LOG = LoggerFactory.getLogger(SimpleTweetInputFormat.class);
+
+@Override
+public Tweet nextRecord(Tweet record) throws IOException {
+Boolean result = false;
+
+do {
+try {
+record.reset(0);
+record = super.nextRecord(record);
+result = true;
+
+} catch (JsonParseException e) {
+result = false;
+
+}
+} while (!result);
+
+return record;
+}
+
+@Override
+public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+
+
+InputStreamReader jsonReader = new InputStreamReader(new ByteArrayInputStream(bytes));
+jsonReader.skip(offset);
+
+JSONParser parser = new JSONParser();
--- End diff --

It works now, thanks.
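The `nextRecord` override in the diff above retries whenever parsing throws a `JsonParseException`, so malformed lines are skipped instead of failing the read. A self-contained sketch of that skip-malformed-records pattern follows; `parseOrThrow` is a hypothetical stand-in for the JSON parsing step (it is neither json-simple nor Jackson), and the input is a plain iterator over raw lines, with `null` returned once no parseable line is left.

```java
import java.util.Iterator;

final class SkippingParser {
    // Hypothetical parse step: accepts only text that looks like a JSON object.
    static String parseOrThrow(String raw) {
        String s = raw.trim();
        if (!s.startsWith("{") || !s.endsWith("}")) {
            throw new IllegalArgumentException("malformed record: " + raw);
        }
        return s;
    }

    // Returns the next record that parses, skipping malformed ones,
    // or null when the input is exhausted (mirroring the end-of-split signal).
    static String nextValid(Iterator<String> lines) {
        while (lines.hasNext()) {
            String raw = lines.next();
            try {
                return parseOrThrow(raw);
            } catch (IllegalArgumentException e) {
                // Malformed record: drop it and move on to the next line.
            }
        }
        return null;
    }
}
```

In the actual format, the skipped record would probably be worth reporting through the `LOG` field the class already declares, so silently dropped tweets remain visible.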




[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-12 Thread Elbehery
Github user Elbehery commented on a diff in the pull request:

https://github.com/apache/flink/pull/442#discussion_r26296679
  
--- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java ---
@@ -0,0 +1,76 @@
+package org.apache.flink.contrib;
--- End diff --

DONE




[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-12 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/442#discussion_r26288947
  
--- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java ---
@@ -0,0 +1,76 @@
+package org.apache.flink.contrib;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+
+public class SimpleTweetInputFormatTest {
+
+private Tweet tweet;
+
+private SimpleTweetInputFormat simpleTweetInputFormat;
+
+private FileInputSplit fileInputSplit;
+
+protected Configuration config;
+
+protected File tempFile;
+
+
+@Before
+public void testSetUp() {
+
+
+simpleTweetInputFormat = new SimpleTweetInputFormat();
+
+File jsonFile = new File("../flink-contrib/src/main/resources/HashTagTweetSample.json");
+
+fileInputSplit = new FileInputSplit(0, new Path(jsonFile.getPath()), 0, jsonFile.length(), new String[] {"localhost"});
+}
+
+@Test
+public void testTweetInput() throws Exception {
+
+
+simpleTweetInputFormat.open(fileInputSplit);
+List<String> result;
+
+while (!simpleTweetInputFormat.reachedEnd()) {
+tweet = new Tweet();
+tweet = simpleTweetInputFormat.nextRecord(tweet);
+
+if(tweet != null){
--- End diff --

Can you make the test fail if `tweet` is null?
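One way to satisfy this request while still tolerating a trailing `null` that may signal the end of the split (the behavior discussed elsewhere in this thread): fail only when a `null` record is not immediately followed by `reachedEnd()` returning true. A minimal sketch, assuming a hypothetical `RecordReader` interface that mirrors the `reachedEnd()`/`nextRecord()` pair (it is not Flink's `InputFormat` API); a real test would use JUnit's `Assert` rather than throwing `AssertionError` directly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal interface mirroring the reachedEnd()/nextRecord() pair.
interface RecordReader<T> {
    boolean reachedEnd();
    T nextRecord();
}

// Array-backed reader for illustration: after the last item it returns null once
// and then reports the end.
final class ArrayRecordReader<T> implements RecordReader<T> {
    private final T[] items;
    private int pos = 0;
    private boolean end = false;

    @SafeVarargs
    ArrayRecordReader(T... items) {
        this.items = items;
    }

    public boolean reachedEnd() {
        return end;
    }

    public T nextRecord() {
        if (pos < items.length) {
            return items[pos++];
        }
        end = true;
        return null;
    }
}

final class StrictReadCheck {
    // Reads every record and fails if a null appears anywhere except as the final
    // "no more records" signal; returns the records so the caller can check the count.
    static <T> List<T> readAllStrict(RecordReader<T> reader) {
        List<T> out = new ArrayList<>();
        while (!reader.reachedEnd()) {
            T record = reader.nextRecord();
            if (record == null) {
                if (!reader.reachedEnd()) {
                    throw new AssertionError("nextRecord returned null before the end of input");
                }
            } else {
                out.add(record);
            }
        }
        return out;
    }
}
```

The size of the returned list can then be compared with the number of tweets in the test file (four, per the pull request description).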




[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-03-01 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/442#discussion_r25566598
  
--- Diff: flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java ---
@@ -0,0 +1,67 @@
+package org.apache.flink.contrib.tweetinputformat.io;
+
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
+import org.codehaus.jackson.JsonParseException;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+
+public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implements ResultTypeQueryable<Tweet> {
+
+private static final Logger LOG = LoggerFactory.getLogger(SimpleTweetInputFormat.class);
+
+@Override
+public Tweet nextRecord(Tweet record) throws IOException {
+Boolean result = false;
+
+do {
+try {
+record.reset(0);
+record = super.nextRecord(record);
+result = true;
+
+} catch (JsonParseException e) {
+result = false;
+
+}
+} while (!result);
+
+return record;
+}
+
+@Override
+public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+
+
+InputStreamReader jsonReader = new InputStreamReader(new ByteArrayInputStream(bytes));
+jsonReader.skip(offset);
+
+JSONParser parser = new JSONParser();
--- End diff --

I don't think you need to create a new parser and handler for each record.
You can probably improve the performance by reusing the parser.
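A minimal sketch of that suggestion: the parser is created once per format instance and reused by every `readRecord` call. `CountingParser` and `ReusingTweetFormat` are hypothetical names; `CountingParser` stands in for json-simple's `JSONParser` and only counts how often it is instantiated. The sketch also decodes just the record's slice of the buffer (`offset`, `numBytes`) rather than wrapping the whole array. Because Flink input formats are `Serializable` and shipped to the workers, a real field holding a non-serializable parser would likely need to be `transient` and initialized in `open()`; treat that as an assumption to check.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a JSON parser; counts how many instances exist.
final class CountingParser {
    static int instances = 0;

    CountingParser() {
        instances++;
    }

    // Placeholder "parse": a real parser would populate a Tweet from the JSON text.
    String parse(String json) {
        return json.trim();
    }
}

final class ReusingTweetFormat {
    // Created once and reused for every record instead of once per readRecord call.
    private final CountingParser parser = new CountingParser();

    String readRecord(byte[] bytes, int offset, int numBytes) {
        // Decode only this record's bytes, not the whole buffer.
        String json = new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
        return parser.parse(json);
    }
}
```

Reading two records through the same `ReusingTweetFormat` leaves `CountingParser.instances` at 1, whereas creating the parser inside `readRecord` would allocate one per record.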




[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-02-27 Thread Elbehery
GitHub user Elbehery opened a pull request:

https://github.com/apache/flink/pull/442

[FLINK-1615] [java api] SimpleTweetInputFormat

This is a contribution of a TweetInputFormat for JIRA issue [FLINK-1615](https://issues.apache.org/jira/browse/FLINK-1615).

This commit contains the InputFormat, the POJOs for the tweet and its nested objects, the handler, and a unit test with a test file of four tweets.

Before pushing, the branch was rebased against the upstream/master branch.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Elbehery/flink Tweet-into-Pojo

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/442.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #442


commit f25e2e83d60992ac11e38c5a842e185e2ad1c134
Author: elbehery <eng.m.beh...@live.com>
Date:   2015-02-27T16:34:27Z

[FLINK-1615] [java api] SimpleTweetInputFormat



