manual scaling with savepoint

2017-01-10 Thread gallenvara
Hi everyone. Flink cannot auto-scale yet, but we can achieve rescaling by
restoring a savepoint with a different parallelism. I wonder how Flink
handles the state management in that case. For example, when going from
parallelism 3 to 4, how is the state of the 3 subtasks redistributed to the
new parallelism? Can you explain the internals to me?
Thanks a lot.
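
A quick sketch of the knobs involved, assuming the Flink 1.2 APIs (the job itself is
omitted): keyed state is sharded into key groups, whose number is fixed by the maximum
parallelism, and restoring a savepoint with a different parallelism reassigns whole key
groups to the new subtasks.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RescalingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Upper bound for rescaling: keyed state is split into this many key groups,
        // and each parallel subtask owns a contiguous range of them. Going from
        // parallelism 3 to 4 only moves whole key groups to the new subtasks.
        env.setMaxParallelism(128);

        // Parallelism of this particular run; a job restored from a savepoint may use
        // a different value, as long as it stays at or below the max parallelism.
        env.setParallelism(3);

        // ... build the topology and call env.execute() as usual ...
    }
}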







Custom writer with Rollingsink

2017-01-10 Thread Biswajit Das
Hello,

I have to create a custom Parquet writer for the rolling sink, and I'm seeing an
error like the one below. I was expecting every partition to be written to a new
file. Any tips?

---
18:12:12.551 [flink-akka.actor.default-dispatcher-5] DEBUG
akka.event.EventStream - shutting down: StandardOutLogger started
Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Could not forward element to next
operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:399)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:381)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
at
org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
at
org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:379)
Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
Failed to CREATE_FILE /y=2017/m=01/d=10/H=18/M=12/_part-0-0.in-progress for
DFSClient_NONMAPREDUCE_1062142735_3
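
For context, the AlreadyBeingCreatedException is an HDFS lease conflict: some other
client (for example a second subtask, or a restarted job) still holds a lease on the
same in-progress file. Below is a minimal sketch of how a custom writer is usually
wired into the rolling sink, assuming the flink-connector-filesystem API of the 1.1.x
line; MyParquetWriter, the path and the roll size are placeholders, not taken from the
original job.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;

public class RollingSinkSketch {

    public static void addRollingSink(DataStream<String> stream) {
        RollingSink<String> sink = new RollingSink<>("hdfs:///shared/output");

        // One bucket (directory) per time unit; each parallel subtask writes its own
        // part file inside the bucket, so two subtasks never share a file path.
        sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));

        // Swap in the custom Parquet writer here, e.g. sink.setWriter(new MyParquetWriter()).
        sink.setWriter(new StringWriter<String>());

        // Roll over to a new part file once the current one reaches ~128 MB.
        sink.setBatchSize(1024 * 1024 * 128);

        stream.addSink(sink);
    }
}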


How to get help on ClassCastException when re-submitting a job

2017-01-10 Thread Giuliano Caliari
Hello,



I need some guidance on how to report a bug.



I’m testing version 1.2 on my local cluster. The first time I submit the
job everything works, but whenever I re-submit the same job it fails with:

org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Job execution failed.

at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)

at
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)

at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)

at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)

at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)

at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)

at
au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)

at
au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)

at scala.Function0$class.apply$mcV$sp(Function0.scala:34)

at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)

at scala.App$$anonfun$main$1.apply(App.scala:76)

at scala.App$$anonfun$main$1.apply(App.scala:76)

at scala.collection.immutable.List.foreach(List.scala:381)

at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)

at scala.App$class.main(App.scala:76)

at au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)

at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)

at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)

at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)

at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)

at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)

at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)

at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)

at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)

at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)

at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.

at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)

at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)

at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)

at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)

at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)

at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)

at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: java.lang.RuntimeException: Could not forward element to next
operator

at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:415)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:397)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:749)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)

at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.collectWithTimestamp(StreamSourceContexts.java:272)

at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:261)

at
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:88)

at
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157)

at

Re: Continuous File monitoring not reading nested files

2017-01-10 Thread Kostas Kloudas
Aljoscha is right!

Any contribution is more than welcomed.

Kostas
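
For readers following along, a minimal sketch of the kind of recursive enumeration the
monitoring function needs when nested enumeration is enabled. This is illustrative only
(method name and structure are not from the Flink code base); the actual fix is tracked
in FLINK-5432.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class NestedListingSketch {

    // Collects all files under 'dir', descending into sub-directories instead of
    // stopping at the topmost level.
    public static List<FileStatus> listFilesRecursively(FileSystem fs, Path dir) throws IOException {
        List<FileStatus> files = new ArrayList<>();
        FileStatus[] children = fs.listStatus(dir);
        if (children == null) {
            return files;
        }
        for (FileStatus child : children) {
            if (child.isDir()) {
                files.addAll(listFilesRecursively(fs, child.getPath()));
            } else {
                files.add(child);
            }
        }
        return files;
    }
}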

> On Jan 10, 2017, at 3:48 PM, Aljoscha Krettek  wrote:
> 
> Yes, please go ahead with the fix! :-)
> 
> (If I'm not mistaken Kostas is working on other stuff right now.)
> 
> On Mon, 9 Jan 2017 at 23:19 Yassine MARZOUGUI wrote:
> Hi,
> 
> I found the root cause of the problem : the listEligibleFiles method in 
> ContinuousFileMonitoringFunction scans only the topmost files and ignores the 
> nested files. By fixing that I was able to get the expected output. I created 
> Jira issue: https://issues.apache.org/jira/browse/FLINK-5432.
> 
> @Kostas, If you haven't already started working on a fix for this, I would 
> happily contribute a fix for it if you like.
> 
> Best,
> Yassine
> 
> 2017-01-09 17:23 GMT+01:00 Yassine MARZOUGUI:
> Hi Kostas,
> 
> I debugged the code and the nestedFileEnumeration parameter was always true 
> during the execution. I noticed however that in the following loop in 
> ContinuousFileMonitoringFunction, for some reason, the fileStatus was null 
> for files in nested folders, and non null for files directly under the parent 
> path, so no splits were forwarded in the case of nested folders.
> 
> for(int var5 = 0; var5 < var4; ++var5) {
> FileInputSplit split = var3[var5];
> FileStatus fileStatus = 
> (FileStatus)eligibleFiles.get(split.getPath());
> if(fileStatus != null) {
> Long modTime = 
> Long.valueOf(fileStatus.getModificationTime());
> Object splitsToForward = 
> (List)splitsByModTime.get(modTime);
> if(splitsToForward == null) {
> splitsToForward = new ArrayList();
> splitsByModTime.put(modTime, splitsToForward);
> }
> 
> ((List)splitsToForward).add(new 
> TimestampedFileInputSplit(modTime.longValue(), split.getSplitNumber(), 
> split.getPath(), split.getStart(), split.getLength(), split.getHostnames()));
> }
> }
> 
> Thanks,
> Yassine
> 
> 
> 2017-01-09 15:04 GMT+01:00 Kostas Kloudas:
> Hi Yassine,
> 
> I suspect that the problem is in the way the input format (and not the 
> reader) scans nested files, 
> but could you see if in the code that is executed by the tasks, the 
> nestedFileEnumeration parameter is still true?
> 
> I am asking in order to pin down if the problem is in the way we ship the 
> code to the tasks or in reading the 
> nested files.
> 
> Thanks,
> Kostas
> 
>> On Jan 9, 2017, at 12:56 PM, Yassine MARZOUGUI wrote:
>> 
>> Hi,
>> 
>> Any updates on this issue? Thank you.
>> 
>> Best,
>> Yassine
>> 
>> 
>> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" wrote:
>> +kostas, who probably has the most experience with this by now. Do you have 
>> an idea what might be going on?
>> 
>> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI wrote:
>> Looks like this is not specific to the continuous file monitoring, I'm 
>> having the same issue (files in nested directories are not read) when using:
>> 
>> env.readFile(fileInputFormat, "hdfs:///shared/mydir", 
>> FileProcessingMode.PROCESS_ONCE, -1L)
>> 
>> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI:
>> Hi all,
>> 
>> I'm using the following code to continuously process files from a directory 
>> "mydir".
>> 
>> final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>> FileInputFormat fileInputFormat = new TextInputFormat(new 
>> Path("hdfs:///shared/mydir <>"));
>> fileInputFormat.setNestedFileEnumeration(true);
>> 
>> env.readFile(fileInputFormat,
>> "hdfs:///shared/mydir <>",
>> FileProcessingMode.PROCESS_CONTINUOUSLY, 1L)
>> .print();
>> 
>> env.execute();
>> 
>> If I add directory under mydir, say "2016-12-16", and then add a file 
>> "2016-12-16/file.txt", its contents are not printed. If I add the same file 
>> directly under "mydir",  its contents are correctly printed. After that the 
>> logs will show the following :
>> 
>> 10:55:44,928 DEBUG 
>> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>>   - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 
>> 1481882041587 and global mod time= 1481882126122
>> 10:55:44,928 DEBUG 
>> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>>   - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 
>> 1481881788704 and global mod time= 1481882126122

FLINK-5236. Flink 1.1.3 and modifying the classpath to use another version of fasterxml.

2017-01-10 Thread Estela Gallardo Zapata
Hello everyone,


I'm new to the Flink mailing list :) and I'll try to be respectful and 
helpful.

I'm writing to this mailing list due to an issue I have with Flink 1.1.3 and 
my application jar, related to this JIRA issue: 
https://issues.apache.org/jira/browse/FLINK-5236


I use IntelliJ to run my Maven application locally, and it runs correctly with 
Flink 1.1.3 and Scala 2.10.

But when I try to run my application's jar on the cluster, there seems to be 
an issue with the fasterxml (Jackson) library.

It seems that Flink uses a different version of this library and, as it 
takes precedence over the user classpath, I can't get my application running on the 
cluster.

I've tried modifying the Hadoop conf file, building a fat jar, setting 
HADOOP_USER_CLASSPATH_FIRST, and copying my application's jar directly into flink/lib...
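
One quick way to see which fasterxml/Jackson version actually wins at runtime on the
cluster is to print where the class is loaded from, for example from the main method of
a small job (relocating the library with the maven-shade-plugin is the other common
workaround, not shown here). This is a generic diagnostic sketch; the class name assumes
the conflict is around jackson-databind.

public class WhichJackson {
    public static void main(String[] args) throws Exception {
        // Shows which jar the Jackson classes are loaded from, which tells you whether
        // Flink's copy or the application's copy ends up on the classpath.
        Class<?> mapper = Class.forName("com.fasterxml.jackson.databind.ObjectMapper");
        System.out.println(mapper.getProtectionDomain().getCodeSource().getLocation());
        // The implementation version from the jar manifest (may be null if not set).
        System.out.println(mapper.getPackage().getImplementationVersion());
    }
}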


Could someone please give me some more clues, or help me clarify the explanation 
on the mailing list?


Thank you very much.

Estela.


Re: Reading and Writing to S3

2017-01-10 Thread M. Dale
Sam, I just happened to answer a similar question on Stack Overflow: "Does 
Apache Flink AWS S3 Sink require Hadoop for local testing?". I also submitted a 
PR to make that (for me) a little clearer in the Apache Flink documentation 
(https://github.com/apache/flink/pull/3054/files).
Let me know if that works for you.

Thanks,
Markus

On Tuesday, January 10, 2017 3:17 PM, Samra Kasim wrote:

Hi,

I am new to Flink and I've written two small test projects: 1) to read data 
from S3 and 2) to push data to S3. However, I am getting two different errors 
for the projects relating to, I think, how the core-site.xml file is being 
read. I am running the project locally in IntelliJ. I have the environment 
variable in run configurations set to 
HADOOP_HOME=path/to/dir-with-core-site.xml. I have also tried saving the 
core-site.xml in the src/main/resources folder but get the same errors. I want 
to know if my core-site.xml file is configured correctly for using s3a and how 
to have IntelliJ read the core-site.xml file. Also, are the core-site.xml 
configurations different for reading versus writing to S3?

This is my code for reading data from S3:

public class DesktopWriter {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        DataSet<String> data = env.readTextFile("s3://flink-test/flink-test.txt");
        data.print();
    }
}

I get the error: Caused by: java.io.IOException: Cannot determine access key to 
Amazon S3. Please make sure to configure it by setting the configuration key 
'fs.s3.accessKey'.

This is my code for writing to S3:

public class S3Sink {
    public static void main(String[] args) throws Exception {
        Map<String, String> configs = ConfigUtils.loadConfigs("path/to/config.yaml");

        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        messageStream.writeAsText("s3a://flink-test/flinktest.txt").setParallelism(1);

        env.execute();
    }
}

I get the error: Caused by: java.io.IOException: The given file URI 
(s3://flink-test/flinktest.txt) points to the HDFS NameNode at flink-test, but 
the File System could not be initialized with that address: Unable to load AWS 
credentials from any provider in the chain

This is my core-site.xml:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>
    <property>
        <name>fs.s3a.awsAccessKeyId</name>
        <value>*</value>
    </property>
    <property>
        <name>fs.s3a.awsSecretAccessKey</name>
        <value>*</value>
    </property>
</configuration>

This is my pom.xml (dependencies):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.7.4</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>2.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.2.5</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpcore</artifactId>
    <version>4.2.5</version>
</dependency>

Thanks!
Sam

Re: keyBy called twice. Second time, INetAddress and Array[Byte] are empty

2017-01-10 Thread Jamie Grier
Hi Jonas,

The issue has to do with serializing/deserializing InetAddress. If you
look at the InetAddress class, the data members that hold the actual IP
address are transient fields, and as such they are not serialized/deserialized
in the way that you would expect. This is what is causing the issue.

I suggest you simply do not use InetAddress in your Person data type, but
rather a plain string or some other properly serializable type.

-Jamie
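
A minimal sketch of the replacement Jamie suggests; the class and field names are
illustrative, not taken from the original job:

public class Person {

    public String name;
    // Store the textual form instead of java.net.InetAddress, whose address bytes are
    // transient and therefore lost on serialization.
    public String ipAddress;

    // Default constructor so Flink can treat this as a POJO.
    public Person() {}

    public Person(String name, java.net.InetAddress address) {
        this.name = name;
        this.ipAddress = address.getHostAddress();   // e.g. "192.168.0.1"
    }

    // Rebuild the InetAddress on demand where it is actually needed.
    public java.net.InetAddress toInetAddress() throws java.net.UnknownHostException {
        return java.net.InetAddress.getByName(ipAddress);
    }
}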


On Mon, Jan 9, 2017 at 9:46 AM, Jonas  wrote:

> So I created a minimal working example where this behaviour can still be
> seen. It is 15 LOC and can be downloaded here:
> https://github.com/JonasGroeger/flink-inetaddress-zeroed
>
> To run it, use sbt:
>
> If you don't want to do the above fear not, here is the code:
>
> For some reason, java.net.InetAddress objects get zeroed. Why is that?
>
>
>
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Reading and Writing to S3

2017-01-10 Thread Samra Kasim
Hi,

I am new to Flink and I've written two small test projects: 1) to read data
from s3 and 2) to push data to s3. However, I am getting two different
errors for the projects relating to, I think, how the core-site.xml file is
being read. I am running the project locally in IntelliJ. I have the
environment variable in run configurations set to
HADOOP_HOME=path/to/dir-with-core-site.xml. I have also tried saving the
core-site.xml in the src/main/resources folder but get the same errors. I
want to know if my core-site.xml file is configured correctly for using s3a
and how to have IntelliJ read the core-site.xml file? Also, are the
core-site.xml configurations different for reading versus writing to s3?

This is my code for reading data from s3:

public class DesktopWriter {

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

        DataSet<String> data = env.readTextFile("s3://flink-test/flink-test.txt");

        data.print();
    }
}

I get the error: Caused by: java.io.IOException: Cannot determine access
key to Amazon S3. Please make sure to configure it by setting the
configuration key 'fs.s3.accessKey'.

This is my code for writing to S3:

public class S3Sink {
    public static void main(String[] args) throws Exception {
        Map<String, String> configs = ConfigUtils.loadConfigs("path/to/config.yaml");

        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        messageStream.writeAsText("s3a://flink-test/flinktest.txt").setParallelism(1);

        env.execute();
    }
}

I get the error: Caused by: java.io.IOException: The given file URI
(s3://flink-test/flinktest.txt) points to the HDFS NameNode at flink-test,
but the File System could not be initialized with that address: Unable to
load AWS credentials from any provider in the chain

This is my core-site.xml:





<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>
    <property>
        <name>fs.s3a.awsAccessKeyId</name>
        <value>*</value>
    </property>
    <property>
        <name>fs.s3a.awsSecretAccessKey</name>
        <value>*</value>
    </property>
</configuration>




This is my pom.xml:





<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
        <version>1.7.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.2.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.2.5</version>
    </dependency>
</dependencies>





Thanks!
Sam
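
Both errors suggest that the URI scheme and the credential property names are not
lining up: the code mixes s3:// and s3a://, and the core-site.xml sets
fs.s3a.awsAccessKeyId, while Hadoop 2.7's S3A connector reads fs.s3a.access.key and
fs.s3a.secret.key (worth double-checking against the documentation of your exact Hadoop
version). A small sketch for verifying the S3A setup outside of Flink, with placeholder
credentials; if this fails, the problem is in the Hadoop/S3A configuration rather than
in Flink itself:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3ACredentialCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        conf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY");   // placeholder
        conf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY");   // placeholder

        // Open the bucket with the s3a:// scheme and check that the test file is visible.
        FileSystem fs = FileSystem.get(URI.create("s3a://flink-test/"), conf);
        System.out.println(fs.exists(new Path("s3a://flink-test/flink-test.txt")));
    }
}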


Re: Queryable State

2017-01-10 Thread Dawid Wysakowicz
Hey Ufuk,
Have you maybe had a chance to take a look at that problem?

2017-01-09 10:47 GMT+01:00 Ufuk Celebi :

> Hey Dawid! Thanks for reporting this. I will try to have a look over
> the course of the day. From a first impression, this seems like a bug
> to me.
>
> On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
>  wrote:
> > Hi, I was experimenting with the Queryable State feature and I have some
> > problems querying the state.
> >
> > The code which I use to produce the queryable state is:
> >
> > env.addSource(kafkaConsumer).map(
> >   e => e match {
> > case LoginClickEvent(_, t) => ("login", 1, t)
> > case LogoutClickEvent(_, t) => ("logout", 1, t)
> > case ButtonClickEvent(_, _, t) => ("button", 1, t)
> >   }).keyBy(0).timeWindow(Time.seconds(1))
> >   .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
> >   .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
> >   .keyBy("key")
> >   .asQueryableState(
> > "type-time-series-count",
> > new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> >   "type-time-series-count",
> >   classOf[KeyedDataPoint[java.lang.Integer]]))
> >
> > As you see it is a rather simple job, in which I try to count events of
> > different types in windows and then query by event type.
> >
> > In client code I do:
> > // Query Flink state
> > val future = client.getKvState(jobId, "type-time-series-count",
> > key.hashCode, seralizedKey)
> >
> > // Await async result
> > val serializedResult: Array[Byte] = Await.result(
> >   future, new FiniteDuration(
> > 10,
> > duration.SECONDS))
> >
> > // Deserialize response
> > val results = deserializeResponse(serializedResult)
> >
> > results
> >   }
> >
> >   private def deserializeResponse(serializedResult: Array[Byte]):
> > util.List[KeyedDataPoint[lang
> >   .Integer]] = {
> > KvStateRequestSerializer.deserializeList(serializedResult,
> > getValueSerializer())
> >   }
> >
> > As I was trying to debug the issue, I see that the first element in the list gets
> > deserialized correctly, but it fails on the second one. It seems like the
> > serialized result is broken. Do you have any idea whether I am doing something
> > wrong or whether there is a bug?
> >
> >
> > The exception I get is:
> > java.io.EOFException: null
> > at
> > org.apache.flink.runtime.util.DataInputDeserializer.readFully(
> DataInputDeserializer.java:157)
> > at
> > org.apache.flink.runtime.util.DataInputDeserializer.readUTF(
> DataInputDeserializer.java:240)
> > at
> > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(
> PojoSerializer.java:386)
> > at
> > org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.
> deserializeList(KvStateRequestSerializer.java:487)
> > at
> > com.dataartisans.stateserver.queryclient.QueryClient.
> deserializeResponse(QueryClient.scala:44)
> >
> > You can browse the exact code at: https://github.com/dawidwys/flink-intro
> >
> > I would be grateful for any advice.
> >
> > Regards
> > Dawid Wysakowicz
>
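
This is not a fix for the EOFException itself (which looks like a genuine issue on the
serialization side), but one thing worth ruling out on the client is a mismatch between
the serializer used in deserializeResponse and the one the job used when writing the
queryable ListState. A small sketch for building it from a TypeHint, assuming the
1.1+/1.2 type utilities; KeyedDataPoint is the class from Dawid's project:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

public class ValueSerializerSketch {

    // Builds the client-side serializer for the state's value type. It has to match
    // the serializer the job used, otherwise deserialization breaks partway through
    // the byte stream.
    public static <T> TypeSerializer<T> valueSerializer(TypeHint<T> hint) {
        return TypeInformation.of(hint).createSerializer(new ExecutionConfig());
    }

    // Usage for the state in this thread:
    //   TypeSerializer<KeyedDataPoint<Integer>> ser =
    //       valueSerializer(new TypeHint<KeyedDataPoint<Integer>>() {});
}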


Re: Sliding Event Time Window Processing: Window Function inconsistent behavior

2017-01-10 Thread Sujit Sakre
Hi Aljoscha,

Thanks.

I have used the following code for testing:

main

keyedStream.keyBy(4)
    .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))   // 6 min window with 2 min slide
    .apply(new CustomSlidingWindowFunction());

keyedStream.addSink(new SinkFunction<Tuple5<String, String, Float, Float, String>>() {

    private static final long serialVersionUID = 1L;

    public void invoke(Tuple5<String, String, Float, Float, String> value) {
        System.out.println(value.f1.toString().trim() + ", " +
                value.f0 + ", " + value.f2 + ", " + value.f3);
    }
});


in WindowFunction apply

...

// Condition for selecting a window
if (d.after(x) && d.before(y)) {

    for (Tuple5<String, String, Float, Float, String> tr : input) {
        // Write the window to the Collector
        out.collect(new Tuple5<>(tr.f0, tr.f1, tr.f2, tr.f3, tr.f4));
    }

I am getting all input records instead of those windows selected by
the condition. Is there something I am doing wrong? Does this need to be
done in a different way?

Please let me know.

Thanks.



*Sujit Sakre*

Senior Technical Architect
Tel: +91 22 6660 6600
Ext:
247
Direct: 6740 5247

Mobile: +91 98672 01204

www.rave-tech.com




On 10 January 2017 at 20:24, Aljoscha Krettek  wrote:

> Hi,
> instead of writing to files, could you please simply output a value using
> the Collector and then write the result stream of the window operation to a
> sink (such as a file sink) to see how many windows are being processed.
> Having side effects (especially output) in user functions can lead to
> programs with quite unexpected behaviour and I would highly discourage
> doing that.
>
> Cheers,
> Aljoscha
>
> On Tue, 10 Jan 2017 at 13:44 Sujit Sakre 
> wrote:
>
>> Hi,
>>
>> In the link (http://stackoverflow.com/questions/41143518/sliding-
>> processing-time-window-computes-inconsistent-results), Fabian has
>> mentioned that if Event Time is used, consistent results are possible.
>>
>> However, that's not the case with us. We are getting very random results.
>>
>> Please suggest.
>>
>>
>> *Sujit Sakre*
>>
>>
>> On 9 January 2017 at 22:27, Sujit Sakre 
>> wrote:
>>
>> Hi,
>>
>> We are using Sliding Event Time Window with Kafka Consumer. The window
>> size is 6 minutes, and slide is 2 minutes. We have written a window
>> function to select a particular window out of multiple windows for a keyed
>> stream, e.g. we select about 16 windows out of multiple windows for the
>> keyed stream based on a particular condition.
>>
>> Upon a normal execution, we get 16 windows for processing inside the
>> condition (in window function mentioned). These windows we are putting in
>> different files, named after window start and end times.
>>
>> the code is as below:
>>
>> Calling code
>>
>>
>> public class RealTimeProcessingSlidingWindow{
>>
>> public static void main(String[] args) throws Exception {
>>
>> // set up the execution environment
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.
>> getExecutionEnvironment();
>>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>> // configure the Kafka consumer
>> Properties kafkaProps = new Properties();
>> kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
>> kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
>> kafkaProps.setProperty("group.id", DEMO_GROUP);
>> // always read the Kafka topic from the start
>> kafkaProps.setProperty("auto.offset.reset" ,"earliest");
>>
>> FlinkKafkaConsumer09> Float, String>> consumer = new FlinkKafkaConsumer09<>(
>> "test",// kafka topic name
>> new dataSchema(),
>> kafkaProps);
>> DataStream>
>> stream1 = env.addSource(consumer);
>> DataStream>
>> keyedStream = stream1.assignTimestampsAndWatermarks(new
>> BoundedOutOfOrdernessGenerator2());
>>
>> keyedStream.keyBy(4)
> .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
>> // 6 min window with 2 min sliding window
>> .apply(new CustomSlidingWindowFunction());
>>
>> env.execute("Sliding Event Time Window Processing");
>>
>>}
>> }
>>
>>
>> public static class CustomSlidingWindowFunction implements
>> WindowFunction

Re: window function outputs two different values

2017-01-10 Thread Yury Ruchin
Hi,

Is there a strict requirement that elements must proceed along the
processing pipeline only after being counted by the reduce function?
If not, you could derive two streams from the original one and process them
concurrently, something like this:

val protoStream = kafka source -> keyBy

val aggregateStream = protoStream -> window -> reduce
val someOtherStream = protoStream -> 

Or, if the above is not an option and window collection latency is not an
issue, you could just use a generic window function or a fold function. The
former gives access to the window elements as an iterable; the latter allows
using a custom accumulator that contains the intermediate count and the window
elements seen so far.

Regards,
Yury
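
A small sketch of the first option (two streams derived from the same keyed source),
assuming the Java APIs of the 1.1/1.2 line. The Event type, its key field and the
10-minute session gap are placeholders, not taken from the original job:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class TwoStreamsSketch {

    // Placeholder record type.
    public static class Event {
        public String key;
        public String payload;
    }

    public static void wire(DataStream<Event> fromKafka) {
        KeyedStream<Event, String> proto = fromKafka.keyBy(new KeySelector<Event, String>() {
            @Override
            public String getKey(Event e) {
                return e.key;
            }
        });

        // Branch 1: per-session element count from a generic window function.
        DataStream<Long> counts = proto
                .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
                .apply(new WindowFunction<Event, Long, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window,
                                      Iterable<Event> elements, Collector<Long> out) {
                        long n = 0;
                        for (Event ignored : elements) {
                            n++;
                        }
                        out.collect(n);
                    }
                });

        // Branch 2: the elements themselves, forwarded for further processing.
        DataStream<Event> forDownstream = proto.map(new MapFunction<Event, Event>() {
            @Override
            public Event map(Event e) {
                return e;   // replace with the real per-element processing
            }
        });
    }
}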

2017-01-10 17:43 GMT+03:00 Aljoscha Krettek :

> Hi,
> I'm afraid this is not possible with the current model. A reduce function
> is only meant to combine two values and output the result of that. Side
> effects, such as emitting further data are not allowed right now.
>
> Cheers,
> Aljoscha
>
> On Mon, 9 Jan 2017 at 15:27 tao xiao  wrote:
>
>> Hi team,
>>
>> any suggestions on below topic?
>>
>> I have a requirement that wants to output two different values from a
>> time window reduce function. Here is basic workflow
>>
>> 1. fetch data from Kafka
>> 2. flow the data to a event session window. kafka source -> keyBy ->
>> session window -> reduce
>> 3. inside the reduce function, count the number of data and also emit the
>> data itself to another operator for further processing
>>
>> As the reduce function can only emit the count, I want to know how to
>> also emit the data as well?
>>
>>
>>
>> On Sat, 7 Jan 2017 at 20:30 tao xiao  wrote:
>>
>> Hi team,
>>
>> I have a requirement that wants to output two different values from a
>> time window reduce function. Here is basic workflow
>>
>> 1. fetch data from Kafka
>> 2. flow the data to a event session window. kafka source -> keyBy ->
>> session window -> reduce
>> 3. inside the reduce function, count the number of data and also emit the
>> data itself to another operator for further processing
>>
>> As the reduce function can only emit the count, I want to know how to
>> also emit the data as well?
>>
>>


Re: Sliding Event Time Window Processing: Window Function inconsistent behavior

2017-01-10 Thread Aljoscha Krettek
Hi,
instead of writing to files, could you please simply output a value using
the Collector and then write the result stream of the window operation to a
sink (such as a file sink), to see how many windows are being processed?
Having side effects (especially output) in user functions can lead to
programs with quite unexpected behaviour, and I would highly discourage
doing that.

Cheers,
Aljoscha
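
A small sketch of what this looks like end to end, reusing the CustomSlidingWindowFunction
and the Tuple5 element type from the thread; the output path is a placeholder:

import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowOutputSketch {

    // 'stream' is the timestamped stream from the original job; CustomSlidingWindowFunction
    // should only call out.collect(...) and do no file I/O of its own.
    public static void wire(DataStream<Tuple5<String, String, Float, Float, String>> stream) {
        DataStream<Tuple5<String, String, Float, Float, String>> selectedWindows = stream
                .keyBy(4)
                .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
                .apply(new CustomSlidingWindowFunction());

        // A single sink collects everything the window function emits, so the number of
        // selected windows can be checked from one output file.
        selectedWindows.writeAsText("file:///tmp/selected-windows", FileSystem.WriteMode.OVERWRITE)
                .setParallelism(1);
    }
}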

On Tue, 10 Jan 2017 at 13:44 Sujit Sakre 
wrote:

> Hi,
>
> In the link (
> http://stackoverflow.com/questions/41143518/sliding-processing-time-window-computes-inconsistent-results),
> Fabian has mentioned that if Event Time is used, consistent results are
> possible.
>
> However, that's not the case with us. We are getting very random results.
>
> Please suggest.
>
>
> *Sujit Sakre*
>
>
> On 9 January 2017 at 22:27, Sujit Sakre 
> wrote:
>
> Hi,
>
> We are using Sliding Event Time Window with Kafka Consumer. The window
> size is 6 minutes, and slide is 2 minutes. We have written a window
> function to select a particular window out of multiple windows for a keyed
> stream, e.g. we select about 16 windows out of multiple windows for the
> keyed stream based on a particular condition.
>
> Upon a normal execution, we get 16 windows for processing inside the
> condition (in window function mentioned). These windows we are putting in
> different files, named after window start and end times.
>
> the code is as below:
>
> Calling code
>
>
> public class RealTimeProcessingSlidingWindow{
>
> public static void main(String[] args) throws Exception {
>
> // set up the execution environment
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> // configure the Kafka consumer
> Properties kafkaProps = new Properties();
> kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
> kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
> kafkaProps.setProperty("group.id", DEMO_GROUP);
> // always read the Kafka topic from the start
> kafkaProps.setProperty("auto.offset.reset" ,"earliest");
>
> FlinkKafkaConsumer09 String>> consumer = new FlinkKafkaConsumer09<>(
> "test",// kafka topic name
> new dataSchema(),
> kafkaProps);
> DataStream>
> stream1 = env.addSource(consumer);
> DataStream>
> keyedStream = stream1.assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessGenerator2());
>
> keyedStream.keyBy(4)
> .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))   //
> 6 min window with 2 min sliding window
> .apply(new CustomSlidingWindowFunction());
>
> env.execute("Sliding Event Time Window Processing");
>
>}
> }
>
>
> public static class CustomSlidingWindowFunction implements
> WindowFunction, Tuple5 String, Float, Float, String>, Tuple, TimeWindow>{
>
> @Override
> public void apply(Tuple key, TimeWindow window, Iterable String, Float, Float, String>> input,
> Collector> out) throws
> Exception {
>
> HashMap> windowMap=
> new HashMap>();
> for (Tuple5 wr: input){
> windowMap.put(wr.f1.toString().trim(), wr);
> }
>
> ...
>
> SimpleDateFormat sf = new SimpleDateFormat(IndianDateTimeFormat);
>
> if(windowMap.containsKey(tk)){
> Tuple5 t = (Tuple5 Float, Float, String>) windowMap.get(tk);
>
> Date d = sf.parse(t.f0.trim());
>
> ...
>
> // Condition for selecting a window
> if(d.after(x) && d.before(y)){
> // Write the window output to separate files named after window Lat and Lon
> writeWindowToFile(t, window, input);
> }
>  }
> }
> }
>
> // Get the buffered writer
> private static synchronized BufferedWriter getWriter(String fileName)
> throws IOException{
> return new BufferedWriter(new FileWriter(fileName, true));
> }
> // Writes an entire window to file for the records in that window
> private static synchronized void writeWindowToFile(Tuple5 Float, Float, String> target, TimeWindow window, Iterable String, Float, Float, String>> input) throws IOException{
> // Create a file to write a window to
> String fileName = target.f2.toString() + "-" + target.f3.toString()+".csv";
> BufferedWriter br = getWriter(fileName);
>
> // Iterate and put the records in file
> for (Tuple5

Re: Continuous File monitoring not reading nested files

2017-01-10 Thread Aljoscha Krettek
Yes, please go ahead with the fix! :-)

(If I'm not mistaken Kostas is working on other stuff right now.)

On Mon, 9 Jan 2017 at 23:19 Yassine MARZOUGUI 
wrote:

> Hi,
>
> I found the root cause of the problem : the listEligibleFiles method in 
> ContinuousFileMonitoringFunction
> scans only the topmost files and ignores the nested files. By fixing that
> I was able to get the expected output. I created Jira issue:
> https://issues.apache.org/jira/browse/FLINK-5432.
>
> @Kostas, If you haven't already started working on a fix for this, I would
> happily contribute a fix for it if you like.
>
> Best,
> Yassine
>
> 2017-01-09 17:23 GMT+01:00 Yassine MARZOUGUI :
>
> Hi Kostas,
>
> I debugged the code and the nestedFileEnumeration parameter was always
> true during the execution. I noticed however that in the following loop
> in ContinuousFileMonitoringFunction, for some reason, the fileStatus was
> null for files in nested folders, and non null for files directly under the
> parent path, so no splits were forwarded in the case of nested folders.
>
> for(int var5 = 0; var5 < var4; ++var5) {
> FileInputSplit split = var3[var5];
> FileStatus fileStatus =
> (FileStatus)eligibleFiles.get(split.getPath());
> if(fileStatus != null) {
> Long modTime =
> Long.valueOf(fileStatus.getModificationTime());
> Object splitsToForward =
> (List)splitsByModTime.get(modTime);
> if(splitsToForward == null) {
> splitsToForward = new ArrayList();
> splitsByModTime.put(modTime, splitsToForward);
> }
>
> ((List)splitsToForward).add(new
> TimestampedFileInputSplit(modTime.longValue(), split.getSplitNumber(),
> split.getPath(), split.getStart(), split.getLength(),
> split.getHostnames()));
> }
> }
>
> Thanks,
> Yassine
>
>
> 2017-01-09 15:04 GMT+01:00 Kostas Kloudas :
>
> Hi Yassine,
>
> I suspect that the problem is in the way the input format (and not the
> reader) scans nested files,
> but could you see if in the code that is executed by the tasks, the
> nestedFileEnumeration parameter is still true?
>
> I am asking in order to pin down if the problem is in the way we ship the
> code to the tasks or in reading the
> nested files.
>
> Thanks,
> Kostas
>
> On Jan 9, 2017, at 12:56 PM, Yassine MARZOUGUI 
> wrote:
>
> Hi,
>
> Any updates on this issue? Thank you.
>
> Best,
> Yassine
>
>
> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek"  wrote:
>
> +kostas, who probably has the most experience with this by now. Do you
> have an idea what might be going on?
>
> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI 
> wrote:
>
> Looks like this is not specific to the continuous file monitoring, I'm
> having the same issue (files in nested directories are not read) when using:
>
> env.readFile(fileInputFormat, "hdfs:///shared/mydir", 
> FileProcessingMode.PROCESS_ONCE,
> -1L)
>
> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI :
>
> Hi all,
>
> I'm using the following code to continuously process files from a
> directory "mydir".
>
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> FileInputFormat fileInputFormat = new TextInputFormat(new Path("
> hdfs:///shared/mydir"));
> fileInputFormat.setNestedFileEnumeration(true);
>
> env.readFile(fileInputFormat,
> "hdfs:///shared/mydir",
> FileProcessingMode.PROCESS_CONTINUOUSLY, 1L)
> .print();
>
> env.execute();
>
> If I add directory under mydir, say "2016-12-16", and then add a file "
> *2016-12-16/file.txt"*, its contents are not printed. If I add the same
> file directly under "*mydir"*,  its contents are correctly printed. After
> that the logs will show the following :
>
> 10:55:44,928 DEBUG
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>  - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time=
> 1481882041587 and global mod time= 1481882126122
> 10:55:44,928 DEBUG
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>  - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time=
> 1481881788704 and global mod time= 1481882126122
>
> Looks like the ContinuousFileMonitoringFunction  considered it already
> read 2016-12-16 as a file and then excludes it, but its contents were not
> processed. Any Idea why this happens?
> Thank you.
>
> Best,
> Yassine
>
>
>
>
>
>
>


Re: window function outputs two different values

2017-01-10 Thread Aljoscha Krettek
Hi,
I'm afraid this is not possible with the current model. A reduce function
is only meant to combine two values and output the result of that. Side
effects, such as emitting further data, are not allowed right now.

Cheers,
Aljoscha

On Mon, 9 Jan 2017 at 15:27 tao xiao  wrote:

> Hi team,
>
> any suggestions on below topic?
>
> I have a requirement that wants to output two different values from a time
> window reduce function. Here is basic workflow
>
> 1. fetch data from Kafka
> 2. flow the data to a event session window. kafka source -> keyBy ->
> session window -> reduce
> 3. inside the reduce function, count the number of data and also emit the
> data itself to another operator for further processing
>
> As the reduce function can only emit the count, I want to know how to also
> emit the data as well?
>
>
>
> On Sat, 7 Jan 2017 at 20:30 tao xiao  wrote:
>
> Hi team,
>
> I have a requirement that wants to output two different values from a time
> window reduce function. Here is basic workflow
>
> 1. fetch data from Kafka
> 2. flow the data to a event session window. kafka source -> keyBy ->
> session window -> reduce
> 3. inside the reduce function, count the number of data and also emit the
> data itself to another operator for further processing
>
> As the reduce function can only emit the count, I want to know how to also
> emit the data as well?
>
>


Re: Re: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release candidate)

2017-01-10 Thread Robert Metzger
Hi,

this depends a lot on the number of issues we find during the testing.


These are the issues I found so far:

https://issues.apache.org/jira/browse/FLINK-5379 (unresolved)
https://issues.apache.org/jira/browse/FLINK-5383 (resolved)
https://issues.apache.org/jira/browse/FLINK-5382 (resolved)
https://issues.apache.org/jira/browse/FLINK-5381 (resolved)
https://issues.apache.org/jira/browse/FLINK-5380 (pending PR)




On Tue, Jan 10, 2017 at 11:58 AM, shijinkui  wrote:

> Do we have a probable date for the 1.2 release? This month or next month?
>
> -Original Message-
> From: Robert Metzger [mailto:rmetz...@apache.org]
> Sent: 3 January 2017 20:44
> To: d...@flink.apache.org
> Cc: user@flink.apache.org
> Subject: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release candidate)
>
> Hi,
>
> First of all, I wish everybody a happy new year 2017.
>
> I've set user@flink in CC so that users who are interested in helping
> with the testing get notified. Please respond only to the dev@ list to
> keep the discussion there!
>
> According to the 1.2 release discussion thread, I've created a first
> release candidate for Flink 1.2.
> The release candidate will not be the final release, because I'm certain
> that we'll find at least one blocking issue in the candidate :)
>
> Therefore, the RC is meant as a testing only release candidate.
> Please report every issue we need to fix before the next RC in this thread
> so that we have a good overview.
>
> The release artifacts are located here:
> http://people.apache.org/~rmetzger/flink-1.2.0-rc0/
>
> The maven staging repository is located here:
> https://repository.apache.org/content/repositories/orgapacheflink-
>
> The release commit (in branch "release-1.2.0-rc0"):
> http://git-wip-us.apache.org/repos/asf/flink/commit/f3c59ced
>
>
> Happy testing!
>


Re: Sliding Event Time Window Processing: Window Function inconsistent behavior

2017-01-10 Thread Sujit Sakre
Hi,

In the link (
http://stackoverflow.com/questions/41143518/sliding-processing-time-window-computes-inconsistent-results),
Fabian has mentioned that if Event Time is used, consistent results are
possible.

However, that's not the case with us. We are getting very random results.

Please suggest.


*Sujit Sakre*


On 9 January 2017 at 22:27, Sujit Sakre  wrote:

> Hi,
>
> We are using Sliding Event Time Window with Kafka Consumer. The window
> size is 6 minutes, and slide is 2 minutes. We have written a window
> function to select a particular window out of multiple windows for a keyed
> stream, e.g. we select about 16 windows out of multiple windows for the
> keyed stream based on a particular condition.
>
> Upon a normal execution, we get 16 windows for processing inside the
> condition (in window function mentioned). These windows we are putting in
> different files, named after window start and end times.
>
> the code is as below:
>
> Calling code
>
>
> public class RealTimeProcessingSlidingWindow{
>
> public static void main(String[] args) throws Exception {
>
> // set up the execution environment
> StreamExecutionEnvironment env = StreamExecutionEnvironment.get
> ExecutionEnvironment();
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> // configure the Kafka consumer
> Properties kafkaProps = new Properties();
> kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
> kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
> kafkaProps.setProperty("group.id", DEMO_GROUP);
> // always read the Kafka topic from the start
> kafkaProps.setProperty("auto.offset.reset" ,"earliest");
>
> FlinkKafkaConsumer09 String>> consumer = new FlinkKafkaConsumer09<>(
> "test",// kafka topic name
> new dataSchema(),
> kafkaProps);
> DataStream>
> stream1 = env.addSource(consumer);
> DataStream>
> keyedStream = stream1.assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessGenerator2());
>
> keyedStream.keyBy(4)
> .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))   //
> 6 min window with 2 min sliding window
> .apply(new CustomSlidingWindowFunction());
>
> env.execute("Sliding Event Time Window Processing");
>
>}
> }
>
>
> public static class CustomSlidingWindowFunction implements
> WindowFunction, Tuple5 String, Float, Float, String>, Tuple, TimeWindow>{
>
> @Override
> public void apply(Tuple key, TimeWindow window, Iterable String, Float, Float, String>> input,
> Collector> out) throws
> Exception {
>
> HashMap> windowMap=
> new HashMap>();
> for (Tuple5 wr: input){
> windowMap.put(wr.f1.toString().trim(), wr);
> }
>
> ...
>
> SimpleDateFormat sf = new SimpleDateFormat(IndianDateTimeFormat);
>
> if(windowMap.containsKey(tk)){
> Tuple5 t = (Tuple5 Float, Float, String>) windowMap.get(tk);
>
> Date d = sf.parse(t.f0.trim());
>
> ...
>
> // Condition for selecting a window
> if(d.after(x) && d.before(y)){
> // Write the window output to separate files named after window Lat and Lon
> writeWindowToFile(t, window, input);
> }
>  }
> }
> }
>
> // Get the buffered writer
> private static synchronized BufferedWriter getWriter(String fileName)
> throws IOException{
> return new BufferedWriter(new FileWriter(fileName, true));
> }
> // Writes an entire window to file for the records in that window
> private static synchronized void writeWindowToFile(Tuple5 Float, Float, String> target, TimeWindow window, Iterable String, Float, Float, String>> input) throws IOException{
> // Create a file to write a window to
> String fileName = target.f2.toString() + "-" + target.f3.toString()+".csv";
> BufferedWriter br = getWriter(fileName);
>
> // Iterate and put the records in file
> for (Tuple5 tr: input){
> br.write(tr.f1.toString().trim()+", "+
> convertLongIntoDate(window.getStart())+", 
> "+convertLongIntoDate(window.getEnd())+",
> "+
> tr.f0+", "+tr.f2+", "+tr.f3+'\n');
> }
> // flush the writer and close it
> br.close();
> }
>
> We have written the code to be threadsafe while creating and writing to
> file
>
> In this code, If we execute the code multiple times on the Kafka Stream
> (with certain records) most times we get 16 files with corresponding window
> 

Re: Increasing parallelism skews/increases overall job processing time linearly

2017-01-10 Thread Chakravarthy varaga
Hi Guys,

    I understand that you are extremely busy, but any pointers here are
highly appreciated, so that I can move forward and conclude this activity!

Best Regards
CVP

On Mon, Jan 9, 2017 at 11:43 AM, Chakravarthy varaga <
chakravarth...@gmail.com> wrote:

> Anything that I could check or collect for you for investigation ?
>
> On Sat, Jan 7, 2017 at 1:35 PM, Chakravarthy varaga <
> chakravarth...@gmail.com> wrote:
>
>> Hi Stephen
>>
>> . The Kafka version is 0.9.0.1; the connector is FlinkKafkaConsumer09
>> . The flatMap and coFlatMap are connected by keyBy
>> . No data is broadcast, and the data is not exploded based on the
>> parallelism
>>
>> Cvp
>>
>> On 6 Jan 2017 20:16, "Stephan Ewen"  wrote:
>>
>>> Hi!
>>>
>>> You are right, parallelism 2 should be faster than parallelism 1 ;-) As
>>> ChenQin pointed out, having only 2 Kafka Partitions may prevent further
>>> scaleout.
>>>
>>> Few things to check:
>>>   - How are you connecting the FlatMap and CoFlatMap? Default, keyBy,
>>> broadcast?
>>>   - Broadcast for example would multiply the data based on parallelism,
>>> can lead to slowdown when saturating the network.
>>>   - Are you using the standard Kafka Source (which Kafka version)?
>>>   - Is there any part in the program that multiplies data/effort with
>>> higher parallelism (does the FlatMap explode data based on parallelism)?
>>>
>>> Stephan
>>>
>>>
>>> On Fri, Jan 6, 2017 at 7:27 PM, Chen Qin  wrote:
>>>
 Just noticed there are only two partitions per topic. Regardless of how
 large the parallelism is set, at most two of the source subtasks will get a
 partition assigned.

 Sent from my iPhone

 On Jan 6, 2017, at 02:40, Chakravarthy varaga 
 wrote:

 Hi All,

 Any updates on this?

 Best Regards
 CVP

 On Thu, Jan 5, 2017 at 1:21 PM, Chakravarthy varaga <
 chakravarth...@gmail.com> wrote:

>
> Hi All,
>
> I have a job as attached.
>
> I have a 16 Core blade running RHEL 7. The taskmanager default number
> of slots is set to 1. The source is a kafka stream and each of the 2
> sources(topic) have 2 partitions each.
>
>
> *What I notice is that when I deploy a job to run with #parallelism=2
> the total processing time doubles the time it took when the same job was
> deployed with #parallelism=1. It linearly increases with the parallelism.*
> Since the numberof slots is set to 1 per TM, I would assume that the
> job would be processed in parallel in 2 different TMs and that each
> consumer in each TM is connected to 1 partition of the topic. This
> therefore should have kept the overall processing time the same or less 
> !!!
>
> The co-flatmap connects the 2 streams & uses ValueState (checkpointed
> in FS). I think this is distributed among the TMs. My understanding is 
> that
> the search of values state could be costly between TMs.  Do you sense
> something wrong here?
>
> Best Regards
> CVP
>
>
>
>
>

>>>
>


Re: Changing parallelism

2017-01-10 Thread Fabian Hueske
Hi Abhishek,

State can be emitted from functions as regular records. There is no way to
share the local state of a task with other tasks of the same operator
or with other operators.
Flink's key-partitioned state is always scoped to the key of the current
record. It is not possible to iterate over all local state.
If your function implements the Checkpointed interface, you have one state
object for the whole function. In this case, you can see all local state.
However, Checkpointed functions have the limitation that they cannot be
rescaled.

Best, Fabian
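
A minimal sketch of the Checkpointed case described above, assuming the Flink 1.1/1.2
interface; the class name and the state layout are illustrative only:

import java.util.HashMap;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.util.Collector;

// Function-scoped (non-keyed) state: the whole map is visible to the operator instance,
// which makes it easy to inspect or emit -- and is also what prevents rescaling.
public class CountingFlatMap extends RichFlatMapFunction<String, String>
        implements Checkpointed<HashMap<String, Long>> {

    private HashMap<String, Long> counts = new HashMap<>();

    @Override
    public void flatMap(String value, Collector<String> out) {
        long n = counts.merge(value, 1L, Long::sum);
        // State can only be "shared" downstream by emitting it as a regular record.
        out.collect(value + "=" + n);
    }

    @Override
    public HashMap<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return counts;   // one state object for the whole parallel instance
    }

    @Override
    public void restoreState(HashMap<String, Long> state) {
        this.counts = state;
    }
}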

2017-01-08 17:01 GMT+01:00 abhishekrs :

> Hi Stephan,
>
> I want to pursue your idea. How do I emit state from an operator. Operator
> for me is a rich function. Or will I need a different style operator? I am
> unable to find how to iterate over all state - in open or otherwise (from
> an
> operator).
>
> Are there APIs to inspect the savepoints - using offline programs?
>
> -Abhishek-
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Changing-
> parallelism-tp4967p10911.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>