Hi Jay,

Creating a batch and a streaming environment in a single Java source file
is fine, they just run separately. (If you run it from an IDE locally they
might conflict as the second one would try to launch a local executor on a
port that is most likely already taken by the first one.) I would suggest
to have these jobs in separate files currently, exactly for the previous
reason.

Looking at your
code 
ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(com.github.rnowling.bps.datagenerator.datamodels.Product.class,
new FlinkBPSGenerator.ProductSerializer()); does not do much good for you.
You need to register your serializers to the environment to which you are
using. Currently you would need to register it to the streaming env
variable. If you would like to also assemble a batch job you need to add
them there too.

As for the streaming job I assume that you are using Flink version 0.9.1
and checking out the problem shortly.

Best,

Marton

On Sun, Oct 4, 2015 at 3:37 AM, jay vyas <jayunit100.apa...@gmail.com>
wrote:

> Here is a distilled example of the issue, should be easier to debug for
> folks interested... :)
>
> public static void main(String[] args) {
>
>
>     
> ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(com.github.rnowling.bps.datagenerator.datamodels.Product.class,
>  new FlinkBPSGenerator.ProductSerializer());
>     
> ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(com.github.rnowling.bps.datagenerator.datamodels.Transaction.class,
>  new FlinkBPSGenerator.TransactionSerializer());
>
>     final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>     //when running "env.execute" this stream should start consuming...
>     DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000, 
> FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
>     dataStream.iterate().map(new MapFunction<String, String>() {
>         public String map(String value) throws Exception {
>             System.out.println(value);
>             return ">>> > > > > " + value + " < < < <  <<<";
>         }
>     });
>     try {
>         env.execute();
>     }
>     catch(Exception e){
>         e.printStackTrace();
>     }
> }
>
>
>
> On Sat, Oct 3, 2015 at 9:08 PM, jay vyas <jayunit100.apa...@gmail.com>
> wrote:
>
>> Hi flink !
>>
>> Looks like "setSlotSharing" is throwing an NPE when I try to start a
>> Thread  which runs a streaming job.
>>
>> I'm trying to do this by creating a dataStream from env.readFileStream,
>> and then later starting a job which writes files out ...
>>
>> However, I get
>>
>> Exception in thread "main" java.lang.NullPointerException
>>     at
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setSlotSharing(StreamingJobGraphGenerator.java:361)
>>
>> What is the right way to create a stream and batch job all in one
>> environment?
>>
>> For reference, here is a gist of the code
>> https://gist.github.com/jayunit100/c7ab61d1833708d290df, and the
>> offending line is the
>>
>> DataStream<String> dataStream = env.readFileStream("/tmp/a",1000, 
>> FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
>>
>> line.
>>
>> Thanks again !
>>
>> --
>> jay vyas
>>
>
>
>
> --
> jay vyas
>

Reply via email to