Re: Running on AWS/EMR/Yarn - where is the WebUI?

2016-08-15 Thread Chiwan Park
Hi Jon,

You can connect Flink Web UI via clicking ApplicationMaster link in YARN 
administrator UI.

Regards,
Chiwan Park

> On Aug 15, 2016, at 2:24 PM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> 
> Working with a 3 node cluster. Started via YARN. 
> 
> If I go to port 8080 I see the Tomcat start screen. 8088 has the Yarn screen.
> 
> Didn't see anything obvious to start the UI in the bin folder.



Re: Code related to spilling data to disk

2016-06-22 Thread Chiwan Park
Hi,

I’m not sure about the reason to use JVM heap instead of managed memory, but It 
seems that the reason is using JVM heap makes development easier. Maybe Stephan 
can give exact answer to you. I think managed memory still has benefit in terms 
of GC  time and memory utilization.

The Flink community has a plan [1] to move data structures for streaming 
operators to managed memory.

[1]: 
https://docs.google.com/document/d/1ExmtVpeVVT3TIhO1JoBpC5JKXm-778DAD7eqw5GANwE/edit#

Regards,
Chiwan Park

> On Jun 22, 2016, at 8:39 PM, Tae-Geon Um <taegeo...@gmail.com> wrote:
> 
> Thank you for your answer to my question, Chiwan :)  
> Can I ask another question?  
> 
> 
>> On Jun 22, 2016, at 7:22 PM, Chiwan Park <chiwanp...@apache.org> wrote:
>> 
>> Hi Tae-Geon,
>> 
>> AFAIK, spilling *data* to disk happens only when managed memory is used. 
>> Currently, streaming API (DataStream) doesn’t use managed memory yet. 
>> `MutableHashTable` is one of representative usage of managed memory with 
>> disk spilling. Note that some special structures such as 
>> `CompactingHashTable` doesn’t spill data to disk even though they use the 
>> manage memory to achieve high performance.
> 
> As far as I understand, spilling data is only performed on batch mode. 
> Do you know why streaming mode does not use managed memory? 
> Is this because the performance gain is negligible?
> 
>> 
>> About spilling *states*, I think that it depends on how state backends is 
>> implemented. For example, `FsStateBackend` saves states to file system but 
>> `MemoryStateBackend` doesn’t. `RocksDBStateBackend` uses memory first and 
>> also can spill states to disk.
> 
> I’ve found a nice document on the state backend [1]. I will take a look at 
> this doc to know the detail. 
> Thanks! 
> 
> Taegeon
> 
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state_backends.html#state-backends
> 
>> 
>> Regards,
>> Chiwan Park
>> 
>>> On Jun 22, 2016, at 3:27 PM, Tae-Geon Um <taegeo...@gmail.com> wrote:
>>> 
>>> I have another question. 
>>> Is the spilling only executed on batch mode? 
>>> What happen on streaming mode?  
>>> 
>>>> On Jun 22, 2016, at 1:48 PM, Tae-Geon Um <taegeo...@gmail.com> wrote:
>>>> 
>>>> Hi, all
>>>> 
>>>> As far as I know, Flink spills data (states?) to disk if the data exceeds 
>>>> memory threshold or there exists memory pressure.
>>>> i’d like to know the detail of how Flink spills data to disk. 
>>>> 
>>>> Could you please let me know which codes do I have to investigate? 
>>>> 
>>>> Thanks,
>>>> Taegeon
>>> 
>> 



Re: Code related to spilling data to disk

2016-06-22 Thread Chiwan Park
Hi Tae-Geon,

AFAIK, spilling *data* to disk happens only when managed memory is used. 
Currently, streaming API (DataStream) doesn’t use managed memory yet. 
`MutableHashTable` is one of representative usage of managed memory with disk 
spilling. Note that some special structures such as `CompactingHashTable` 
doesn’t spill data to disk even though they use the manage memory to achieve 
high performance.

About spilling *states*, I think that it depends on how state backends is 
implemented. For example, `FsStateBackend` saves states to file system but 
`MemoryStateBackend` doesn’t. `RocksDBStateBackend` uses memory first and also 
can spill states to disk.

Regards,
Chiwan Park

> On Jun 22, 2016, at 3:27 PM, Tae-Geon Um <taegeo...@gmail.com> wrote:
> 
> I have another question. 
> Is the spilling only executed on batch mode? 
> What happen on streaming mode?  
> 
>> On Jun 22, 2016, at 1:48 PM, Tae-Geon Um <taegeo...@gmail.com> wrote:
>> 
>> Hi, all
>> 
>> As far as I know, Flink spills data (states?) to disk if the data exceeds 
>> memory threshold or there exists memory pressure.
>> i’d like to know the detail of how Flink spills data to disk. 
>> 
>> Could you please let me know which codes do I have to investigate? 
>> 
>> Thanks,
>> Taegeon
> 



Re: S3 as streaming source

2016-06-03 Thread Chiwan Park
Hi all,

I think we can use `readFile`, `readFileStream` methods in 
`StreamExecutionEnvironment` to create streaming source from S3 because data 
are stored as file in S3. But I haven’t test it.

Regards,
Chiwan Park

> On Jun 3, 2016, at 2:37 PM, Tzu-Li (Gordon) Tai <tzuli...@gmail.com> wrote:
> 
> Hi Soumya,
> 
> No, currently there is no Flink standard supported S3 streaming source. As
> far as I know, there isn't one out in the public yet either. The community
> is open to submissions for new connectors, so if you happen to be working on
> one for S3, you can file up a JIRA to let us know.
> 
> Also, are you looking for a S3 streaming source that fetches S3 event
> notifications (ref:
> http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html), or
> streaming files / objects from S3 for a data stream program? I assume the
> first one, since otherwise writing Flink batch jobs will suit you more (the
> batch DataSet API already supports this).
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-as-streaming-source-tp7357p7358.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Why Scala Option is not a valid key?

2016-05-30 Thread Chiwan Park
I’ve merged a patch [1] for this issue. Now we can use Option as a key.

[1]: 
https://git-wip-us.apache.org/repos/asf?p=flink.git;a=commit;h=c60326f85faaa38bcc359d555cd2d2818ef2e4e7

Regards,
Chiwan Park

> On Apr 5, 2016, at 2:08 PM, Chiwan Park <chiwanp...@apache.org> wrote:
> 
> I just found that Timur created a JIRA issue for this (FLINK-3698).
> 
> Regards,
> Chiwan Park
> 
>> On Mar 31, 2016, at 7:27 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>> 
>> Actually I think that it’s not correct that the OptionType cannot be used as 
>> a key type. In fact it is similar to a composite type and should be usable 
>> as a key iff it’s element can be used as a key. Then we only have to provide 
>> an OptionTypeComparator which will compare the elements if they are set. If 
>> not, then the None element will be smaller, for example.
>> 
>> @Timur, if you want, then you can file a JIRA issue to add that.
>> 
>> Cheers,
>> Till
>> 
>> 
>> On Wed, Mar 30, 2016 at 7:17 PM, Timur Fayruzov <timur.fairu...@gmail.com> 
>> wrote:
>> Ok, I can't make Option comparable, so the only way that I see is to 
>> translate a key to a Comparable data structure and use it (as it was alluded 
>> to in your example above). Thank you for clarification!
>> 
>> Thanks,
>> Timur
>> 
>> On Wed, Mar 30, 2016 at 9:22 AM, Chiwan Park <chiwanp...@apache.org> wrote:
>> Hi Timur,
>> 
>> Sorry for confusing. I meant KeySelector.
>> 
>> `GenericType` could be used as a key type if the `T` implements 
>> `Comparable`. For example, `GenericType` could be used as a key 
>> type but `GenericType` could not.
>> 
>> About my example in previous mail, the type of key is `Int` because the 
>> return type of KeySelector is `Int`. `TypeInformation` is not generic 
>> type.
>> 
>> Regards,
>> Chiwan Park
>> 
>>> On Mar 31, 2016, at 1:09 AM, Timur Fayruzov <timur.fairu...@gmail.com> 
>>> wrote:
>>> 
>>> Thank you for your answers, Chiwan! That would mean that a generic type 
>>> can't be used as a key in general? This is a non-obvious limitation of 
>>> Flink DSL that I didn't see in documentation.
>>> 
>>> Could you please elaborate what you mean by KeyExtractor? I see that inside 
>>> `where` operator an instance of KeySelector is initialized, but I don't see 
>>> how can I pass a custom KeySelector in.
>>> 
>>> Thanks,
>>> Timur
>>> 
>>> On Wed, Mar 30, 2016 at 12:53 AM, Chiwan Park <chiwanp...@apache.org> wrote:
>>> Hi Timur,
>>> 
>>> Because Option[T] is not comparable type generally (if T is a POJO type), 
>>> you cannot use Option[T] as a key type. I think you have to implement 
>>> KeyExtractor to compare objects including Option[T]s.
>>> 
>>> ```
>>> case class MyKey(k1: Option[String], k2: Option[String])
>>> 
>>> val data1 = env.fromElements(MyKey(Some("a"), None), MyKey(Some("a"), 
>>> Some("c")))
>>> val data2 = env.fromElements(MyKey(Some("b"), None), MyKey(Some("a"), 
>>> Some("c")))
>>> 
>>> data1.join(data2)
>>>  .where(_.hashCode())
>>>  .equalTo(_.hashCode()).apply {
>>>(left: MyKey, right: MyKey) => (left, right)
>>>  }.print()
>>> ```
>>> 
>>> Note that the approach in example (using hashCode()) cannot be applied to 
>>> sort task.
>>> 
>>> Regards,
>>> Chiwan Park
>>> 
>>>> On Mar 30, 2016, at 2:37 AM, Timur Fayruzov <timur.fairu...@gmail.com> 
>>>> wrote:
>>>> 
>>>> There is some more detail to this question that I missed initially. It 
>>>> turns out that my key is a case class of a form MyKey(k1: Option[String], 
>>>> k2: Option[String]). Keys.SelectorFunctionKeys is performing a recursive 
>>>> check whether every element of the MyKey class can be a key and fails when 
>>>> encountering an Option.
>>>> 
>>>> Is it possible to work around this situation without giving up Options? 
>>>> Inability to use Options in Domain objects could be really frustrating.
>>>> 
>>>> Thanks,
>>>> Timur
>>>> 
>>>> On Tue, Mar 29, 2016 at 10:19 AM, Timur Fayruzov 
>>>> <timur.fairu...@gmail.com> wrote:
>>>> Hello,
>>>> 
>>>> I'm evaluating Flink and one thing I noticed is Option[A] can't be used as 
>>>> a key for coGroup (looking specifically here: 
>>>> https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala#L39).
>>>>  I'm not clear about the reason of this and appreciate if someone can 
>>>> explain.
>>>> 
>>>> Thanks,
>>>> Timur
>>>> 
>>> 
>>> 
>> 
>> 
>> 
> 



Re: Write matrix/vector

2016-05-30 Thread Chiwan Park
Hi Lydia,

`FlinkMLTools.persist` method is used to save ML models and can be used to save 
Matrix and Vector object. Note that the method uses TypeSerializerOutputFormat 
which is binary output format.

Regards,
Chiwan Park

> On May 30, 2016, at 11:31 AM, Lydia Ickler <ickle...@googlemail.com> wrote:
> 
> Hi,
> 
> I would like to know how to write a Matrix or Vector (Dense/Sparse) to file?
> 
> Thanks in advance!
> Best regards,
> Lydia
> 



Re: Discarding header from CSV file

2016-04-27 Thread Chiwan Park
It seems that type of `buildingManager` is not matched to CSV column. In source 
code, `buildingManager` is defined as `Int`. But in your CSV file, it starts 
with a character `M`.

I succeeded in running the code with your CSV file after changing the type of 
`buildingManager` to `String`.

Regards,
Chiwan Park

> On Apr 28, 2016, at 9:51 AM, nsengupta <sengupta.nirma...@gmail.com> wrote:
> 
> Hello Chiwan,
> 
> Yes, that's an oversight on my part. In my hurry, I didn't even try to
> explore the source of that /Exception/. Thanks, again.
> 
> However, I still don't know why I am not being able to read the CSV file. As
> the output shows, using standard IO routines, I can read the same file
> anyway. 
> 
> Could you spot my mistake?
> 
> -- Nirmalya
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Discarding-header-from-CSV-file-tp6474p6519.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Discarding header from CSV file

2016-04-27 Thread Chiwan Park
Hi,

You don’t need to call execute() method after calling print() method. print() 
method triggers the execution. The exception is raised because you call 
execute() after print() method.

Regards,
Chiwan Park

> On Apr 27, 2016, at 6:35 PM, nsengupta <sengupta.nirma...@gmail.com> wrote:
> 
> Till,
> 
> Thanks for looking into this.
> 
> I have removed the toList() from the collect() function, to align the code
> with what I generally do in a Flink application. It throws an Exception, and
> I can't figure out why.
> 
> *Here's my code (shortened for brevity):*
> 
> case class BuildingInformation(buildingID: Int, buildingManager: Int,
> buildingAge: Int, productID: String, country: String)
> 
> object HVACReadingsAnalysis {
> 
>  def main(args: Array[String]): Unit = {
> 
>val envDefault = ExecutionEnvironment.getExecutionEnvironment
> 
>val buildings =
> readBuildingInfo(envDefault,"./SensorFiles/building.csv")
> 
>buildings.print
> 
>envDefault.execute("HVAC Simulation")
>  }
> 
>  private def readBuildingInfo(env: ExecutionEnvironment, inputPath: String)
> = {
> 
>   // [NS]: I can see the lines, read correctly from the CSV file here
>println("As read from CSV file")
>println(Source.fromFile(inputPath).getLines.toList.mkString("#\n"))
> 
>// [NS]: Then, I read the same file using the library function
>   env.readCsvFile [BuildingInformation] (
>  inputPath,
>  ignoreFirstLine = true,
>  pojoFields =
> Array("buildingID","buildingManager","buildingAge","productID","country")
>)
>  }
> 
> 
> *Relevant portion of the output:
> *
> As read from CSV file
> BuildingID,BuildingMgr,BuildingAge,HVACproduct,Country#
> 1,M1,25,AC1000,USA#
> 2,M2,27,FN39TG,France#
> 3,M3,28,JDNS77,Brazil#
> 4,M4,17,GG1919,Finland#
> 5,M5,3,ACMAX22,Hong Kong#
> 6,M6,9,AC1000,Singapore#
> 7,M7,13,FN39TG,South Africa#
> 8,M8,25,JDNS77,Australia#
> 9,M9,11,GG1919,Mexico#
> 10,M10,23,ACMAX22,China#
> 11,M11,14,AC1000,Belgium#
> 12,M12,26,FN39TG,Finland#
> 13,M13,25,JDNS77,Saudi Arabia#
> 14,M14,17,GG1919,Germany#
> 15,M15,19,ACMAX22,Israel#
> 16,M16,23,AC1000,Turkey#
> 17,M17,11,FN39TG,Egypt#
> 18,M18,25,JDNS77,Indonesia#
> 19,M19,14,GG1919,Canada#
> 20,M20,19,ACMAX22,Argentina
> 15:34:18,914 INFO  org.apache.flink.api.java.ExecutionEnvironment 
>   
> - The job has 0 registered types and 0 default Kryo serializers
> 15:34:19,104 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  
>   
> - Starting FlinkMiniCluster.
> 15:34:19,912 INFO  akka.event.slf4j.Slf4jLogger   
>   
> - Slf4jLogger started
> 
> 
> // ..
> // ... more log statements
> // ..
> 
> Exception in thread "main" java.lang.RuntimeException: No new data sinks
> have been defined since the last execution. The last execution refers to the
> latest call to 'execute()', 'count()', 'collect()', or 'print()'.
>   at
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:979)
>   at
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
>   at
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:84)
>   at
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
>   at
> main.scala.hortonworks.tutorial.HVACReadingsAnalysis$.main(HVACReadingsAnalysis.scala:60)
>   at
> main.scala.hortonworks.tutorial.HVACReadingsAnalysis.main(HVACReadingsAnalysis.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> 
> Process finished with exit code 1
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Discarding-header-from-CSV-file-tp6474p6494.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Discarding header from CSV file

2016-04-26 Thread Chiwan Park
Hi, Nirmalya

I recommend readCsvFile() method rather than readTextFile() to read CSV file. 
readCsvFile() provides some features for CSV file such as ignoreFirstLine() 
(what you are looking for), ignoreComments(), and etc.

If you have to use readTextFile() method, I think, you can ignore column 
headers by calling zipWithIndex method and filtering it based on the index.

Regards,
Chiwan Park

> On Apr 27, 2016, at 10:32 AM, nsengupta <sengupta.nirma...@gmail.com> wrote:
> 
> What is the recommended way of discarding the Column Header(s) from a CSV
> file, if I am using
> 
> /environment.readTextFile()
> /
> facility? Obviously, we don't know beforehand, which of the nodes will read
> the Header(s)? So, we cannot use usual tricks like drop(1)?
> 
> I don't recall well: has this been discussed and closed earlier in this
> forum? If so, can someone point that out to me please?
> 
> -- Nirmalya
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Discarding-header-from-CSV-file-tp6474.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Integrate Flink with S3 on EMR cluster

2016-04-05 Thread Chiwan Park
Hi Timur,

Great! Bootstrap action for Flink is good for AWS users. I think the bootstrap 
action scripts would be placed in `flink-contrib` directory.

If you want, one of people in PMC of Flink will be assign FLINK-1337 to you.

Regards,
Chiwan Park

> On Apr 6, 2016, at 3:36 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> 
> I had a guide like that.
> 



Re: Why Scala Option is not a valid key?

2016-04-04 Thread Chiwan Park
I just found that Timur created a JIRA issue for this (FLINK-3698).

Regards,
Chiwan Park

> On Mar 31, 2016, at 7:27 PM, Till Rohrmann <trohrm...@apache.org> wrote:
> 
> Actually I think that it’s not correct that the OptionType cannot be used as 
> a key type. In fact it is similar to a composite type and should be usable as 
> a key iff it’s element can be used as a key. Then we only have to provide an 
> OptionTypeComparator which will compare the elements if they are set. If not, 
> then the None element will be smaller, for example.
> 
> @Timur, if you want, then you can file a JIRA issue to add that.
> 
> Cheers,
> Till
> 
> 
> On Wed, Mar 30, 2016 at 7:17 PM, Timur Fayruzov <timur.fairu...@gmail.com> 
> wrote:
> Ok, I can't make Option comparable, so the only way that I see is to 
> translate a key to a Comparable data structure and use it (as it was alluded 
> to in your example above). Thank you for clarification!
> 
> Thanks,
> Timur
> 
> On Wed, Mar 30, 2016 at 9:22 AM, Chiwan Park <chiwanp...@apache.org> wrote:
> Hi Timur,
> 
> Sorry for confusing. I meant KeySelector.
> 
> `GenericType` could be used as a key type if the `T` implements 
> `Comparable`. For example, `GenericType` could be used as a key type 
> but `GenericType` could not.
> 
> About my example in previous mail, the type of key is `Int` because the 
> return type of KeySelector is `Int`. `TypeInformation` is not generic 
> type.
> 
> Regards,
> Chiwan Park
> 
> > On Mar 31, 2016, at 1:09 AM, Timur Fayruzov <timur.fairu...@gmail.com> 
> > wrote:
> >
> > Thank you for your answers, Chiwan! That would mean that a generic type 
> > can't be used as a key in general? This is a non-obvious limitation of 
> > Flink DSL that I didn't see in documentation.
> >
> > Could you please elaborate what you mean by KeyExtractor? I see that inside 
> > `where` operator an instance of KeySelector is initialized, but I don't see 
> > how can I pass a custom KeySelector in.
> >
> > Thanks,
> > Timur
> >
> > On Wed, Mar 30, 2016 at 12:53 AM, Chiwan Park <chiwanp...@apache.org> wrote:
> > Hi Timur,
> >
> > Because Option[T] is not comparable type generally (if T is a POJO type), 
> > you cannot use Option[T] as a key type. I think you have to implement 
> > KeyExtractor to compare objects including Option[T]s.
> >
> > ```
> > case class MyKey(k1: Option[String], k2: Option[String])
> >
> > val data1 = env.fromElements(MyKey(Some("a"), None), MyKey(Some("a"), 
> > Some("c")))
> > val data2 = env.fromElements(MyKey(Some("b"), None), MyKey(Some("a"), 
> > Some("c")))
> >
> > data1.join(data2)
> >   .where(_.hashCode())
> >   .equalTo(_.hashCode()).apply {
> > (left: MyKey, right: MyKey) => (left, right)
> >   }.print()
> > ```
> >
> > Note that the approach in example (using hashCode()) cannot be applied to 
> > sort task.
> >
> > Regards,
> > Chiwan Park
> >
> > > On Mar 30, 2016, at 2:37 AM, Timur Fayruzov <timur.fairu...@gmail.com> 
> > > wrote:
> > >
> > > There is some more detail to this question that I missed initially. It 
> > > turns out that my key is a case class of a form MyKey(k1: Option[String], 
> > > k2: Option[String]). Keys.SelectorFunctionKeys is performing a recursive 
> > > check whether every element of the MyKey class can be a key and fails 
> > > when encountering an Option.
> > >
> > > Is it possible to work around this situation without giving up Options? 
> > > Inability to use Options in Domain objects could be really frustrating.
> > >
> > > Thanks,
> > > Timur
> > >
> > > On Tue, Mar 29, 2016 at 10:19 AM, Timur Fayruzov 
> > > <timur.fairu...@gmail.com> wrote:
> > > Hello,
> > >
> > > I'm evaluating Flink and one thing I noticed is Option[A] can't be used 
> > > as a key for coGroup (looking specifically here: 
> > > https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala#L39).
> > >  I'm not clear about the reason of this and appreciate if someone can 
> > > explain.
> > >
> > > Thanks,
> > > Timur
> > >
> >
> >
> 
> 
> 



Re: building for Scala 2.11

2016-04-04 Thread Chiwan Park
Hi Andrew,

The method to build Flink with Scala 2.11 is described in Flink documentation 
[1].

I think this is not relevant but just FYI, to build your application based on 
Flink 1.0 (or later) with Scala 2.11, you should be careful to set Flink 
dependencies. There is a guide in Wiki [2].

[1]: 
https://ci.apache.org/projects/flink/flink-docs-master/setup/building.html#scala-versions
[2]: 
https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version

Regards,
Chiwan Park

> On Apr 5, 2016, at 9:40 AM, Andrew Gaydenko <andrew.gayde...@gmail.com> wrote:
> 
> Hi!
> 
> How to build the project for Scala 2.11?
> -- 
> 
> Regards,
> Andrew



Re: Implicit inference of TypeInformation for join keys

2016-03-30 Thread Chiwan Park
Hi Timur,

You have to use `createTypeInfomation` method in `org.apache.flink.api` package 
to create TypeInformation object for Scala-specific objects such as case 
classes, tuples, eithers, options. For example:

```
import org.apache.flink.api.scala._ // to import package object

val a: DataSet[Thing] = …
val b: DataSet[Thing] = …

a.coGroup(b)
  .where(e => (e.f1, e.f2))
  .equalTo(e => (e.f1, e.f2))(createTypeInformation[(String, String)]) {
(left, right) => 1
  }.print()
```

Note that Flink creates internally copied 2-tuples consisted of (extracted key 
by KeySelector, original value). So there is some performance decrease when you 
are using KeySelector.

Regards,
Chiwan Park

> On Mar 31, 2016, at 12:58 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> 
> Thank you Chiwan! Yes, I understand that there are workarounds that don't use 
> function argument (and thus do not require implicit arguments). I try to 
> avoid positional and string-based keys because there is no compiler 
> guarantees when you refactor or accidentally change the underlying case 
> classes. Providing a function is the cleanest solution (and arguably is the 
> most readable) so it'd be great to make it work.
> 
> BTW, TypeInformation.of has an implementation that takes TypeHint 
> (https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java)
>  which, according to documentation, is supposed to be used for generic 
> classes, but using it still leads to the same exception.
> 
> Thanks,
> Timur
> 
> 
> On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park <chiwanp...@apache.org> wrote:
> Hi Timur,
> 
> You can use a composite key [1] to compare keys consisting of multiple 
> fields. For example:
> 
> ```
> val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> a.coGroup(b)
>   .where(“f1”, “f2”) // Flink compares the values of f1 first, and compares 
> the values of f2 if values of f1 are same.
>   .equalTo(“f1”, “f2”) { // Note that you must specify same number of keys
> (left, right) => 1
>   }
> ```
> 
> Composite key can be applied to Scala tuple also:
> 
> ```
> val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
> val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
> a.coGroup(b)
>   .where(0, 1) // Note that field numbers start from 0.
>   .equalTo(0, 1) {
> (left, right) => 1
>   }
> ```
> 
> I hope this helps.
> 
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples
> 
> Regards,
> Chiwan Park
> 
> > On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <timur.fairu...@gmail.com> 
> > wrote:
> >
> > Hello,
> >
> > Another issue I have encountered is incorrect implicit resolution (I'm 
> > using Scala 2.11.7). Here's the example (with a workaround):
> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > a.coGroup(b)
> >   .where(e => e.f1)
> >   //.equalTo(e => e) { //this fails to compile because equalTo expects an 
> > implicit
> >   .equalTo("f1") {
> > (left, right) => 1
> >   }
> > However, the workaround does not quite work when key is a tuple (I suspect 
> > this applies to other generic classes as well):
> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > a.coGroup(b)
> >   .where(e => (e.f1, e.f2))
> >   .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String, 
> > String)])) { (left, right) => 1} // throws InvalidProgramException
> > Here, I try to provide the implicit TypeInformation explicitly, but 
> > apparently it's not compatible with the way implicit inference is done. 
> > (TypeInformation I generate is GenericType, while 
> > scala.Tuple2<String, String> is expected).
> >
> > Now, I can split this in 2 operations like below:
> > val tmp = a.coGroup(b)
> >   .where(e => (e.f1, e.f2))
> >   .equalTo(e => (e.f1, e.f2))
> >
> > tmp { (left, right) => 1}
> > but, I would like to avoid adding clutter to my processing logic, and it's 
> > not entirely clear to me how this would be scheduled.
> >
> > As an option, I can hash the hell out of my keys like that:
> > a.coGroup(b)
> >   .where(e => (e.f1, e.f2).hashCode)
> >   .equalTo(e => (e.f1, e.f2).hashCode)(TypeInformation.of(classOf[Int])){ 
> > (left, right) => 1}
> > but that, again, adds some indirection and clutter, not mentioning the 
> > hassle of dealing with collisions (which can be alleviated by using fancy 
> > hashes, but I'd like to avoid that).
> >
> > Any insights on what is the way to go here are highly appreciated.
> >
> > Thanks,
> > Timur
> 
> 



Re: Why Scala Option is not a valid key?

2016-03-30 Thread Chiwan Park
Hi Timur,

Because Option[T] is not comparable type generally (if T is a POJO type), you 
cannot use Option[T] as a key type. I think you have to implement KeyExtractor 
to compare objects including Option[T]s.

```
case class MyKey(k1: Option[String], k2: Option[String])

val data1 = env.fromElements(MyKey(Some("a"), None), MyKey(Some("a"), 
Some("c")))
val data2 = env.fromElements(MyKey(Some("b"), None), MyKey(Some("a"), 
Some("c")))

data1.join(data2)
  .where(_.hashCode())
  .equalTo(_.hashCode()).apply {
(left: MyKey, right: MyKey) => (left, right)
  }.print()
```

Note that the approach in example (using hashCode()) cannot be applied to sort 
task.

Regards,
Chiwan Park

> On Mar 30, 2016, at 2:37 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> 
> There is some more detail to this question that I missed initially. It turns 
> out that my key is a case class of a form MyKey(k1: Option[String], k2: 
> Option[String]). Keys.SelectorFunctionKeys is performing a recursive check 
> whether every element of the MyKey class can be a key and fails when 
> encountering an Option.
> 
> Is it possible to work around this situation without giving up Options? 
> Inability to use Options in Domain objects could be really frustrating.
> 
> Thanks,
> Timur
> 
> On Tue, Mar 29, 2016 at 10:19 AM, Timur Fayruzov <timur.fairu...@gmail.com> 
> wrote:
> Hello,
> 
> I'm evaluating Flink and one thing I noticed is Option[A] can't be used as a 
> key for coGroup (looking specifically here: 
> https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala#L39).
>  I'm not clear about the reason of this and appreciate if someone can explain.
> 
> Thanks,
> Timur 
> 



Re: Implicit inference of TypeInformation for join keys

2016-03-30 Thread Chiwan Park
Hi Timur,

You can use a composite key [1] to compare keys consisting of multiple fields. 
For example:

```
val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
a.coGroup(b)
  .where(“f1”, “f2”) // Flink compares the values of f1 first, and compares the 
values of f2 if values of f1 are same.
  .equalTo(“f1”, “f2”) { // Note that you must specify same number of keys 
(left, right) => 1
  }
```

Composite key can be applied to Scala tuple also:

```
val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
a.coGroup(b)
  .where(0, 1) // Note that field numbers start from 0.
  .equalTo(0, 1) {
(left, right) => 1
  }
```

I hope this helps.

[1]: 
https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples

Regards,
Chiwan Park

> On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> 
> Hello,
> 
> Another issue I have encountered is incorrect implicit resolution (I'm using 
> Scala 2.11.7). Here's the example (with a workaround):
> val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> a.coGroup(b)
>   .where(e => e.f1)
>   //.equalTo(e => e) { //this fails to compile because equalTo expects an 
> implicit
>   .equalTo("f1") {
> (left, right) => 1
>   }
> However, the workaround does not quite work when key is a tuple (I suspect 
> this applies to other generic classes as well):
> val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> a.coGroup(b)
>   .where(e => (e.f1, e.f2))
>   .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String, String)])) 
> { (left, right) => 1} // throws InvalidProgramException
> Here, I try to provide the implicit TypeInformation explicitly, but 
> apparently it's not compatible with the way implicit inference is done. 
> (TypeInformation I generate is GenericType, while 
> scala.Tuple2<String, String> is expected).
> 
> Now, I can split this in 2 operations like below:  
> val tmp = a.coGroup(b)
>   .where(e => (e.f1, e.f2))
>   .equalTo(e => (e.f1, e.f2))
> 
> tmp { (left, right) => 1}
> but, I would like to avoid adding clutter to my processing logic, and it's 
> not entirely clear to me how this would be scheduled.
> 
> As an option, I can hash the hell out of my keys like that:
> a.coGroup(b)
>   .where(e => (e.f1, e.f2).hashCode)
>   .equalTo(e => (e.f1, e.f2).hashCode)(TypeInformation.of(classOf[Int])){ 
> (left, right) => 1}
> but that, again, adds some indirection and clutter, not mentioning the hassle 
> of dealing with collisions (which can be alleviated by using fancy hashes, 
> but I'd like to avoid that).
> 
> Any insights on what is the way to go here are highly appreciated.
> 
> Thanks,
> Timur



Re: for loop slow

2016-03-27 Thread Chiwan Park
Hi Lydia,

To build iterative algorithm on Flink, using API for iterations [1] would be 
better than using for-loop. Your program triggers multiple executions by 
multiple calling `next.gap.print()`. In each execution, Flink reads whole data 
redundantly and it cause performance to decrease.

Regards,
Chiwan Park

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/iterations.html

> On Mar 27, 2016, at 7:16 AM, Lydia Ickler <ickle...@googlemail.com> wrote:
> 
> Hi,
> 
> I have an issue with a for-loop.
> If I set the maximal iteration number i to more than 3 it gets stuck and I 
> cannot figure out why.
> With 1, 2 or 3 it runs smoothly.
> I attached the code below and marked the loop with //PROBLEM.
> 
> Thanks in advance!
> Lydia
> 
> package org.apache.flink.contrib.lifescience.examples;
> 
> import edu.princeton.cs.algs4.Graph;
> import edu.princeton.cs.algs4.SymbolDigraph;
> import org.apache.flink.api.common.functions.FilterFunction;
> import org.apache.flink.api.common.functions.FlatJoinFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.aggregation.Aggregations;
> import org.apache.flink.api.java.io.CsvReader;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.operators.IterativeDataSet;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.contrib.lifescience.networks.algos.DataSetUtils;
> import 
> org.apache.flink.contrib.lifescience.networks.datatypes.networks.Network;
> import 
> org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkEdge;
> import 
> org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkNode;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.util.Collector;
> 
> import java.util.*;
> 
> import static edu.princeton.cs.algs4.GraphGenerator.simple;
> 
> public class PowerIteration {
> 
> //path to input
> static String input = null;
> //path to output
> static String output = null;
> //number of iterations (default = 7)
> static int iterations = 7;
> //threshold
> static double delta = 0.01;
> 
> public void run() throws Exception {
> ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> 
> //read input file
> DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, 
> input);
> 
> DataSet<Tuple3<Integer, Integer, Double>> eigenVector;
> DataSet<Tuple3<Integer, Integer, Double>> eigenValue;
> 
> //initial:
> //Approximate EigenVector by PowerIteration
> eigenVector = PowerIteration_getEigenVector(matrixA);
> //Approximate EigenValue by PowerIteration
> eigenValue = PowerIteration_getEigenValue(matrixA,eigenVector);
> //Deflate original matrix
> matrixA = 
> PowerIteration_getNextMatrix(matrixA,eigenVector,eigenValue);
> 
> MyResult initial = new MyResult(eigenVector,eigenValue,matrixA);
> 
> MyResult next = null;
> 
> //PROBLEM!!! get i eigenvalue gaps
> for(int i=0;i<2;i++){
> next = PowerIteration_routine(initial);
> initial = next;
> next.gap.print();
> }
> 
> env.execute("Power Iteration");
> }
> 
> public static DataSource<Tuple3<Integer, Integer, Double>> 
> readMatrix(ExecutionEnvironment env,
>   
> String filePath) {
> CsvReader csvReader = env.readCsvFile(filePath);
> csvReader.fieldDelimiter(",");
> csvReader.includeFields("ttt");
> return csvReader.types(Integer.class, Integer.class, Double.class);
> }
> 
> public static final class ProjectJoinResultMapper implements
> MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
> Tuple3<Integer, Integer, Double>>,
> Tuple3<Integer, Integer, Double>> {
> @Override
> public Tuple3<Integer, Integer, Double> map(
> Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, 
> Integer, Double>> value)
> throws Exception {
> Integer row = value.f0.f0;
> Integer column = value.f1.f1;
> Double product = value.f0.f2 * value.f1.

Re: override file in flink

2016-03-22 Thread Chiwan Park
Hi subash,

You can pass WriteMode in second parameter of write* method. For example:

```
DataStream<…> myStream = …;
myStream.writeAsCsv(“path of output”, FileSystem.WriteMode.OVERWRITE);
```

I hope this helps.

Regards,
Chiwan Park

> On Mar 22, 2016, at 8:18 PM, subash basnet <yasub...@gmail.com> wrote:
> 
> Hello all,
> 
> I am trying to write the streaming data to file and update it recurrently 
> with the streaming data. I get the following unable to override exception 
> error:
> 
> Caused by: java.io.IOException: File or directory already exists. Existing 
> files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE 
> mode to overwrite existing files and directories.
>   at 
> org.apache.flink.core.fs.FileSystem.initOutPathLocalFS(FileSystem.java:594)
>   at 
> org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:227)
>   at 
> org.apache.flink.api.java.io.CsvOutputFormat.open(CsvOutputFormat.java:160)
>   at 
> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:60)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> 
> 
> 
> How to set the necessary override property?
> 
> Best Regards,
> Subash Basnet
> 



Re: DataStream, Sink and JDBC

2016-03-07 Thread Chiwan Park
Hi Toletum,

You can initialize a JDBC connection with RichSinkFunction [1]. There are two 
methods, `open` and `close`. The `open` method is called once before calling 
`invoke` method. The `close` method is called lastly.

Note that you should add `transient` keyword to the JDBC connection object.

Regards,
Chiwan Park

[1]: 
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.html

> On Mar 7, 2016, at 10:08 PM, tole...@toletum.org wrote:
> 
> Hi!
> I'm doing a process which reads from kafka, makes some things... and after 
> writes on Database (NEO4J). I can read from kafka, and make some things 
> But... I have problems with write on Database (JDBC).
> I tried use a SinkFunction It works, but it create a connection each 
> invoke method is called.
> 
> 
> 
> DataStream messageStream = this.env.addSource(new 
> FlinkKafkaConsumer082<>(properties.getProperty("topic"), new 
> SimpleStringSchema(), properties));
> 
> 
> messageStream.map(new StreamingCrimeSplitter 
> .filter(new filterFunction())
> .keyBy(1);
> .addSink(new sinkFunction());
> 
> 
> 
> 
> public class sinkFunction
> implements SinkFunction<Tuple7<String, String, String, String, String, 
> String,String>> {
> private static final long serialVersionUID = 2859601213304525959L;
> @Override
> public void invoke(Tuple7<String, String, String, String, String, 
> String, String> crime) throws Exception {
> System.out.println(crime.f0);
> //JDBC connection
> }
> }
> 
> 
> 
> Somebody knows how I could do just one connection? I tried to do in the 
> Constructor but the JDBC is not serializable.
> 
> 
> Thanks
> Toletum
> 
> 



Re: where can get the summary changes between flink-1.0 and flink-0.10

2016-02-17 Thread Chiwan Park
We’re testing a release candidate for 1.0 [1] currently. You can use new 
features I’m not sure because I’m not in PMC of Flink but I think we can 
release in a month.

Regards,
Chiwan Park

[1]: 
http://mail-archives.apache.org/mod_mbox/flink-user/201602.mbox/%3CCAGr9p8AkiT0CT_YBwMhHCUYmoC9Stw%3DLZzkNs2iRLNJ5rLMzdA%40mail.gmail.com%3E

> On Feb 17, 2016, at 5:08 PM, wangzhijiang999 <wangzhijiang...@aliyun.com> 
> wrote:
> 
> Hi Chiwan,
> 
>   Thank you for instant reply, when will the official flink-1.0 be 
> released, can you give a rough estimate?  I am interested in the new feature 
> of flink-1.0 like operator uid in order to solve my current problem.
> 
> Regards,
> 
> Zhijiang Wang
> 
> ------
> 发件人:Chiwan Park <chiwanp...@apache.org>
> 发送时间:2016年2月17日(星期三) 14:43
> 收件人:user <user@flink.apache.org>,wangzhijiang999 <wangzhijiang...@aliyun.com>
> 主 题:Re: where can get the summary changes between flink-1.0 and flink-0.10
> 
> Hi Zhijiang,
> 
> We have wiki pages about description of Flink 1.0 relesase [1] [2]. But the 
> pages are not updated in realtime. It is possible that there are some changes 
> that haven’t been described.
> 
> After releasing 1.0 officially, maybe we post an article dealing with the 
> changes in 1.0 to the Flink blog [3].
> 
> Regards,
> Chiwan Park
> 
> [1]: https://cwiki.apache.org/confluence/display/FLINK/1.0+Release
> [2]: 
> https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version
> [3]: http://flink.apache.org/blog/
> 
> > On Feb 17, 2016, at 3:34 PM, wangzhijiang999 <wangzhijiang...@aliyun.com> 
> > wrote:
> > 
> > Hi,
> > Where can get the summary changes between flink-1.0 and flink-0.10,  
> > thank you in advance!
> > 
> >  
> > 
> > 
> > 
> > Best Regards,
> > 
> > Zhijiang Wang
> 



Re: where can get the summary changes between flink-1.0 and flink-0.10

2016-02-16 Thread Chiwan Park
Hi Zhijiang,

We have wiki pages about description of Flink 1.0 relesase [1] [2]. But the 
pages are not updated in realtime. It is possible that there are some changes 
that haven’t been described.

After releasing 1.0 officially, maybe we post an article dealing with the 
changes in 1.0 to the Flink blog [3].

Regards,
Chiwan Park

[1]: https://cwiki.apache.org/confluence/display/FLINK/1.0+Release
[2]: 
https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version
[3]: http://flink.apache.org/blog/

> On Feb 17, 2016, at 3:34 PM, wangzhijiang999 <wangzhijiang...@aliyun.com> 
> wrote:
> 
> Hi,
> Where can get the summary changes between flink-1.0 and flink-0.10,  
> thank you in advance!
> 
>  
> 
> 
> 
> Best Regards,
> 
> Zhijiang Wang



Re: Join two Datasets --> InvalidProgramException

2016-02-09 Thread Chiwan Park
I wrote a sample inherited POJO example [1]. The example works with Flink 
0.10.1 and 1.0-SNAPSHOT.

[1]: https://gist.github.com/chiwanpark/0389ce946e4fff58d611

Regards,
Chiwan Park

> On Feb 9, 2016, at 8:07 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> What is the type of sessionId? 
> It must be a key type in order to be used as key. If it is a generic class, 
> it must implement Comparable to be used as key.
> 
> 2016-02-09 11:53 GMT+01:00 Dominique Rondé <dominique.ro...@codecentric.de>:
> The fields in SourceA and SourceB are private but have public getters and 
> setters. The classes provide an empty and public constructor.
> 
> Am 09.02.2016 11:47 schrieb "Chiwan Park" <chiwanp...@apache.org>:
> Oh, the fields in SourceA have public getters. Does the fields in SourceA 
> have public setter? SourceA needs public setter for private fields.
> 
> Regards,
> Chiwan Park
> 
> > On Feb 9, 2016, at 7:45 PM, Chiwan Park <chiwanp...@apache.org> wrote:
> >
> > Hi Dominique,
> >
> > It seems that `SourceA` is not dealt as POJO. Are all fields in SourceA 
> > public? There are some requirements for POJO classes [1].
> >
> > [1]: 
> > https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
> >
> > Regards,
> > Chiwan Park
> >
> >> On Feb 9, 2016, at 7:42 PM, Dominique Rondé 
> >> <dominique.ro...@codecentric.de> wrote:
> >>
> >> Hi  folks,
> >>
> >> i try to join two datasets containing some PoJos. Each PoJo inherit a 
> >> field "sessionId" from the parent class. The field is private but has a 
> >> public getter.
> >>
> >> The join is like this:
> >> DataSet<Tuple2<SourceA,SourceB>> joinedDataSet = 
> >> sourceA.join(SourceB).where("sessionId").equalTo("sessionId");
> >>
> >> But the result is the following execption:
> >>
> >> Exception in thread "main" 
> >> org.apache.flink.api.common.InvalidProgramException: This type 
> >> (GenericType) cannot be used as key.
> >>at 
> >> org.apache.flink.api.java.operators.Keys$ExpressionKeys.(Keys.java:287)
> >>at 
> >> org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
> >>at 
> >> x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
> >>
> >> I spend some time with google around but I don't get an idea what is 
> >> wrong. I hope some of you can give me a hint...
> >>
> >> Greets
> >> Dominique
> >>
> >
> 
> 



Re: Join two Datasets --> InvalidProgramException

2016-02-09 Thread Chiwan Park
Hi Dominique,

It seems that `SourceA` is not dealt as POJO. Are all fields in SourceA public? 
There are some requirements for POJO classes [1].

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos

Regards,
Chiwan Park

> On Feb 9, 2016, at 7:42 PM, Dominique Rondé <dominique.ro...@codecentric.de> 
> wrote:
> 
> Hi  folks, 
> 
> i try to join two datasets containing some PoJos. Each PoJo inherit a field 
> "sessionId" from the parent class. The field is private but has a public 
> getter. 
> 
> The join is like this: 
> DataSet<Tuple2<SourceA,SourceB>> joinedDataSet = 
> sourceA.join(SourceB).where("sessionId").equalTo("sessionId"); 
> 
> But the result is the following execption: 
> 
> Exception in thread "main" 
> org.apache.flink.api.common.InvalidProgramException: This type 
> (GenericType) cannot be used as key. 
> at 
> org.apache.flink.api.java.operators.Keys$ExpressionKeys.(Keys.java:287)
> at 
> org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
> at 
> x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55) 
> 
> I spend some time with google around but I don't get an idea what is wrong. I 
> hope some of you can give me a hint... 
> 
> Greets 
> Dominique 
> 



Re: Join two Datasets --> InvalidProgramException

2016-02-09 Thread Chiwan Park
Oh, the fields in SourceA have public getters. Does the fields in SourceA have 
public setter? SourceA needs public setter for private fields.

Regards,
Chiwan Park

> On Feb 9, 2016, at 7:45 PM, Chiwan Park <chiwanp...@apache.org> wrote:
> 
> Hi Dominique,
> 
> It seems that `SourceA` is not dealt as POJO. Are all fields in SourceA 
> public? There are some requirements for POJO classes [1].
> 
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
> 
> Regards,
> Chiwan Park
> 
>> On Feb 9, 2016, at 7:42 PM, Dominique Rondé <dominique.ro...@codecentric.de> 
>> wrote:
>> 
>> Hi  folks, 
>> 
>> i try to join two datasets containing some PoJos. Each PoJo inherit a field 
>> "sessionId" from the parent class. The field is private but has a public 
>> getter. 
>> 
>> The join is like this: 
>> DataSet<Tuple2<SourceA,SourceB>> joinedDataSet = 
>> sourceA.join(SourceB).where("sessionId").equalTo("sessionId"); 
>> 
>> But the result is the following execption: 
>> 
>> Exception in thread "main" 
>> org.apache.flink.api.common.InvalidProgramException: This type 
>> (GenericType) cannot be used as key. 
>>at 
>> org.apache.flink.api.java.operators.Keys$ExpressionKeys.(Keys.java:287)
>>at 
>> org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
>>at 
>> x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55) 
>> 
>> I spend some time with google around but I don't get an idea what is wrong. 
>> I hope some of you can give me a hint... 
>> 
>> Greets 
>> Dominique 
>> 
> 



Re: Simple Flink - Kafka Test

2016-02-09 Thread Chiwan Park
Hi shotte,

The exception is caused by Scala version mismatch. If you want to use Scala 
2.11, you have to set Flink dependencies compiled for Scala 2.11. We have a 
documentation about this in wiki [1].

I hope this helps.

[1]: 
https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version

Regards,
Chiwan Park

> On Feb 10, 2016, at 9:39 AM, shotte <hotte.sylv...@gmail.com> wrote:
> 
> Do I need to go to Flink 1.0 or the downgrade to Kafka 0.8 ?
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Simple-Flink-Kafka-Test-tp4828p4829.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Flink 1.0-SNAPSHOT scala 2.11 in S3 has scala 2.10

2016-02-09 Thread Chiwan Park
Hi David,

I just downloaded the "flink-1.0-SNAPSHOT-bin-hadoop2_2.11.tgz” but there is no 
jar compiled with Scala 2.10. Could you check again?

Regards,
Chiwan Park

> On Feb 10, 2016, at 2:59 AM, David Kim <david@braintreepayments.com> 
> wrote:
> 
> Hello,
> 
> I noticed that the flink binary for scala 2.11 located at 
> http://stratosphere-bin.s3.amazonaws.com/flink-1.0-SNAPSHOT-bin-hadoop2_2.11.tgz
>  contains the scala 2.10 flavor.
> 
> If you open the lib folder the name of the jar in lib is 
> flink-dist_2.10-1.0-SNAPSHOT.jar.
> 
> Could this be an error in the process that updates these files in S3?
> 
> We're using that download link following the suggestions here: 
> https://flink.apache.org/contribute-code.html#snapshots-nightly-builds. If 
> there's a better place let us know as well!
> 
> Thanks,
> David



Re: Simple Flink - Kafka Test

2016-02-09 Thread Chiwan Park
The documentation I sent is for Flink 1.0.

In Flink 0.10.x, there is no suffix of dependencies for Scala 2.10 (e.g. 
flink-streaming-java). But there is a suffix of dependencies for Scala 2.11 
(e.g. flink-streaming-java_2.11).

Regards,
Chiwan Park

> On Feb 10, 2016, at 1:46 PM, Chiwan Park <chiwanp...@apache.org> wrote:
> 
> Hi shotte,
> 
> The exception is caused by Scala version mismatch. If you want to use Scala 
> 2.11, you have to set Flink dependencies compiled for Scala 2.11. We have a 
> documentation about this in wiki [1].
> 
> I hope this helps.
> 
> [1]: 
> https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version
> 
> Regards,
> Chiwan Park
> 
>> On Feb 10, 2016, at 9:39 AM, shotte <hotte.sylv...@gmail.com> wrote:
>> 
>> Do I need to go to Flink 1.0 or the downgrade to Kafka 0.8 ?
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Simple-Flink-Kafka-Test-tp4828p4829.html
>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>> at Nabble.com.
> 



Re: Left join with unbalanced dataset

2016-01-30 Thread Chiwan Park
Hi Arnaud,

To join two datasets, the community recommends using join operation rather than 
cogroup operation. For left join, you can use leftOuterJoin method. Flink’s 
optimizer decides distributed join execution strategy using some statistics of 
the datasets such as size of the dataset. Additionally, you can set join hint 
to help optimizer decide the strategy.

In transformations section [1] of Flink documentation, you can find about outer 
join operation in detail.

I hope this helps.

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#transformations

Regards,
Chiwan Park

> On Jan 30, 2016, at 6:43 PM, LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:
> 
> Hello,
> 
> I have a very big dataset A to left join with a dataset B that is half its 
> size. That is to say, half of A records will be matched with one record of B, 
> and the other half with null values.
> 
> I used a CoGroup for that, but my batch fails because yarn kills the 
> container due to memory problems.
> 
> I guess that’s because one worker will get half of A dataset (the unmatched 
> ones), and that’s too much for a single JVM
> 
> Am I right in my diagnostic ? Is there a better way to left join unbalanced 
> datasets ?
> 
> Best regards,
> 
> Arnaud
> 
> 
> 
> L'intégrité de ce message n'étant pas assurée sur internet, la société 
> expéditrice ne peut être tenue responsable de son contenu ni de ses pièces 
> jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous 
> n'êtes pas destinataire de ce message, merci de le détruire et d'avertir 
> l'expéditeur.
> 
> The integrity of this message cannot be guaranteed on the Internet. The 
> company that sent this message cannot therefore be held liable for its 
> content nor attachments. Any unauthorized use or dissemination is prohibited. 
> If you are not the intended recipient of this message, then please delete it 
> and notify the sender.



Re: rowmatrix equivalent

2016-01-27 Thread Chiwan Park
There is a JIRA issue (FLINK-1873, [1]) that covers the distributed matrix 
implementation.

[1]: https://issues.apache.org/jira/browse/FLINK-1873

Regards,
Chiwan Park

> On Jan 27, 2016, at 5:21 PM, Chiwan Park <chiwanp...@apache.org> wrote:
> 
> I hope the distributed matrix and vector implementation on Flink. :)
> 
> Regards,
> Chiwan Park
> 
>> On Jan 27, 2016, at 2:29 AM, Lydia Ickler <ickle...@googlemail.com> wrote:
>> 
>> Hi Till,
>> 
>> maybe I will do that :) 
>> If I have some other questions I will let you know!
>> 
>> Best regards,
>> Lydia
>> 
>> 
>>> Am 24.01.2016 um 17:33 schrieb Till Rohrmann <trohrm...@apache.org>:
>>> 
>>> Hi Lydia,
>>> 
>>> Flink does not come with a distributed matrix implementation as Spark does 
>>> it with the RowMatrix, yet. However, you can easily implement it yourself. 
>>> This would also be a good contribution to the project if you want to tackle 
>>> the problem
>>> 
>>> Cheers,
>>> Till
>>> 
>>> On Sun, Jan 24, 2016 at 4:03 PM, Lydia Ickler <ickle...@googlemail.com> 
>>> wrote:
>>> Hi all,
>>> 
>>> this is maybe a stupid question but what within Flink is the equivalent to 
>>> Sparks’ RowMatrix ?
>>> 
>>> Thanks in advance,
>>> Lydia
>>> 
>> 
> 



Re: Reading ORC format on Flink

2016-01-27 Thread Chiwan Park
Hi Phil,

I think that you can read ORC file using OrcInputFormat [1] with readHadoopFile 
method.

There is an example on MapReduce [2] in Stackoveflow. The approach works also 
on Flink. Maybe you have to use RichMapFunction [3] to initialize OrcSerde and 
StructObjectInspector object.

Regards,
Chiwan Park

[1]: 
https://hive.apache.org/javadocs/r0.13.1/api/ql/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.html
[2]: 
http://stackoverflow.com/questions/22673222/how-do-you-use-orcfile-input-output-format-in-mapreduce
[3]: 
https://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java/org/apache/flink/api/common/functions/RichMapFunction.html

> On Jan 28, 2016, at 4:44 AM, Philip Lee <philjj...@gmail.com> wrote:
> 
> Hello, 
> 
> Question about reading ORC format on Flink.
> 
> I want to use dataset after loadtesting csv to orc format by Hive.
> Can Flink support reading ORC format?
> 
> If so, please let me know how to use the dataset in Flink.
> 
> Best,
> Phil
> 
> 
> 
> 



Re: Compile fails with scala 2.11.4

2016-01-20 Thread Chiwan Park
Thanks for sharing, Ritesh!

Regards,
Chiwan Park

> On Jan 21, 2016, at 12:28 AM, Ritesh Kumar Singh 
> <riteshoneinamill...@gmail.com> wrote:
> 
> Thanks for the update Robert, I tried it out and it works fine for 
> scala_2.11.4 version.
> I've made a docker image of the same and put it up on the hub just in case 
> anyone else needs it.
> 
> Thanks,
> 
> Ritesh Kumar Singh,
> https://riteshtoday.wordpress.com/
> 
> On Wed, Jan 20, 2016 at 10:07 AM, Robert Metzger <rmetz...@apache.org> wrote:
> Hi,
> 
> in the latest master, the "tools/change-scala-version.sh" should be fixed. 
> Also, the 1.0-SNAPSHOT version deployed to the snapshot repository should be 
> good again.
> 
> @Ritesh: The commands were correct. I'm not sure if Flink builds with Scala 
> 2.11.4, the default 2.11 version we are using is 2.11.7.
> 
> 
> 
> On Tue, Jan 19, 2016 at 7:41 AM, Prez Cannady <revp...@correlatesystems.com> 
> wrote:
> Assuming you haven’t already migrated back to 2.10, you might try this;
> 
> $ git checkout release-0.10
> $ tools/change-scala-version 2.11
> $ mvn clean install -DskipTests=true -Dmaven.javadoc.skip=true 
> -Dscala.version=2.11.4 -Dscala.binary.version=2.11
> 
> Then try building your project.  Building under these conditions resolved my 
> issue which emitted the same error.
> 
> Prez Cannady  
> p: 617 500 3378  
> e: revp...@opencorrelate.org  
> GH: https://github.com/opencorrelate  
> LI: https://www.linkedin.com/in/revprez  
> 
> 
> 
> 
> 
> 
> 
> 
> 
>> On Jan 18, 2016, at 10:20 AM, Ritesh Kumar Singh 
>> <riteshoneinamill...@gmail.com> wrote:
>> 
>> Thanks for the replies.
>> 
>> @Chiwan, I am switching back to scala_2.10.4 for the time being. I was using 
>> scala_2.11.4 as this is the version I've compiled spark with. But anyways, I 
>> can wait for the bug to be resolved.
>> 
>> @Robert, the commands were as follows:
>> $tools/change-scala-version.sh 2.11
>> $mvn clean install -DskipTests -Dscala.version=2.11.4
>> 
>> I hope I'm doing it right ?
>> 
>> Thanks,
>> 
>> Ritesh Kumar Singh,
>> https://riteshtoday.wordpress.com/
>> 
>> On Mon, Jan 18, 2016 at 12:03 PM, Robert Metzger <rmetz...@apache.org> wrote:
>> How did start the Flink for Scala 2.11 compilation ?
>> 
>> On Mon, Jan 18, 2016 at 11:41 AM, Chiwan Park <chiwanp...@apache.org> wrote:
>> Hi Ritesh,
>> 
>> This problem seems already reported [1]. Flink community is investigating 
>> this issue. I think that if you don’t need Scala 2.11, use Scala 2.10 until 
>> the issue is solved.
>> 
>> [1]: 
>> http://mail-archives.apache.org/mod_mbox/flink-user/201601.mbox/%3CCAB6CeiZ_2snN-piXzd3gHnyQePu_PA0Ro7qXUF8%3DVTxoyL0YyA%40mail.gmail.com%3E
>> 
>> > On Jan 18, 2016, at 7:24 PM, Ritesh Kumar Singh 
>> > <riteshoneinamill...@gmail.com> wrote:
>> >
>> > [ERROR] 
>> > /home/flink/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala:703:
>> >  error: can't expand macros compiled by previous versions of Scala
>> > [ERROR]   assert(cachedGraph2.isArchived)
>> > [ERROR]   ^
>> > [ERROR] one error found
>> > [INFO] 
>> > 
>> > [INFO] Reactor Summary:
>> > [INFO]
>> > [INFO] flink .. SUCCESS [ 
>> > 24.820 s]
>> > [INFO] flink-annotations .. SUCCESS [  
>> > 2.755 s]
>> > [INFO] flink-shaded-hadoop  SUCCESS [  
>> > 0.208 s]
>> > [INFO] flink-shaded-hadoop2 ... SUCCESS [ 
>> > 15.627 s]
>> > [INFO] flink-shaded-include-yarn-tests  SUCCESS [ 
>> > 17.076 s]
>> > [INFO] flink-shaded-curator ... SUCCESS [  
>> > 0.200 s]
>> > [INFO] flink-shaded-curator-recipes ... SUCCESS [  
>> > 2.751 s]
>> > [INFO] flink-shaded-curator-test .. SUCCESS [  
>> > 0.355 s]
>> > [INFO] flink-core . SUCCESS [ 
>> > 33.052 s]
>> > [INFO] flink-java . SUCCESS [ 
>> > 10.224 s]
>> > [INFO] flink-runtime .. FAILURE [01:23 
>> > min]
>> > [INFO] flink-optimizer  SKIPPED
>> >
>> >
>> > Any workaround for scala_2.11.4 or do I have to switch back to 
>> > scala_2.10.4 ?
>> >
>> > Thanks,
>> > Ritesh Kumar Singh,
>> > https://riteshtoday.wordpress.com/
>> >
>> 
>> Regards,
>> Chiwan Park
>> 
>> 
>> 
>> 
> 
> 
> 
> 



Re: How to prepare data for K means clustering

2016-01-20 Thread Chiwan Park
Hi Ashutosh,

You can use basic Flink DataSet operations such as map and filter to transform 
your data. Basically, you have to declare a distance metric between each record 
in data. In example, we use euclidean distance (see euclideanDistance method in 
Point class).

In map method in SelectNearestCenter class, euclideanDistance method is used to 
measure the distance between each point. For your implementation, you have to 
substitute type to your data type (It can be your custom class or 
Flink-provided Tuple) and change distance metric for your data.

Regards,
Chiwan Park

> On Jan 21, 2016, at 4:14 PM, Ashutosh Kumar <ashutosh.disc...@gmail.com> 
> wrote:
> 
> I saw example code for K means clustering . It takes input  data points as 
> pair of double values (1.2 2.3\n5.3 7.2\.). My question is how do I convert 
> my business data to this format. I have customer data which has columns like 
> house hold income , education and several others. I want to do clustering on 
> multiple columns something like Neilsen segments. 
> 
> Thanks
> Ashutosh



Re: Compile fails with scala 2.11.4

2016-01-18 Thread Chiwan Park
Hi Ritesh,

This problem seems already reported [1]. Flink community is investigating this 
issue. I think that if you don’t need Scala 2.11, use Scala 2.10 until the 
issue is solved.

[1]: 
http://mail-archives.apache.org/mod_mbox/flink-user/201601.mbox/%3CCAB6CeiZ_2snN-piXzd3gHnyQePu_PA0Ro7qXUF8%3DVTxoyL0YyA%40mail.gmail.com%3E

> On Jan 18, 2016, at 7:24 PM, Ritesh Kumar Singh 
> <riteshoneinamill...@gmail.com> wrote:
> 
> [ERROR] 
> /home/flink/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala:703:
>  error: can't expand macros compiled by previous versions of Scala
> [ERROR]   assert(cachedGraph2.isArchived)
> [ERROR]   ^
> [ERROR] one error found
> [INFO] 
> 
> [INFO] Reactor Summary:
> [INFO] 
> [INFO] flink .. SUCCESS [ 24.820 
> s]
> [INFO] flink-annotations .. SUCCESS [  2.755 
> s]
> [INFO] flink-shaded-hadoop  SUCCESS [  0.208 
> s]
> [INFO] flink-shaded-hadoop2 ... SUCCESS [ 15.627 
> s]
> [INFO] flink-shaded-include-yarn-tests  SUCCESS [ 17.076 
> s]
> [INFO] flink-shaded-curator ... SUCCESS [  0.200 
> s]
> [INFO] flink-shaded-curator-recipes ... SUCCESS [  2.751 
> s]
> [INFO] flink-shaded-curator-test .. SUCCESS [  0.355 
> s]
> [INFO] flink-core . SUCCESS [ 33.052 
> s]
> [INFO] flink-java . SUCCESS [ 10.224 
> s]
> [INFO] flink-runtime .. FAILURE [01:23 
> min]
> [INFO] flink-optimizer  SKIPPED
> 
> 
> Any workaround for scala_2.11.4 or do I have to switch back to scala_2.10.4 ?
> 
> Thanks,
> Ritesh Kumar Singh,
> https://riteshtoday.wordpress.com/
> 

Regards,
Chiwan Park



Re: Where is Flink 0.10.1 documentation?

2016-01-08 Thread Chiwan Park
Great, thanks. :)

> On Jan 8, 2016, at 6:44 PM, Robert Metzger <rmetz...@apache.org> wrote:
> 
> I updated the version in the "release-0.10" branch, that should fix the 
> issue: http://git-wip-us.apache.org/repos/asf/flink/commit/1d05dbe5
> 
> On Fri, Jan 8, 2016 at 10:25 AM, Stephan Ewen <se...@apache.org> wrote:
> Hi!
> 
> I think we missed updating the variable "version" in the "docs/_config.yml" 
> for the 0.10.1 release.
> 
> Would be good to update it and push a new version of the docs.
> 
> Greetings,
> Stephan
> 
> On Fri, Jan 8, 2016 at 6:51 AM, Chiwan Park <chiwanp...@apache.org> wrote:
> Hi squirrels,
> 
> I connected to https://ci.apache.org/projects/flink/flink-docs-release-0.10/ 
> to read Flink 0.10.1 documentation. But the documentation version is still 
> 0.10.0. Although there are only few or no differences between 0.10.0 and 
> 0.10.1, I think that documentation version should be updated to announce 
> latest stable version to newcomers.
> 
> Is there any problem to update doc?
> 
> Regards,
> Chiwan Park

Regards,
Chiwan Park




Where is Flink 0.10.1 documentation?

2016-01-07 Thread Chiwan Park
Hi squirrels,

I connected to https://ci.apache.org/projects/flink/flink-docs-release-0.10/ to 
read Flink 0.10.1 documentation. But the documentation version is still 0.10.0. 
Although there are only few or no differences between 0.10.0 and 0.10.1, I 
think that documentation version should be updated to announce latest stable 
version to newcomers.

Is there any problem to update doc?

Regards,
Chiwan Park




Re: Flink on EMR Question

2016-01-06 Thread Chiwan Park
Great! Thanks for addressing!

> On Jan 6, 2016, at 5:51 PM, Stephan Ewen <se...@apache.org> wrote:
> 
> At a first look, I think that "flink-runtime" does not need Apache Httpclient 
> at all. I'll try to simply remove that dependency...
> 
> On Wed, Jan 6, 2016 at 7:14 AM, Chiwan Park <chiwanp...@apache.org> wrote:
> Hi,
> 
> Thanks for answering me!
> 
> It is happy to hear the problem will be addressed. :)
> 
> About question 2, flink-runtime uses Apache Httpclient 4.2.6 and S3 file 
> system api implemented by Amazon uses 4.3.x. There are some API changes, so 
> NoSuchMethodError exception occurs.
> 
> > On Jan 5, 2016, at 11:59 PM, Stephan Ewen <se...@apache.org> wrote:
> >
> > Hi!
> >
> > Concerning (1) We have seen that a few times. The JVMs / Threads do 
> > sometimes not properly exit in a graceful way, and YARN is not always able 
> > to kill the process (YARN bug). I am currently working on a refactoring of 
> > the YARN resource manager (to allow to easy addition of other frameworks) 
> > and have addressed this as part of that. Will be in the master in a bit.
> >
> > Concerning (2) Do you know which component in Flink uses the HTTP client?
> >
> > Greetings,
> > Stephan
> >
> >
> > On Tue, Jan 5, 2016 at 2:49 PM, Maximilian Bode 
> > <maximilian.b...@tngtech.com> wrote:
> > Hi everyone,
> >
> > Regarding Q1, I believe I have witnessed a comparable phenomenon in a 
> > (3-node, non-EMR) YARN cluster. After shutting down the yarn session via 
> > `stop`, one container seems to linger around. `yarn application -list` is 
> > empty, whereas `bin/yarn-session.sh -q` lists the left-over container. 
> > Also, there is still one application shown as ‚running‘ in Ambari’s YARN 
> > pane under current applications. Then, after some time (order of a few 
> > minutes) it disappears and the resources are available again.
> >
> > I have not tested this behavior extensibly so far. Noticeably, I was not 
> > able to reproduce it by just starting a session and then ending it again 
> > right away without looking at the JobManager web interface. Maybe this 
> > produces some kind of lag as far as YARN containers are concerned?
> >
> > Cheers,
> > Max
> >
> > > Am 04.01.2016 um 12:52 schrieb Chiwan Park <chiwanp...@apache.org>:
> > >
> > > Hi All,
> > >
> > > I have some problems using Flink on Amazon EMR cluster.
> > >
> > > Q1. Sometimes, jobmanager container still exists after destroying yarn 
> > > session by pressing Ctrl+C. In that case, Flink YARN app seems exited 
> > > correctly in YARN RM dashboard. But there is a running container in the 
> > > dashboard. From logs of the container, I realize that the container is 
> > > jobmanager.
> > >
> > > I cannot kill the container because there is no permission to restart 
> > > YARN RM in Amazon EMR. In my small Hadoop Cluster (w/3 nodes), the 
> > > problem doesn’t appear.
> > >
> > > Q2. I tried to use S3 file system in Flink on EMR. But I can’t use it 
> > > because of version conflict of Apache Httpclient. In default, 
> > > implementation of S3 file system in EMR is 
> > > `com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem` which is linked with 
> > > other version of Apache Httpclient.
> > >
> > > As I wrote above, I cannot restart Hadoop cluster after modifying 
> > > conf-site.xml because of lack of permission. How can I solve this problem?
> > >
> > > Regards,
> > > Chiwan Park
> > >
> > >
> 
> Regards,
> Chiwan Park

Regards,
Chiwan Park




Flink on EMR Question

2016-01-04 Thread Chiwan Park
Hi All,

I have some problems using Flink on Amazon EMR cluster.

Q1. Sometimes, jobmanager container still exists after destroying yarn session 
by pressing Ctrl+C. In that case, Flink YARN app seems exited correctly in YARN 
RM dashboard. But there is a running container in the dashboard. From logs of 
the container, I realize that the container is jobmanager.

I cannot kill the container because there is no permission to restart YARN RM 
in Amazon EMR. In my small Hadoop Cluster (w/3 nodes), the problem doesn’t 
appear.

Q2. I tried to use S3 file system in Flink on EMR. But I can’t use it because 
of version conflict of Apache Httpclient. In default, implementation of S3 file 
system in EMR is `com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem` which is 
linked with other version of Apache Httpclient.

As I wrote above, I cannot restart Hadoop cluster after modifying conf-site.xml 
because of lack of permission. How can I solve this problem?

Regards,
Chiwan Park




Re: Streaming in Flink

2016-01-04 Thread Chiwan Park
Hi Sourav,

Basically, Kafka consumer is pull-based [1]. If you want to build push-based 
system, you should use other options.

Flink supports both pull-based and push-based paradigm. It depends upon an 
implementation of data source. As one of examples, Flink provides a streaming 
source function based on socket [2].

[1] http://kafka.apache.org/documentation.html#design_pull
[2] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java

> On Jan 5, 2016, at 3:01 PM, Sourav Mazumder <sourav.mazumde...@gmail.com> 
> wrote:
> 
> Hi Gordon,
> 
> Need little more clarification around reading data from Kafka.
> 
> As soon as any component behaves as a consumer of a topic/queue (iof a 
> messaging system), it essentially does polling of the data after a regular 
> interval (that interval may be small though). Hence essentially it captures 
> all data/events accumulated in the queue from last polling instant to the 
> current polling in a pull manner
> 
> This pattern is very different from real time push of data where a daemon 
> process keeps on waiting on a continuous basis for any data pushed to it. 
> 
> So what I'm looking clarification for is whether Flink supports a mechanism 
> where a data source (actually any client application) can push a data to a 
> socket which is continuously listened by a deamon process of flink.
> 
> Regards,
> Sourav
> 
> On Mon, Jan 4, 2016 at 9:39 PM, Gordon Tai (戴資力) <tzuli...@gmail.com> wrote:
> Hi Sourav,
> 
> Flink's streaming processes incoming data by-each-entry (true streaming, as 
> compared to micro-batch), and streaming is inherently designed as a 
> push-model, where a topology of stream transformations "listens" to a data 
> source.
> 
> You can have a Flink streaming topology's data source configured to be 
> sockets or message queues such as Kafka's topics.
> Any event / data that is streamed to (or in other words, "pushed" to) the 
> socket or Kafka topic will be processed by the Flink topology in real-time.
> 
> Therefore, the answer is yes to your question. Hope this helps.
> 
> BR,
> Gordon
> 
> On Tue, Jan 5, 2016 at 12:42 PM, Sourav Mazumder 
> <sourav.mazumde...@gmail.com> wrote:
> Hi,
> 
> Does Flink support push based data streaming where the data source can push 
> the events/data to Flink cluster over a socket (instead of Flink pulling the 
> data at a given frequency)?
> 
> Regards,
> Sourav
> 
> 
> 
> -- 
> Tzu-Li Tai (Gordon Tai)
> 戴資力
> 
> National Cheng Kung University, Graduate Institute of Computer and 
> Communication Engineering
> High Performance Parallel and Distributed Systems Laboratory (HPDS Lab)
> 國立成功大學電機工程學系 - 電腦與通信工程研究所
> 高效能平行/分散系統實驗室 (HPDS Lab)
> 
> National Cheng Kung University, Engineering Science Dpt.
> 國立成功大學工程科學系
> 
> Contacts
> tzuli...@ee.ncku.edu.tw
> http://tzulitai.ee.ncku.edu.tw
> Linkedin: tw.linkedin.com/in/tzulitai
> +886981916890
> 

Regards,
Chiwan Park




Re: 2015: A Year in Review for Apache Flink

2015-12-30 Thread Chiwan Park
Happy New Year 2016 :)

> On Dec 31, 2015, at 11:22 AM, Henry Saputra <henry.sapu...@gmail.com> wrote:
> 
> Dear All,
> 
> It is almost end of 2015 and it has been busy and great year for Apache Flink 
> =)
> 
> Robert Metzger had posted great blog summarizing Apache Flink grow for
> this year:
> 
>  https://flink.apache.org/news/2015/12/18/a-year-in-review.html
> 
> Happy New Year everyone and thanks for being part of this great community!
> 
> 
> Thanks,
> 
> - Henry

Regards,
Chiwan Park





Re: Using Hadoop Input/Output formats

2015-11-24 Thread Chiwan Park
Thanks for correction @Fabian. :)

> On Nov 25, 2015, at 4:40 AM, Suneel Marthi <smar...@apache.org> wrote:
> 
> Guess, it makes sense to add readHadoopXXX() methods to 
> StreamExecutionEnvironment (for feature parity with what's existing presently 
> in ExecutionEnvironment).
> 
> Also Flink-2949 addresses the need to add relevant syntactic sugar wrappers 
> in DataSet api for the code snippet in Fabian's previous email. Its not cool, 
> having to instantiate a JobConf in client code and having to pass that 
> around. 
> 
> 
> 
> On Tue, Nov 24, 2015 at 2:26 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi Nick,
> 
> you can use Flink's HadoopInputFormat wrappers also for the DataStream API. 
> However, DataStream does not offer as much "sugar" as DataSet because 
> StreamEnvironment does not offer dedicated createHadoopInput or 
> readHadoopFile methods.
> 
> In DataStream Scala you can read from a Hadoop InputFormat (TextInputFormat 
> in this case) as follows:
> 
> val textData: DataStream[(LongWritable, Text)] = env.createInput(
>   new HadoopInputFormat[LongWritable, Text](
> new TextInputFormat,
> classOf[LongWritable],
> classOf[Text],
> new JobConf()
> ))
> 
> The Java version is very similar.
> 
> Note: Flink has wrappers for both MR APIs: mapred and mapreduce.
> 
> Cheers,
> Fabian
> 
> 2015-11-24 19:36 GMT+01:00 Chiwan Park <chiwanp...@apache.org>:
> I’m not streaming expert. AFAIK, the layer can be used with only DataSet. 
> There are some streaming-specific features such as distributed snapshot in 
> Flink. These need some supports of source and sink. So you have to implement 
> I/O.
> 
> > On Nov 25, 2015, at 3:22 AM, Nick Dimiduk <ndimi...@gmail.com> wrote:
> >
> > I completely missed this, thanks Chiwan. Can these be used with DataStreams 
> > as well as DataSets?
> >
> > On Tue, Nov 24, 2015 at 10:06 AM, Chiwan Park <chiwanp...@apache.org> wrote:
> > Hi Nick,
> >
> > You can use Hadoop Input/Output Format without modification! Please check 
> > the documentation[1] in Flink homepage.
> >
> > [1] 
> > https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html
> >
> > > On Nov 25, 2015, at 3:04 AM, Nick Dimiduk <ndimi...@apache.org> wrote:
> > >
> > > Hello,
> > >
> > > Is it possible to use existing Hadoop Input and OutputFormats with Flink? 
> > > There's a lot of existing code that conforms to these interfaces, seems a 
> > > shame to have to re-implement it all. Perhaps some adapter shim..?
> > >
> > > Thanks,
> > > Nick
> >
> > Regards,
> > Chiwan Park
> >
> >
> 
> Regards,
> Chiwan Park
> 

Regards,
Chiwan Park





Re: Using Hadoop Input/Output formats

2015-11-24 Thread Chiwan Park
I’m not streaming expert. AFAIK, the layer can be used with only DataSet. There 
are some streaming-specific features such as distributed snapshot in Flink. 
These need some supports of source and sink. So you have to implement I/O.

> On Nov 25, 2015, at 3:22 AM, Nick Dimiduk <ndimi...@gmail.com> wrote:
> 
> I completely missed this, thanks Chiwan. Can these be used with DataStreams 
> as well as DataSets?
> 
> On Tue, Nov 24, 2015 at 10:06 AM, Chiwan Park <chiwanp...@apache.org> wrote:
> Hi Nick,
> 
> You can use Hadoop Input/Output Format without modification! Please check the 
> documentation[1] in Flink homepage.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html
> 
> > On Nov 25, 2015, at 3:04 AM, Nick Dimiduk <ndimi...@apache.org> wrote:
> >
> > Hello,
> >
> > Is it possible to use existing Hadoop Input and OutputFormats with Flink? 
> > There's a lot of existing code that conforms to these interfaces, seems a 
> > shame to have to re-implement it all. Perhaps some adapter shim..?
> >
> > Thanks,
> > Nick
> 
> Regards,
> Chiwan Park
> 
> 

Regards,
Chiwan Park





Re: Using Hadoop Input/Output formats

2015-11-24 Thread Chiwan Park
Hi Nick,

You can use Hadoop Input/Output Format without modification! Please check the 
documentation[1] in Flink homepage.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html

> On Nov 25, 2015, at 3:04 AM, Nick Dimiduk <ndimi...@apache.org> wrote:
> 
> Hello,
> 
> Is it possible to use existing Hadoop Input and OutputFormats with Flink? 
> There's a lot of existing code that conforms to these interfaces, seems a 
> shame to have to re-implement it all. Perhaps some adapter shim..?
> 
> Thanks,
> Nick

Regards,
Chiwan Park





Re: /home/user/udfs.jar same question

2015-09-25 Thread Chiwan Park
Hi Deng,

It sounds weird. In code [1], `jarFiles` parameter is defined as a varargs 
parameter. From this, we can omit the parameter.

Which version of Flink are you using?

Regards,
Chiwan Park

[1] 
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L1148

> On Sep 25, 2015, at 5:15 PM, Deng Jie <smartdeng...@gmail.com> wrote:
> 
> Dear Chiwan,
> However, 'createRemoteEnvironment' method must specify this parameter.
> 
> -- Best wishes for you
> 邓杰
> 15994846535
> 
> 2015-09-25 13:42 GMT+08:00 Chiwan Park <chiwanp...@apache.org>:
> Hi Deng,
> 
> The jarFiles parameter of `createRemoteEnvironment` means that the path of 
> your custom library jar. If you don’t need custom library, you can omit the 
> parameter.
> 
> Regards,
> Chiwan Park
> 
> > On Sep 25, 2015, at 10:48 AM, Deng Jie <smartdeng...@gmail.com> wrote:
> >
> > Dear Flink org,i have same question,like:
> > public static void main(String[] args) throws Exception {
> >
> > ExecutionEnvironment env
> > =
> >  ExecutionEnvironment
> >
> > .createRemoteEnvironment("flink-master", 6123, "/home/user/udfs.jar");
> >
> >
> > DataSet
> >  data = env.readTextFile("hdfs://path/to/file");
> >
> >
> > data
> >
> > .filter(new FilterFunction() {
> >
> >
> > public boolean filter(String value) {
> >
> >
> > return value.startsWith("http://;);
> >
> >
> > }
> >
> >
> > })
> >
> >
> > .writeAsText("hdfs://path/to/result");
> >
> >
> > env
> > .execute();
> > }
> >
> > How to write the file(udfs.jar),can you give me an example?In addition,can 
> > this parameter as an option?
> >
> > -- Best wishes for you
> 
> 
> 
> 







Re: /home/user/udfs.jar same question

2015-09-25 Thread Chiwan Park
Oh, sorry for wrong information.
I have misunderstood about `jarFiles` parameter.

Regards,
Chiwan Park

> On Sep 25, 2015, at 5:27 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Deng Jie,
> 
> your Flink program needs to be packaged into a JAR file.
> The Flink quickstart Maven archetypes for Java [1] and Scala [2] help to 
> setup a Maven project that packages your program correctly into a JAR file.
> 
> Best, Fabian
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/java_api_quickstart.html
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/scala_api_quickstart.html
> 
> 2015-09-25 10:15 GMT+02:00 Deng Jie <smartdeng...@gmail.com>:
> Dear Chiwan,
> However, 'createRemoteEnvironment' method must specify this parameter.
> 
> -- Best wishes for you
> 邓杰
> 15994846535
> 
> 2015-09-25 13:42 GMT+08:00 Chiwan Park <chiwanp...@apache.org>:
> Hi Deng,
> 
> The jarFiles parameter of `createRemoteEnvironment` means that the path of 
> your custom library jar. If you don’t need custom library, you can omit the 
> parameter.
> 
> Regards,
> Chiwan Park
> 
> > On Sep 25, 2015, at 10:48 AM, Deng Jie <smartdeng...@gmail.com> wrote:
> >
> > Dear Flink org,i have same question,like:
> > public static void main(String[] args) throws Exception {
> >
> > ExecutionEnvironment env
> > =
> >  ExecutionEnvironment
> >
> > .createRemoteEnvironment("flink-master", 6123, "/home/user/udfs.jar");
> >
> >
> > DataSet
> >  data = env.readTextFile("hdfs://path/to/file");
> >
> >
> > data
> >
> > .filter(new FilterFunction() {
> >
> >
> > public boolean filter(String value) {
> >
> >
> > return value.startsWith("http://;);
> >
> >
> > }
> >
> >
> > })
> >
> >
> > .writeAsText("hdfs://path/to/result");
> >
> >
> > env
> > .execute();
> > }
> >
> > How to write the file(udfs.jar),can you give me an example?In addition,can 
> > this parameter as an option?
> >
> > -- Best wishes for you
> 
> 
> 
> 
> 







Re: /home/user/udfs.jar same question

2015-09-24 Thread Chiwan Park
Hi Deng,

The jarFiles parameter of `createRemoteEnvironment` means that the path of your 
custom library jar. If you don’t need custom library, you can omit the 
parameter.

Regards,
Chiwan Park

> On Sep 25, 2015, at 10:48 AM, Deng Jie <smartdeng...@gmail.com> wrote:
> 
> Dear Flink org,i have same question,like:
> public static void main(String[] args) throws Exception {
> 
> ExecutionEnvironment env 
> =
>  ExecutionEnvironment
> 
> .createRemoteEnvironment("flink-master", 6123, "/home/user/udfs.jar");
> 
> 
> DataSet
>  data = env.readTextFile("hdfs://path/to/file");
> 
> 
> data
> 
> .filter(new FilterFunction() {
> 
> 
> public boolean filter(String value) {
> 
> 
> return value.startsWith("http://;);
> 
> 
> }
> 
> 
> })
> 
> 
> .writeAsText("hdfs://path/to/result");
> 
> 
> env
> .execute();
> }
> 
> How to write the file(udfs.jar),can you give me an example?In addition,can 
> this parameter as an option?
> 
> -- Best wishes for you





Re: Is there a way to change the port for the Flink monitoring page?

2015-09-17 Thread Chiwan Park
Hi Felix,

You can change the listening port of jobmanager web frontend by setting 
`jobmanager.web.port` in configuration (conf/flink-conf.yml).
I attached a link of documentation [1] about this.

Regards,
Chiwan Park

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/config.html#jobmanager-web-frontend


> On Sep 17, 2015, at 8:31 AM, Felix Cheung <felixcheun...@hotmail.com> wrote:
> 
> 
> I'm  using Flink from Zeppelin in local mode and Zeppelin is using 8080 
> 
> 
> 
> 





Re: Inheritance and FlatMap with custom POJO

2015-09-16 Thread Chiwan Park
Hi Giacomo,

You should set your field as public. If you are set your field as private or 
protected, the class must provide getter and setter to be treated as POJO.

Maybe the documentation in homepage [1] would be helpful.

Regards,
Chiwan Park

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos

> On Sep 16, 2015, at 11:25 PM, Giacomo Licari <giacomo.lic...@gmail.com> wrote:
> 
> I run it only implementing java.io.Serializable without disabling the closure 
> cleaner.
> 
> Another question I have is about POJO classes. 
> I would also create a base POJO class with some common proprerties, and then 
> extend it in new classes. These classes are used to convert a CSV into a 
> dataset of POJO objects (of derived class type). 
> 
> In the following example, I create a DataSet of TwitterPOJO, which extends a 
> Base class, adding the new property "tweet".
> 
> DataSet ds_twitter = env.readCsvFile("file://"+path_twitter)
>.pojoType(TwitterPOJO.class, "table", "time", "tweet"); 
> 
> I obtain this error:
> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class 
> com.Flink.POJO.TwitterPOJO is not a valid POJO type
> Exception in thread "main" java.lang.ClassCastException: 
> org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to 
> org.apache.flink.api.java.typeutils.PojoTypeInfo
> 
> Greetings,
> G.L.
> 
> On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen <se...@apache.org> wrote:
> Could you also try the other variant (disabeling the closure cleaner)? I 
> would be curious if this behavior is expected Java Serialization behavior, or 
> whether our pre-processing code is causing it.
> 
> Greetings,
> Stephan
> 
> 
> On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari <giacomo.lic...@gmail.com> 
> wrote:
> Thank you Martin and Stephan for your help.
> I tried directly to implement java.io.Serializable in Base class and it 
> worked perfectly!
> 
> Now I can develop more flexible and maintainable code. Thank you a lot guys.
> 
> Greetings,
> Giacomo
> 
> On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi!
> 
> Interesting case. We use plain Java Serialization to distribute UDFs, and 
> perform additional "cleaning" of scopes, which may be causing the issue.
> 
> Can you try the following to see if any of those resolves the problem?
> 
> 1) On the environment, disable the closure cleaner (in the execution config).
> 
> 2) Let the CullTimeBase class implement java.io.Serializable.
> 
> Please let us know how it turns out!
> 
> Greetings,
> Stephan
> 
> 
> On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns <m.jungha...@mailbox.org> 
> wrote:
> Hi Giacomo,
> 
> I ran into the same issue. Seems to be coupled to the serialization mechanism 
> of UDFs. I solved it by letting the base class implement the UDF interface 
> (e.g. FlatMapFunction) and in addition make it generic (which should be 
> necessary in your example).
> 
> public [abstract] class CullTimeBase<IN, OUT> implements FlatMapFunction<IN, 
> OUT> {
> // ...
> }
> 
> public class CullTimeRainFall extends CullTimeBase<RainFallPOJO, 
> RainFallPOJO> {
> // ...
> }
> 
> This should work.
> 
> Best,
> Martin
> 
> 
> On 16.09.2015 10:41, Giacomo Licari wrote:
>> Hi guys,
>> I'm trying to create a base class which is inherited from classes 
>> implementing FlatMap method on specific POJO types.
>> 
>> It seems inheritance doesn't work, I can access this.PropertyName or 
>> super.PropertyName from flatMap method but values are always null. 
>> 
>> Here the derived class, using RainfallPOJO:
>> 
>> public class CullTimeRainfall extends CullTimeBase implements 
>> FlatMapFunction<RainfallPOJO, RainfallPOJO> {
>> 
>>  public CullTimeRainfall(int num, int den, String time_data_name, String 
>> start_time, String end_time, int interval, String time_unit){
>>  super(num, den, time_data_name, start_time, end_time, interval, 
>> time_unit); 
>>  }   
>> 
>>  public void flatMap(RainfallPOJO obj, Collector coll) 
>> throws Exception {  
>>  DateFormat formatter = new 
>> SimpleDateFormat("-MM-dd'T'hh:mm:ss.SSS");
>>  try {
>>  Date time = formatter.parse( obj.getTime().replaceAll( 
>> "([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2" ) );
>>  if(time.after(this.startTime) &

Re: Inheritance and FlatMap with custom POJO

2015-09-16 Thread Chiwan Park
I created a JIRA issue [1]. After FLINK-2637 [2][3] is resolved, I’ll submit a 
patch to solve this.
Currently, there is no way to use derived class with CSV input.

Thank you for reporting.

[1] https://issues.apache.org/jira/browse/FLINK-2690
[2] https://issues.apache.org/jira/browse/FLINK-2637
[3] https://github.com/apache/flink/pull/1134

Regards,
Chiwan Park

> On Sep 17, 2015, at 1:33 AM, Chiwan Park <chiwanp...@apache.org> wrote:
> 
> It seems like a bug of CsvInputFormat. I succeed in reproducing in my local 
> machine.
> I will create a JIRA issue for this and submit a patch to fix it.
> 
> Which version of Flink are used?
> 
> Regards,
> Chiwan Park
> 
>> On Sep 17, 2015, at 12:20 AM, Giacomo Licari <giacomo.lic...@gmail.com> 
>> wrote:
>> 
>> Yes I did.
>> 
>> if anyone has a bypass solution, let us know.
>> 
>> Regards,
>> Giacomo Licari
>> 
>> On Wed, Sep 16, 2015 at 5:15 PM, Chiwan Park <chiwanp...@apache.org> wrote:
>> Hi Giacomo,
>> 
>> Did you create constructors without arguments in both base class and derived 
>> class?
>> If you do, it seems like a bug.
>> 
>> Regards,
>> Chiwan Park
>> 
>>> On Sep 17, 2015, at 12:04 AM, Giacomo Licari <giacomo.lic...@gmail.com> 
>>> wrote:
>>> 
>>> Hi Chiwan,
>>> I followed instructions in documentation.
>>> I have a simple base class with some properties (all public).
>>> Then I extend that class with a new public property (tweet in my case), I 
>>> provide also getter and setter for that property.
>>> 
>>> Now when I execute:
>>> DataSet ds_twitter = env.readCsvFile("file://"+path_twitter)
>>>   .pojoType(TwitterPOJO.class, "table", "time", "tweet");
>>> 
>>> I receive:
>>> There is no field called "table" in com.Flink.POJO.TwitterPOJO
>>> 
>>> table is a field of the Base class, declared as public with also getter and 
>>> setter.
>>> 
>>> Thank you for your help.
>>> 
>>> Giacomo
>>> 
>>> On Wed, Sep 16, 2015 at 4:42 PM, Chiwan Park <chiwanp...@apache.org> wrote:
>>> Hi Giacomo,
>>> 
>>> You should set your field as public. If you are set your field as private 
>>> or protected, the class must provide getter and setter to be treated as 
>>> POJO.
>>> 
>>> Maybe the documentation in homepage [1] would be helpful.
>>> 
>>> Regards,
>>> Chiwan Park
>>> 
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos
>>> 
>>>> On Sep 16, 2015, at 11:25 PM, Giacomo Licari <giacomo.lic...@gmail.com> 
>>>> wrote:
>>>> 
>>>> I run it only implementing java.io.Serializable without disabling the 
>>>> closure cleaner.
>>>> 
>>>> Another question I have is about POJO classes.
>>>> I would also create a base POJO class with some common proprerties, and 
>>>> then extend it in new classes. These classes are used to convert a CSV 
>>>> into a dataset of POJO objects (of derived class type).
>>>> 
>>>> In the following example, I create a DataSet of TwitterPOJO, which extends 
>>>> a Base class, adding the new property "tweet".
>>>> 
>>>> DataSet ds_twitter = env.readCsvFile("file://"+path_twitter)
>>>>   .pojoType(TwitterPOJO.class, "table", "time", "tweet");
>>>> 
>>>> I obtain this error:
>>>> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class 
>>>> com.Flink.POJO.TwitterPOJO is not a valid POJO type
>>>> Exception in thread "main" java.lang.ClassCastException: 
>>>> org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to 
>>>> org.apache.flink.api.java.typeutils.PojoTypeInfo
>>>> 
>>>> Greetings,
>>>> G.L.
>>>> 
>>>> On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen <se...@apache.org> wrote:
>>>> Could you also try the other variant (disabeling the closure cleaner)? I 
>>>> would be curious if this behavior is expected Java Serialization behavior, 
>>>> or whether our pre-processing code is causing it.
>>>> 
>>>> Greetings,
>>>> Stephan
>>>> 
>>>> 
>>>> On Wed, Sep 16, 2015 at 3:38 PM

Re: Inheritance and FlatMap with custom POJO

2015-09-16 Thread Chiwan Park
It seems like a bug of CsvInputFormat. I succeed in reproducing in my local 
machine.
I will create a JIRA issue for this and submit a patch to fix it.

Which version of Flink are used?

Regards,
Chiwan Park

> On Sep 17, 2015, at 12:20 AM, Giacomo Licari <giacomo.lic...@gmail.com> wrote:
> 
> Yes I did.
> 
> if anyone has a bypass solution, let us know.
> 
> Regards,
> Giacomo Licari
> 
> On Wed, Sep 16, 2015 at 5:15 PM, Chiwan Park <chiwanp...@apache.org> wrote:
> Hi Giacomo,
> 
> Did you create constructors without arguments in both base class and derived 
> class?
> If you do, it seems like a bug.
> 
> Regards,
> Chiwan Park
> 
> > On Sep 17, 2015, at 12:04 AM, Giacomo Licari <giacomo.lic...@gmail.com> 
> > wrote:
> >
> > Hi Chiwan,
> > I followed instructions in documentation.
> > I have a simple base class with some properties (all public).
> > Then I extend that class with a new public property (tweet in my case), I 
> > provide also getter and setter for that property.
> >
> > Now when I execute:
> > DataSet ds_twitter = env.readCsvFile("file://"+path_twitter)
> >.pojoType(TwitterPOJO.class, "table", "time", "tweet");
> >
> > I receive:
> > There is no field called "table" in com.Flink.POJO.TwitterPOJO
> >
> > table is a field of the Base class, declared as public with also getter and 
> > setter.
> >
> > Thank you for your help.
> >
> > Giacomo
> >
> > On Wed, Sep 16, 2015 at 4:42 PM, Chiwan Park <chiwanp...@apache.org> wrote:
> > Hi Giacomo,
> >
> > You should set your field as public. If you are set your field as private 
> > or protected, the class must provide getter and setter to be treated as 
> > POJO.
> >
> > Maybe the documentation in homepage [1] would be helpful.
> >
> > Regards,
> > Chiwan Park
> >
> > [1] 
> > https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos
> >
> > > On Sep 16, 2015, at 11:25 PM, Giacomo Licari <giacomo.lic...@gmail.com> 
> > > wrote:
> > >
> > > I run it only implementing java.io.Serializable without disabling the 
> > > closure cleaner.
> > >
> > > Another question I have is about POJO classes.
> > > I would also create a base POJO class with some common proprerties, and 
> > > then extend it in new classes. These classes are used to convert a CSV 
> > > into a dataset of POJO objects (of derived class type).
> > >
> > > In the following example, I create a DataSet of TwitterPOJO, which 
> > > extends a Base class, adding the new property "tweet".
> > >
> > > DataSet ds_twitter = env.readCsvFile("file://"+path_twitter)
> > >.pojoType(TwitterPOJO.class, "table", "time", "tweet");
> > >
> > > I obtain this error:
> > > [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class 
> > > com.Flink.POJO.TwitterPOJO is not a valid POJO type
> > > Exception in thread "main" java.lang.ClassCastException: 
> > > org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to 
> > > org.apache.flink.api.java.typeutils.PojoTypeInfo
> > >
> > > Greetings,
> > > G.L.
> > >
> > > On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen <se...@apache.org> wrote:
> > > Could you also try the other variant (disabeling the closure cleaner)? I 
> > > would be curious if this behavior is expected Java Serialization 
> > > behavior, or whether our pre-processing code is causing it.
> > >
> > > Greetings,
> > > Stephan
> > >
> > >
> > > On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari 
> > > <giacomo.lic...@gmail.com> wrote:
> > > Thank you Martin and Stephan for your help.
> > > I tried directly to implement java.io.Serializable in Base class and it 
> > > worked perfectly!
> > >
> > > Now I can develop more flexible and maintainable code. Thank you a lot 
> > > guys.
> > >
> > > Greetings,
> > > Giacomo
> > >
> > > On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen <se...@apache.org> wrote:
> > > Hi!
> > >
> > > Interesting case. We use plain Java Serialization to distribute UDFs, and 
> > > perform additional "cleaning" of scopes, which may be causing the issue.
> > >
> &

Re: when use broadcast variable and run on bigdata display this error please help

2015-09-05 Thread Chiwan Park
Hi hagersaleh,

You should know why the error occurred with large scale data. Broadcast 
variables can handle only data of which size is fit for single machine.

I meant that using an external system such as Redis, HBase, …, etc. The 
connection with the external system could be initialized in `open` method of 
rich functions such as `RichFilterFunction`, `RichFlatMapFunction`, …, etc..

You can choose another solution which Stephen said. He said that rethink your 
approach. I think that rethinking your algorithm would be better than my 
suggestion.

From your code, I don’t understand why you want to use broadcast variable. You 
can do same thing with filter and join operations. Here is my implementation 
[1].


Regards,
Chiwan Park

[1] https://gist.github.com/chiwanpark/a0b0269c9a9b058d15d3


> On Sep 4, 2015, at 3:51 AM, hagersaleh <loveallah1...@yahoo.com> wrote:
> 
> Hi Chiwan Park 
> not understand this solution please explain more
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/when-use-broadcast-variable-and-run-on-bigdata-display-this-error-please-help-tp2455p2676.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.






Re: Usage of Hadoop 2.2.0

2015-09-03 Thread Chiwan Park
+1 for dropping Hadoop 2.2.0

Regards,
Chiwan Park

> On Sep 4, 2015, at 5:58 AM, Ufuk Celebi <u...@apache.org> wrote:
> 
> +1 to what Robert said.
> 
> On Thursday, September 3, 2015, Robert Metzger <rmetz...@apache.org> wrote:
> I think most cloud providers moved beyond Hadoop 2.2.0.
> Google's Click-To-Deploy is on 2.4.1
> AWS EMR is on 2.6.0
> 
> The situation for the distributions seems to be the following:
> MapR 4 uses Hadoop 2.4.0 (current is MapR 5)
> CDH 5.0 uses 2.3.0 (the current CDH release is 5.4)
> 
> HDP 2.0  (October 2013) is using 2.2.0
> HDP 2.1 (April 2014) uses 2.4.0 already
> 
> So both vendors and cloud providers are multiple releases away from Hadoop 
> 2.2.0.
> 
> Spark does not offer a binary distribution lower than 2.3.0.
> 
> In addition to that, I don't think that the HDFS client in 2.2.0 is really 
> usable in production environments. Users were reporting ArrayIndexOutOfBounds 
> exceptions for some jobs, I also had these exceptions sometimes.
> 
> The easiest approach  to resolve this issue would be  (a) dropping the 
> support for Hadoop 2.2.0
> An alternative approach (b) would be:
>  - ship a binary version for Hadoop 2.3.0
>  - make the source of Flink still compatible with 2.2.0, so that users can 
> compile a Hadoop 2.2.0 version if needed.
> 
> I would vote for approach (a).
> 
> 
> On Tue, Sep 1, 2015 at 5:01 PM, Till Rohrmann <trohrm...@apache.org> wrote:
> While working on high availability (HA) for Flink's YARN execution I stumbled 
> across some limitations with Hadoop 2.2.0. From version 2.2.0 to 2.3.0, 
> Hadoop introduced new functionality which is required for an efficient HA 
> implementation. Therefore, I was wondering whether there is actually a need 
> to support Hadoop 2.2.0. Is Hadoop 2.2.0 still actively used by someone?
> 
> Cheers,
> Till
> 







Re: problem with union

2015-08-27 Thread Chiwan Park
Hi Michele,

We’re doing release process for 0.9.1. Ufuk Celebi will start vote for 0.9.1 
release soon.

Regards,
Chiwan Park

 On Aug 27, 2015, at 6:49 PM, Michele Bertoni 
 michele1.bert...@mail.polimi.it wrote:
 
 Hi everybody,
 I am still waiting for version 0.9.1 to solve this problem, any idea on when 
 it will be released?
 
 
 Thanks
 Best,
 
 michele
 
 
 
 
 Il giorno 15/lug/2015, alle ore 15:58, Maximilian Michels m...@apache.org 
 ha scritto:
 
 I was able to reproduce this problem. It turns out, this has already been 
 fixed in the snapshot version: 
 https://issues.apache.org/jira/browse/FLINK-2229
 
 The fix will be included in the upcoming 0.9.1 release. Thank you again for 
 reporting! 
 
 Kind regards,
 Max
 
 On Wed, Jul 15, 2015 at 11:33 AM, Maximilian Michels m...@apache.org wrote:
 Hi Michele,
 
 Thanks for reporting the problem. It seems like we changed the way we 
 compare generic types like your GValue type. I'm debugging that now. We can 
 get a fix in for the 0.9.1 release.
 
 Cheers,
 Max
 
 On Tue, Jul 14, 2015 at 5:35 PM, Michele Bertoni 
 michele1.bert...@mail.polimi.it wrote:
 Hi everybody, this discussion started in an other thread about a problem in 
 union, but you said it was a different error then i am opening a new topic
 
 I am doing the union of two dataset and I am getting this error
 
 
 
 
 Exception in thread main 
 org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of 
 different types. Input1=scala.Tuple6(_1: Long, _2: String, _3: Long, _4: 
 Long, _5: Character, _6: 
 ObjectArrayTypeInfoGenericTypeit.polimi.genomics.core.DataTypes.GValue), 
 input2=scala.Tuple6(_1: Long, _2: String, _3: Long, _4: Long, _5: Character, 
 _6: 
 ObjectArrayTypeInfoGenericTypeit.polimi.genomics.core.DataTypes.GValue)
 at 
 org.apache.flink.api.java.operators.UnionOperator.init(UnionOperator.java:46)
 at org.apache.flink.api.scala.DataSet.union(DataSet.scala:1101)
 at 
 it.polimi.genomics.flink.FlinkImplementation.operator.region.GenometricCover2$.apply(GenometricCover2.scala:125)
 ...
 
 
 
 
 Input1=
 scala.Tuple6(_1: Long, _2: String, _3: Long, _4: Long, _5: Character, _6: 
 ObjectArrayTypeInfoGenericTypeit.polimi.genomics.core.DataTypes.GValue)
 input2=
 scala.Tuple6(_1: Long, _2: String, _3: Long, _4: Long, _5: Character, _6: 
 ObjectArrayTypeInfoGenericTypeit.polimi.genomics.core.DataTypes.GValue)
 
 
 as you can see the two datasets have the same type
 this error only happens with a custom data type (e.g. i am using an array of 
 GValue, an array of Int or Double works)
 
 in the last flink version it was working (milestone and snapshot) now in 
 0.9.0 it is not
 
 what can it be?
 
 
 thanks for help
 
 cheers,
 Michele
 
 
 







Re: New contributor tasks

2015-08-26 Thread Chiwan Park
Additionally, If you have any questions about contributing, please send a mail 
to dev mailing list.

Regards,
Chiwan Park

 On Aug 27, 2015, at 2:11 PM, Chiwan Park chiwanp...@apache.org wrote:
 
 Hi Naveen,
 
 There is a guide document [1] about contribution in homepage. Please read 
 first before contributing. Maybe a document of coding guidelines [2] would be 
 helpful to you. You can find some issues [3] to start contributing to Flink 
 in JIRA. The issues are labeled as `starter`, `newbie`, or `easyfix`.
 
 Happy contributing!
 
 Regards,
 Chiwan Park
 
 [1] http://flink.apache.org/how-to-contribute.html
 [2] http://flink.apache.org/coding-guidelines.html
 [3] 
 https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20AND%20labels%20%3D%20starter%20ORDER%20BY%20priority%20DESC
 
 On Aug 27, 2015, at 2:06 PM, Naveen Madhire vmadh...@umail.iu.edu wrote:
 
 Hi,
 
 I've setup Flink on my local linux machine and ran few examples as well. 
 Also setup the Intellij IDE for the coding environment. Can anyone please 
 let me know if there are any beginner tasks which I can take a look for 
 contributing to Apache Flink codebase.
 
 I am comfortable in Java and Scala programming. 
 
 Please let me know.
 
 Thanks,
 Naveen 
 
 
 
 






Re: New contributor tasks

2015-08-26 Thread Chiwan Park
Hi Naveen,

There is a guide document [1] about contribution in homepage. Please read first 
before contributing. Maybe a document of coding guidelines [2] would be helpful 
to you. You can find some issues [3] to start contributing to Flink in JIRA. 
The issues are labeled as `starter`, `newbie`, or `easyfix`.

Happy contributing!

Regards,
Chiwan Park

[1] http://flink.apache.org/how-to-contribute.html
[2] http://flink.apache.org/coding-guidelines.html
[3] 
https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20AND%20labels%20%3D%20starter%20ORDER%20BY%20priority%20DESC

 On Aug 27, 2015, at 2:06 PM, Naveen Madhire vmadh...@umail.iu.edu wrote:
 
 Hi,
 
 I've setup Flink on my local linux machine and ran few examples as well. Also 
 setup the Intellij IDE for the coding environment. Can anyone please let me 
 know if there are any beginner tasks which I can take a look for contributing 
 to Apache Flink codebase.
 
 I am comfortable in Java and Scala programming. 
 
 Please let me know.
 
 Thanks,
 Naveen 






Re: Flink test environment

2015-08-19 Thread Chiwan Park
Hi Hermann,

In 16 page of Slim’s slides [1], there is a pre-installed virtual machine based 
on VMWare. I haven’t run Flink on that machine. But maybe It works.

Regards,
Chiwan Park

[1] 
http://www.slideshare.net/sbaltagi/apache-flinkcrashcoursebyslimbaltagiandsrinipalthepu

 On Aug 19, 2015, at 10:11 PM, Hermann Azong hermann.az...@gmail.com wrote:
 
 Hey Flinkers,
 
 for testing purposes on cluster, I would like to know if there is a virtual 
 machine where flink allerady work as standalone or on yarn.
 Thank you in advance for answers!
 
 Cheers,
 Hermann





Re: Run Time Exception

2015-07-19 Thread Chiwan Park
Hi,

Flink program should have at least one data sink. When your program calls 
`print` method, the method adds a data sink into your program automatically and 
execute it immediately. If you want to run Flink program without calling 
`print` method, you should add a data sink into your program and execute it 
manually by calling `execute` method of `ExecutionEnvironment` object.

Note that only some methods about data sink such as `print`, `count` and 
`collect` execute the program immediately.

There are more detail descriptions about data sink [1] and lazy evaluation [2] 
in Flink documentation. The documentation will help you to understand the 
structure of Flink program.

Regards,
Chiwan Park

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html#data-sinks
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html#lazy-evaluation

 On Jul 20, 2015, at 2:59 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com 
 wrote:
 
 Hi Scahin,
 
 Thank you for the response. I have commented counts print line. After that I 
 got below exception
 
 Exception in thread main java.lang.RuntimeException: No data sinks have 
 been created yet. A program needs at least one sink that consumes data. 
 Examples are writing the data set or printing it.
 at 
 org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:914)
 at 
 org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:893)
 at 
 org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:50)
 at 
 org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:590)
 at WordCount$.main(WordCount.scala:13)
 at WordCount.main(WordCount.scala)
 
 Regards,
 Rajesh
 
 On Sun, Jul 19, 2015 at 8:26 PM, Sachin Goel sachingoel0...@gmail.com wrote:
 Hi
 You do not need to call env.execute after doing a print call. Print itself 
 triggers the execution. The reason for the Exception is quite obvious. After 
 the print call, there is no sink for the program execution. So, execution 
 cannot proceed. 
 You can however explicitly define a sink and then call env.execute.
 
 Cheers!
 Sachin
 
 -- Sachin Goel
 Computer Science, IIT Delhi
 m. +91-9871457685
 
 On Sun, Jul 19, 2015 at 8:06 PM, Madabhattula Rajesh Kumar 
 mrajaf...@gmail.com wrote:
 Hi,
 
 I have written simple wordcount program in scala. When I execute the program, 
 I'm getting below exception.
 
 Please let me know how to fix this issue. I'm using Flink 0.9.0 version
 
 Below is the program :-
 
 val env = ExecutionEnvironment.getExecutionEnvironment
 // get input data
 val text = env readTextFile(/Users/hadoop2/Data/word.txt)
 val counts = text flatMap(l=l split( )) map(word=(word,1)) groupBy(0) 
 sum(1)
 // emit result
 counts print 
 env.execute(TEST)  
 
 Exception :-
 
 Exception in thread main java.lang.RuntimeException: No new data sinks have 
 been defined since the last execution. The last execution refers to the 
 latest call to 'execute()', 'count()', 'collect()', or 'print()'.
 at 
 org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:910)
 at 
 org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:893)
 at 
 org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:50)
 at 
 org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:590)
 at WordCount$.main(WordCount.scala:17)
 at WordCount.main(WordCount.scala)
 
 Regards,
 Rajesh
 
 







Re: Flink Scala performance

2015-07-16 Thread Chiwan Park
You can increase Flink managed memory by increasing Taskmanager JVM Heap 
(taskmanager.heap.mb) in flink-conf.yaml.
There is some explanation of options in Flink documentation [1].

Regards,
Chiwan Park

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#common-options

 On Jul 16, 2015, at 7:23 PM, Vinh June hoangthevinh@gmail.com wrote:
 
 I found it in JobManager log
 
 21:16:54,986 INFO  org.apache.flink.runtime.taskmanager.TaskManager  

 - Using 25 MB for Flink managed memory.
 
 is there a way to explicitly assign this for local ?
 
 
 
 --
 View this message in context: 
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Scala-performance-tp2065p2087.html
 Sent from the Apache Flink User Mailing List archive. mailing list archive at 
 Nabble.com.





Re: Deterministic map?

2015-07-14 Thread Chiwan Park
Hi, If you use `partitionCustom()` method [1] with custom partitioner, you can 
guarantee the order of partition.

Regards,
Chiwan Park

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/java/DataSet.html#partitionCustom(org.apache.flink.api.common.functions.Partitioner,%20int)

 On Jul 15, 2015, at 1:10 AM, Juan Fumero 
 juan.jose.fumero.alfo...@oracle.com wrote:
 
 Hi, 
  I am doing pure map computation with typical benchmarks like
 BlackScholes and NBody. 
 
 I am using local configuration with multiple threads. It seems like,
 inside the chuck (total size / numThreads) the order is correct. But the
 ordering between chunks is not correct, giving an incorrect result at
 the end.
 
 What I mean by the order is, the correct result in the correct position
 of the array. 
 
 Is there any way to guarantee the result? 
 
 Many thanks
 Juan
 






Re: Deterministic map?

2015-07-14 Thread Chiwan Park
Sure, here is a example [1] of using `partitionCustom()` method in Java API. 
Scala API is
similar to Java API. 

You should implement PartitionerKEY interface. The interface has a method 
called
partition with two parameters. The first parameter is key value of each record 
and
the second parameter is number of partitions. The partition method must return
the number of partition which the record belongs to. The returned value must be
between 0 and numPartitions - 1.

There are 3 types of `partitionCustom()` method. The difference between three 
methods is
by method of specifying keys. If you want to know more detail of key specifying 
method
in Flink, please see the documentation [2] in Flink homepage.

Regards,
Chiwan Park

[1] https://gist.github.com/chiwanpark/e71d27cc8edae8bc7298
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#specifying-keys

 On Jul 15, 2015, at 2:40 AM, Juan Fumero 
 juan.jose.fumero.alfo...@oracle.com wrote:
 
 Hi Chiwan, 
  great thanks. Is there any example available? 
 
 Regards
 Juan
 
 On Wed, 2015-07-15 at 01:19 +0900, Chiwan Park wrote:
 Hi, If you use `partitionCustom()` method [1] with custom partitioner, you 
 can guarantee the order of partition.
 
 Regards,
 Chiwan Park
 
 [1] 
 https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/java/DataSet.html#partitionCustom(org.apache.flink.api.common.functions.Partitioner,%20int)
 
 On Jul 15, 2015, at 1:10 AM, Juan Fumero 
 juan.jose.fumero.alfo...@oracle.com wrote:
 
 Hi, 
 I am doing pure map computation with typical benchmarks like
 BlackScholes and NBody. 
 
 I am using local configuration with multiple threads. It seems like,
 inside the chuck (total size / numThreads) the order is correct. But the
 ordering between chunks is not correct, giving an incorrect result at
 the end.
 
 What I mean by the order is, the correct result in the correct position
 of the array. 
 
 Is there any way to guarantee the result? 
 
 Many thanks
 Juan
 
 
 
 
 
 
 






Re: UI for flink

2015-07-13 Thread Chiwan Park
If you would search a graphical interface for data analytics like Jupyter, you 
should look Apache Zeppelin [1].
Apache Zeppelin is a web-based notebook. It supports Scala, Spark and Flink.

Regards,
Chiwan Park

[1] https://zeppelin.incubator.apache.org

 On Jul 13, 2015, at 9:23 PM, Till Rohrmann trohrm...@apache.org wrote:
 
 Hi Hermann,
 
 when you start a Flink cluster, then also the web interface is started. It is 
 reachable under http://jobManagerURL:8081. The web interface tells you a 
 lot about the current state of your cluster and the currently executed Flink 
 jobs.
 
 Additionally, you can start the web client via ./start-webclient.sh, which 
 you can find in the bin directory. The web client, which is reachable under 
 port 8080, allows you to submit Flink jobs to your cluster via a browser.
 
 Cheers,
 Till
 
 
 On Mon, Jul 13, 2015 at 2:07 PM, Hermann Azong hermann.az...@gmail.com 
 wrote:
 Hello Flinkers,
 
 I'm wondering if a UI Solution for Flink already exist when starting
 
 Sincerly,
 
 Hermann 
 







Re: error when use Broadcast Variables cannot find symbol getRuntimeContext()

2015-07-12 Thread Chiwan Park
Hi, you should use RichMapFunction not MapFunction. The difference between
RichMapFunction and MapFunction is described in Flink documentation [1].

Regards,
Chiwan Park

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#rich-functions

 On Jul 12, 2015, at 7:51 AM, hagersaleh loveallah1...@yahoo.com wrote:
 
 import java.util.Collection;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
 
   ExecutionEnvironment env =
 ExecutionEnvironment.getExecutionEnvironment();
 
DataSetInteger toBroadcast = env.fromElements(1, 2, 3);
 
   DataSetString data = env.fromElements(a, b);
  data.map(new MapFunctionString, String() {
@Override
public void open(Configuration parameters) throws Exception {
  // 3. Access the broadcasted DataSet as a Collection
  CollectionInteger broadcastSet =
 getRuntimeContext().getBroadcastVariable(broadcastSetName);
}
 
 
@Override
public String map(String value) throws Exception {
return hager;
 }
 
 }).withBroadcastSet(toBroadcast, broadcastSetName);
 
 
 display error
 cannot find symbol
 CollectionInteger broadcastSet =
 getRuntimeContext().getBroadcastVariable(broadcastSetName);
  symbol: method getRuntimeContext()
 
 method does not override or implement a method from a supertype
@Override
 
 
 
 --
 View this message in context: 
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/error-when-use-Broadcast-Variables-cannot-find-symbol-getRuntimeContext-tp2010.html
 Sent from the Apache Flink User Mailing List archive. mailing list archive at 
 Nabble.com.







Re: how can handles Any , All query on flink

2015-07-12 Thread Chiwan Park
Because there is no default implementations like forany in scala, I use forall
method. Note that ANY (condition) is equivalent as NOT ALL (NOT condition).

Regards,
Chiwan Park

 On Jul 12, 2015, at 5:39 AM, hagersaleh loveallah1...@yahoo.com wrote:
 
 why in this use ! and = in handle Any
override def filter(value: Product): Boolean = !bcSet.forall(value.model
 = _)
  }).withBroadcastSet(pcModels, pcModels).distinct(maker).map(_.maker)
 
 
 
 
 
 --
 View this message in context: 
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/how-can-handles-Any-All-query-on-flink-tp1997p2009.html
 Sent from the Apache Flink User Mailing List archive. mailing list archive at 
 Nabble.com.







Re: how can handles Any , All query on flink

2015-07-11 Thread Chiwan Park
Hi, I wrote a example including queries which you want [1].
The example uses only Flink Scala API, but I think It would be better to use 
Table API.
I used broadcast set [2] to perform subqueries in your given query.

Flink has many functions to handle data and the great documentation to explain 
the functions.
I think that It is good to read Batch API section of Flink documentation for 
you.

If you have a question for the example, please reply mail to user mailing list.

Regards,
Chiwan Park

[1] https://gist.github.com/chiwanpark/5e2a6ac00b7e0bf85444
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#broadcast-variables

 On Jul 11, 2015, at 5:00 AM, hagersaleh loveallah1...@yahoo.com wrote:
 
 please help
 
 
 
 --
 View this message in context: 
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/how-can-handles-Any-All-query-on-flink-tp1997p2005.html
 Sent from the Apache Flink User Mailing List archive. mailing list archive at 
 Nabble.com.





Re: Open method is not called with custom implementation RichWindowMapFunction

2015-07-03 Thread Chiwan Park
I found that the patch had been merged to upstream. [1] :)

Regards,
Chiwan Park

[1] https://github.com/apache/flink/pull/855

 On Jul 3, 2015, at 5:26 PM, Welly Tambunan if05...@gmail.com wrote:
 
 Thanks Chiwan, 
 
 
 Glad to hear that. 
 
 
 Cheers
 
 On Fri, Jul 3, 2015 at 3:24 PM, Chiwan Park chiwanp...@apache.org wrote:
 Hi tambunanw,
 
 The issue is already known and we’ll patch soon. [1]
 In next release (maybe 0.9.1), the problem will be solved.
 
 Regards,
 Chiwan Park
 
 [1] https://issues.apache.org/jira/browse/FLINK-2257
 
  On Jul 3, 2015, at 4:57 PM, tambunanw if05...@gmail.com wrote:
 
  Hi All,
 
  I'm trying to create some experiment with rich windowing function and
  operator state. I modify the streaming stock prices from
 
  https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
 
  I create the simple windowing function like below
 
  class MyWindowFunction extends RichWindowMapFunction[StockPricex,
  StockPricex] {
   println(created)
   private var counter = 0
 
   override def open(conf: Configuration): Unit = {
 println(opened)
   }
 
   override def mapWindow(values: Iterable[StockPricex], out:
  Collector[StockPricex]): Unit = {
 // if not initialized ..
 
 println(counter)
 println(values)
 counter = counter + 1
 
   }
  }
 
  However the open() method is not invoked when i try to run this code on my
  local environment
 
 spx
   .groupBy(x = x.symbol)
   .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1,
  TimeUnit.SECONDS))
   .mapWindow(new MyWindowFunction)
 
  Any thought on this one ?
 
 
  Cheers
 
 
 
  --
  View this message in context: 
  http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Open-method-is-not-called-with-custom-implementation-RichWindowMapFunction-tp1924.html
  Sent from the Apache Flink User Mailing List archive. mailing list archive 
  at Nabble.com.
 
 
 
 
 
 
 
 
 -- 
 Welly Tambunan
 Triplelands 
 
 http://weltam.wordpress.com
 http://www.triplelands.com



Re: Flink 0.9 built with Scala 2.11

2015-07-02 Thread Chiwan Park
@Alexander I’m happy to hear that you want to help me. If you help me, I really 
appreciate. :)

Regards,
Chiwan Park


 On Jul 2, 2015, at 2:57 PM, Alexander Alexandrov 
 alexander.s.alexand...@gmail.com wrote:
 
 @Chiwan: let me know if you need hands-on support. I'll be more then happy to 
 help (as my downstream project is using Scala 2.11).
 
 2015-07-01 17:43 GMT+02:00 Chiwan Park chiwanp...@apache.org:
 Okay, I will apply this suggestion.
 
 Regards,
 Chiwan Park
 
  On Jul 1, 2015, at 5:41 PM, Ufuk Celebi u...@apache.org wrote:
 
 
  On 01 Jul 2015, at 10:34, Stephan Ewen se...@apache.org wrote:
 
  +1, like that approach
 
  +1
 
  I like that this is not breaking for non-Scala users :-)
 
 
 
 



Re: Flink documentation is offline

2015-06-30 Thread Chiwan Park
Hi, 

We already know this issue. There are some problems in Apache Infrastructure.
Infra Team is working on this issue. You can see progress via a blog post [1].
It will be okay soon.

Regards,
Chiwan Park

[1] https://blogs.apache.org/infra/entry/buildbot_master_currently_off_line


 On Jun 30, 2015, at 6:23 PM, LINZ, Arnaud al...@bouyguestelecom.fr wrote:
 
 Hello,
 
  
 
 You are probably aware of the issue, but currently every access to the 
 documentation fromhttps://flink.apache.org  
 (http://ci.apache.org/projects/flink) leads to a “No Such Resource” page.
 
  
 
 Best regards,
 
 Arnaud
 
  
 
  
 
 
 
 L'intégrité de ce message n'étant pas assurée sur internet, la société 
 expéditrice ne peut être tenue responsable de son contenu ni de ses pièces 
 jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous 
 n'êtes pas destinataire de ce message, merci de le détruire et d'avertir 
 l'expéditeur.
 
 The integrity of this message cannot be guaranteed on the Internet. The 
 company that sent this message cannot therefore be held liable for its 
 content nor attachments. Any unauthorized use or dissemination is prohibited. 
 If you are not the intended recipient of this message, then please delete it 
 and notify the sender.







Re: writeAsCsv on HDFS

2015-06-25 Thread Chiwan Park
It represents the folder containing the hadoop config files. :)

Regards,
Chiwan Park


 On Jun 25, 2015, at 10:07 PM, Flavio Pompermaier pomperma...@okkam.it wrote:
 
 fs.hdfs.hadoopconf represents the folder containing the hadoop config files 
 (*-site.xml) or just one specific hadoop config file (e.g. core-site.xml or 
 the hdfs-site.xml)?
 
 On Thu, Jun 25, 2015 at 3:04 PM, Robert Metzger rmetz...@apache.org wrote:
 Hi Flavio,
 
 there is a file called conf/flink-conf.yaml
 Add a new line in the file with the following contents:
 
 fs.hdfs.hadoopconf: /path/to/your/hadoop/config
 
 This should fix the problem.
 Flink can not load the configuration file from the jar containing the user 
 code, because the file system is initialized independent of the the job. So 
 there is (currently) no way of initializing the file system using the user 
 code classloader.
 
 What you can do is making the configuration file available to Flink's system 
 classloader. For example by putting your user jar into the lib/ folder of 
 Flink. You can also add the path to the Hadoop configuration files into the 
 CLASSPATH of Flink (but you need to do that on all machines).
 
 I think the easiest approach is using Flink's configuration file.
 
 
 On Thu, Jun 25, 2015 at 2:59 PM, Flavio Pompermaier pomperma...@okkam.it 
 wrote:
 Could you describe it better with an example please? Why Flink doesn't load 
 automatically the properties of the hadoop conf files within the jar?
 
 On Thu, Jun 25, 2015 at 2:55 PM, Robert Metzger rmetz...@apache.org wrote:
 Hi,
 
 Flink is not loading the Hadoop configuration from the classloader. You have 
 to specify the path to the Hadoop configuration in the flink configuration 
 fs.hdfs.hadoopconf
 
 On Thu, Jun 25, 2015 at 2:50 PM, Flavio Pompermaier pomperma...@okkam.it 
 wrote:
 Hi to all,
 I'm experiencing some problem in writing a file as csv on HDFS with flink 
 0.9.0.
 The code I use is 
   myDataset.writeAsCsv(new Path(hdfs:///tmp, myFile.csv).toString());
 
 If I run the job from Eclipse everything works fine but when I deploy the job 
 on the cluster (cloudera 5.1.3) I obtain the following exception:
 
 Caused by: java.io.IOException: The given HDFS file URI 
 (hdfs:///tmp/myFile.csv) did not describe the HDFS NameNode. The attempt to 
 use a default HDFS configuration, as specified in the 'fs.hdfs.hdfsdefault' 
 or 'fs.hdfs.hdfssite' config parameter failed due to the following problem: 
 Either no default file system was registered, or the provided configuration 
 contains no valid authority component (fs.default.name or fs.defaultFS) 
 describing the (hdfs namenode) host and port.
   at 
 org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:291)
   at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:258)
   at org.apache.flink.core.fs.Path.getFileSystem(Path.java:309)
   at 
 org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:273)
   at 
 org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:84)
   at 
 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
   ... 25 more
 
 The core-site.xml is present in the fat jar and contains the property
 
 property
 namefs.defaultFS/name
 valuehdfs://myServerX:8020/value
   /property
 
 I compiled flink with the following command:
 
  mvn clean  install -Dhadoop.version=2.3.0-cdh5.1.3 
 -Dhbase.version=0.98.1-cdh5.1.3 -Dhadoop.core.version=2.3.0-mr1-cdh5.1.3 
 -DskipTests -Pvendor-repos
 
 How can I fix that?
 
 Best,
 Flavio
 



Re: Flink 0.9 built with Scala 2.11

2015-06-21 Thread Chiwan Park
I’m interested in working on this. :) I’ll assign to me.

Regards,
Chiwan Park


 On Jun 21, 2015, at 8:22 AM, Robert Metzger rmetz...@apache.org wrote:
 
 Okay, it seems like we have consensus on this. Who is interested in working 
 on this? https://issues.apache.org/jira/browse/FLINK-2200
 
 On Mon, Jun 15, 2015 at 1:26 AM, Till Rohrmann trohrm...@apache.org wrote:
 +1 for giving only those modules a version suffix which depend on Scala.
 
 
 On Sun, Jun 14, 2015 at 8:03 PM Robert Metzger rmetz...@apache.org wrote:
 There was already a discussion regarding the two options here [1], back then 
 we had a majority for giving all modules a scala suffix.
 
 I'm against giving all modules a suffix because we force our users to migrate 
 the name and its confusing for Java users (I was confused myself when I was 
 trying out Spark two years ago (back then I didn't know anything about Scala 
 ;) ))
 
 [1] https://github.com/apache/flink/pull/477#issuecomment-82266786
 
 On Sun, Jun 14, 2015 at 3:47 AM, Stephan Ewen se...@apache.org wrote:
 Good idea, Chiwan!
 
 On Sat, Jun 13, 2015 at 6:25 PM, Chiwan Park chiwanp...@icloud.com wrote:
 Hi. I think that we don’t need deploy all modules with scala variation. The 
 pure java-based modules such as flink-java, flink-core, flink-optimizers, …, 
 etc. don’t need to be deployed with scala version variation. We need only 
 scala related modules such as flink-ml, flink-runtime, flink-scala, …, etc. 
 with version variation.
 
 So we can reduce a number of deployed modules.
 
 Regards,
 Chiwan Park
 
  On Jun 13, 2015, at 9:17 AM, Robert Metzger rmetz...@apache.org wrote:
 
  I agree that we should ship a 2.11 build of Flink if downstream projects 
  need that.
 
  The only thing that we should keep in mind when doing this is that the 
  number of jars we're pushing to maven will explode (but that is fine)
  We have currently 46 maven modules and we would create 4 versions of each 
  of the modules (hd1,hd2 x scala210,scala211) so we end up with 184 jars per 
  release ;)
 
  The other big question that I have regarding this is how we want to name 
  the modules.
  We could add the scala version to all the modules, like flink-java_2.10, 
  which would mean that users have to change a bit more when upgrading to the 
  release supporting different scala versions.
 
  If we all agree on that, we can move on changing our maven setup.
 
 
  On Wed, Jun 10, 2015 at 8:34 AM, Kostas Tzoumas ktzou...@apache.org wrote:
  Please do ping this list if you encounter any problems with Flink during 
  your project (you have done so already :-), but also if you find that the 
  Flink API needs additions to map Pig well to Flink
 
  On Wed, Jun 10, 2015 at 3:47 PM, Philipp Goetze 
  philipp.goe...@tu-ilmenau.de wrote:
  Done. Can be found here: https://issues.apache.org/jira/browse/FLINK-2200
 
  Best Regards,
  Philipp
 
 
 
  On 10.06.2015 15:29, Chiwan Park wrote:
  But I think uploading Flink API with scala 2.11 to maven repository is nice 
  idea.
  Could you create a JIRA issue?
 
  Regards,
  Chiwan Park
 
  On Jun 10, 2015, at 10:23 PM, Chiwan Park chiwanp...@icloud.com wrote:
 
  No. Currently, there are no Flink binaries with scala 2.11 which are 
  downloadable.
 
  Regards,
  Chiwan Park
 
  On Jun 10, 2015, at 10:18 PM, Philipp Goetze philipp.goe...@tu-ilmenau.de 
  wrote:
 
  Thank you Chiwan!
 
  I did not know the master has a 2.11 profile.
 
  But there is no pre-built Flink with 2.11, which I could refer to in sbt or 
  maven, is it?
 
  Best Regards,
  Philipp
 
  On 10.06.2015 15:03, Chiwan Park wrote:
  Hi. You can build Flink with Scala 2.11 with scala-2.11 profile in master 
  branch.
  `mvn clean install -DskipTests -P \!scala-2.10,scala-2.11` command builds 
  Flink with Scala 2.11.
 
  Regards,
  Chiwan Park
 
  On Jun 10, 2015, at 9:56 PM, Flavio Pompermaier pomperma...@okkam.it 
  wrote:
 
  Nice!
 
  On 10 Jun 2015 14:49, Philipp Goetze philipp.goe...@tu-ilmenau.de wrote:
  Hi community!
 
  We started a new project called Piglet (https://github.com/ksattler/piglet).
  For that we use i.a. Flink as a backend. The project is based on Scala 
  2.11. Thus we need a 2.11 build of Flink.
 
  Until now we used the 2.11 branch of the stratosphere project and built 
  Flink ourselves. Unfortunately this branch is not up-to-date.
 
  Do you have an official repository for Flink 0.9 (built with Scala 2.11)?
 
  Best Regards,
  Philipp
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 







Re: Log messages - redirect

2015-06-19 Thread Chiwan Park
Hi, Flink uses slf4j and log4j for logging.

You can override default log4j configuration programmatically. [1]
Or you can use logback as a logging backend and override default logback 
configuration also. [2][3]

[1] http://stackoverflow.com/a/9003191
[2] http://ci.apache.org/projects/flink/flink-docs-master/internals/logging.html
[3] http://stackoverflow.com/a/3810936

Regards,
Chiwan Park


 On Jun 19, 2015, at 8:05 PM, Juan Fumero 
 juan.jose.fumero.alfo...@oracle.com wrote:
 
 Hi, 
  is there any option (from API level) to redirect the log messages to a
 file instead of stdout? 
 
 Thanks
 
 
 log4j:WARN No appenders could be found for logger
 (org.apache.flink.api.java.ExecutionEnvironment).
 log4j:WARN Please initialize the log4j system properly.
 log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
 more info.
 06/19/2015 12:14:12 Job execution switched to status RUNNING.
 
 06/19/2015 12:14:12 DataSink (collect() sink)(1/1) switched to
 DEPLOYING 
 06/19/2015 12:14:12 DataSink (collect() sink)(1/1) switched to
 RUNNING 
 06/19/2015 12:14:12 DataSink (collect() sink)(1/1) switched to
 FINISHED 
 06/19/2015 12:14:12 Job execution switched to status FINISHED.
 
 





Re: Memory in local setting

2015-06-17 Thread Chiwan Park
Hi.

You can increase the memory given to Flink by increasing JVM Heap memory in 
local.
If you are using Eclipse as IDE, add “-XmxHEAPSIZE” option in run 
configuration. [1].
Although you are using IntelliJ IDEA as IDE, you can increase JVM Heap using 
the same way. [2]

[1] 
http://help.eclipse.org/luna/index.jsp?topic=%2Forg.eclipse.jdt.doc.user%2Ftasks%2Ftasks-java-local-configuration.htm
[2] 
https://www.jetbrains.com/idea/help/creating-and-editing-run-debug-configurations.html

Regards,
Chiwan Park

 On Jun 17, 2015, at 2:01 PM, Sebastian s...@apache.org wrote:
 
 Hi,
 
 Flink has memory problems when I run an algorithm from my local IDE on a 2GB 
 graph. Is there any way that I can increase the memory given to Flink?
 
 Best,
 Sebastian
 
 Caused by: java.lang.RuntimeException: Memory ran out. numPartitions: 32 
 minPartition: 4 maxPartition: 4 number of overflow segments: 151 bucketSize: 
 146 Overall memory: 14024704 Partition memory: 4194304
   at 
 org.apache.flink.runtime.operators.hash.CompactingHashTable.getNextBuffer(CompactingHashTable.java:784)
   at 
 org.apache.flink.runtime.operators.hash.CompactingHashTable.insertBucketEntryFromSearch(CompactingHashTable.java:668)
   at 
 org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:538)
   at 
 org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
   at 
 org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
   at 
 org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
   at 
 org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
   at java.lang.Thread.run(Thread.java:745)








Re: Flink 0.9 built with Scala 2.11

2015-06-13 Thread Chiwan Park
Hi. I think that we don’t need deploy all modules with scala variation. The 
pure java-based modules such as flink-java, flink-core, flink-optimizers, …, 
etc. don’t need to be deployed with scala version variation. We need only scala 
related modules such as flink-ml, flink-runtime, flink-scala, …, etc. with 
version variation.

So we can reduce a number of deployed modules.

Regards,
Chiwan Park

 On Jun 13, 2015, at 9:17 AM, Robert Metzger rmetz...@apache.org wrote:
 
 I agree that we should ship a 2.11 build of Flink if downstream projects need 
 that.
 
 The only thing that we should keep in mind when doing this is that the number 
 of jars we're pushing to maven will explode (but that is fine)
 We have currently 46 maven modules and we would create 4 versions of each of 
 the modules (hd1,hd2 x scala210,scala211) so we end up with 184 jars per 
 release ;)
 
 The other big question that I have regarding this is how we want to name the 
 modules.
 We could add the scala version to all the modules, like flink-java_2.10, 
 which would mean that users have to change a bit more when upgrading to the 
 release supporting different scala versions.
 
 If we all agree on that, we can move on changing our maven setup.
 
 
 On Wed, Jun 10, 2015 at 8:34 AM, Kostas Tzoumas ktzou...@apache.org wrote:
 Please do ping this list if you encounter any problems with Flink during your 
 project (you have done so already :-), but also if you find that the Flink 
 API needs additions to map Pig well to Flink
 
 On Wed, Jun 10, 2015 at 3:47 PM, Philipp Goetze 
 philipp.goe...@tu-ilmenau.de wrote:
 Done. Can be found here: https://issues.apache.org/jira/browse/FLINK-2200
 
 Best Regards,
 Philipp
 
 
 
 On 10.06.2015 15:29, Chiwan Park wrote:
 But I think uploading Flink API with scala 2.11 to maven repository is nice 
 idea.
 Could you create a JIRA issue?
 
 Regards,
 Chiwan Park
 
 On Jun 10, 2015, at 10:23 PM, Chiwan Park chiwanp...@icloud.com wrote:
 
 No. Currently, there are no Flink binaries with scala 2.11 which are 
 downloadable.
 
 Regards,
 Chiwan Park
 
 On Jun 10, 2015, at 10:18 PM, Philipp Goetze philipp.goe...@tu-ilmenau.de 
 wrote:
 
 Thank you Chiwan!
 
 I did not know the master has a 2.11 profile.
 
 But there is no pre-built Flink with 2.11, which I could refer to in sbt or 
 maven, is it?
 
 Best Regards,
 Philipp
 
 On 10.06.2015 15:03, Chiwan Park wrote:
 Hi. You can build Flink with Scala 2.11 with scala-2.11 profile in master 
 branch.
 `mvn clean install -DskipTests -P \!scala-2.10,scala-2.11` command builds 
 Flink with Scala 2.11.
 
 Regards,
 Chiwan Park
 
 On Jun 10, 2015, at 9:56 PM, Flavio Pompermaier pomperma...@okkam.it wrote:
 
 Nice!
 
 On 10 Jun 2015 14:49, Philipp Goetze philipp.goe...@tu-ilmenau.de wrote:
 Hi community!
 
 We started a new project called Piglet (https://github.com/ksattler/piglet).
 For that we use i.a. Flink as a backend. The project is based on Scala 2.11. 
 Thus we need a 2.11 build of Flink.
 
 Until now we used the 2.11 branch of the stratosphere project and built Flink 
 ourselves. Unfortunately this branch is not up-to-date.
 
 Do you have an official repository for Flink 0.9 (built with Scala 2.11)?
 
 Best Regards,
 Philipp
 
 
 
 
 
 
 
 
 
 
 
 







Re: Flink 0.9 built with Scala 2.11

2015-06-10 Thread Chiwan Park
Hi. You can build Flink with Scala 2.11 with scala-2.11 profile in master 
branch.
`mvn clean install -DskipTests -P \!scala-2.10,scala-2.11` command builds Flink 
with Scala 2.11.

Regards,
Chiwan Park

 On Jun 10, 2015, at 9:56 PM, Flavio Pompermaier pomperma...@okkam.it wrote:
 
 Nice!
 
 On 10 Jun 2015 14:49, Philipp Goetze philipp.goe...@tu-ilmenau.de wrote:
 Hi community!
 
 We started a new project called Piglet (https://github.com/ksattler/piglet).
 For that we use i.a. Flink as a backend. The project is based on Scala 2.11. 
 Thus we need a 2.11 build of Flink.
 
 Until now we used the 2.11 branch of the stratosphere project and built Flink 
 ourselves. Unfortunately this branch is not up-to-date.
 
 Do you have an official repository for Flink 0.9 (built with Scala 2.11)?
 
 Best Regards,
 Philipp







Re: Flink 0.9 built with Scala 2.11

2015-06-10 Thread Chiwan Park
No. Currently, there are no Flink binaries with scala 2.11 which are 
downloadable.

Regards,
Chiwan Park

 On Jun 10, 2015, at 10:18 PM, Philipp Goetze philipp.goe...@tu-ilmenau.de 
 wrote:
 
 Thank you Chiwan!
 
 I did not know the master has a 2.11 profile.
 
 But there is no pre-built Flink with 2.11, which I could refer to in sbt or 
 maven, is it?
 
 Best Regards,
 Philipp
 
 On 10.06.2015 15:03, Chiwan Park wrote:
 Hi. You can build Flink with Scala 2.11 with scala-2.11 profile in master 
 branch.
 `mvn clean install -DskipTests -P \!scala-2.10,scala-2.11` command builds 
 Flink with Scala 2.11.
 
 Regards,
 Chiwan Park
 
 On Jun 10, 2015, at 9:56 PM, Flavio Pompermaier pomperma...@okkam.it 
 wrote:
 
 Nice!
 
 On 10 Jun 2015 14:49, Philipp Goetze philipp.goe...@tu-ilmenau.de wrote:
 Hi community!
 
 We started a new project called Piglet (https://github.com/ksattler/piglet).
 For that we use i.a. Flink as a backend. The project is based on Scala 
 2.11. Thus we need a 2.11 build of Flink.
 
 Until now we used the 2.11 branch of the stratosphere project and built 
 Flink ourselves. Unfortunately this branch is not up-to-date.
 
 Do you have an official repository for Flink 0.9 (built with Scala 2.11)?
 
 Best Regards,
 Philipp
 
 
 
 
 






Re: Flink 0.9 built with Scala 2.11

2015-06-10 Thread Chiwan Park
But I think uploading Flink API with scala 2.11 to maven repository is nice 
idea.
Could you create a JIRA issue?

Regards,
Chiwan Park

 On Jun 10, 2015, at 10:23 PM, Chiwan Park chiwanp...@icloud.com wrote:
 
 No. Currently, there are no Flink binaries with scala 2.11 which are 
 downloadable.
 
 Regards,
 Chiwan Park
 
 On Jun 10, 2015, at 10:18 PM, Philipp Goetze philipp.goe...@tu-ilmenau.de 
 wrote:
 
 Thank you Chiwan!
 
 I did not know the master has a 2.11 profile.
 
 But there is no pre-built Flink with 2.11, which I could refer to in sbt or 
 maven, is it?
 
 Best Regards,
 Philipp
 
 On 10.06.2015 15:03, Chiwan Park wrote:
 Hi. You can build Flink with Scala 2.11 with scala-2.11 profile in master 
 branch.
 `mvn clean install -DskipTests -P \!scala-2.10,scala-2.11` command builds 
 Flink with Scala 2.11.
 
 Regards,
 Chiwan Park
 
 On Jun 10, 2015, at 9:56 PM, Flavio Pompermaier pomperma...@okkam.it 
 wrote:
 
 Nice!
 
 On 10 Jun 2015 14:49, Philipp Goetze philipp.goe...@tu-ilmenau.de 
 wrote:
 Hi community!
 
 We started a new project called Piglet 
 (https://github.com/ksattler/piglet).
 For that we use i.a. Flink as a backend. The project is based on Scala 
 2.11. Thus we need a 2.11 build of Flink.
 
 Until now we used the 2.11 branch of the stratosphere project and built 
 Flink ourselves. Unfortunately this branch is not up-to-date.
 
 Do you have an official repository for Flink 0.9 (built with Scala 2.11)?
 
 Best Regards,
 Philipp
 
 
 
 
 
 
 
 
 






Re: Apache Flink transactions

2015-06-04 Thread Chiwan Park
Hi.

Flink is not DBMS. There is no equivalent operation of insert, update, remove.
But you can use map[1] or filter[2] operation to create modified dataset.

I recommend you some sildes[3][4] to understand Flink concepts.

Regards,
Chiwan Park

[1] 
http://ci.apache.org/projects/flink/flink-docs-master/apis/dataset_transformations.html#map
[2] 
http://ci.apache.org/projects/flink/flink-docs-master/apis/dataset_transformations.html#filter
[3] 
http://www.slideshare.net/robertmetzger1/introduction-to-apache-flink-palo-alto-meetup
[4] http://www.slideshare.net/dataArtisans/flink-training-dataset-api-basics


 On Jun 4, 2015, at 2:48 PM, Hawin Jiang hawin.ji...@gmail.com wrote:
 
 Hi  Admin
 
 
 
 Do we have insert, update and remove operations on Apache Flink?
 
 For example:  I have 10 million records in my test file.  I want to add one
 record, update one record and remove one record from this test file. 
 
 How to implement it by Flink?
 
 Thanks.
 
 
 
 
 
 
 
 
 
 Best regards
 
 Email: hawin.ji...@gmail.com
 






Re: flink terasort

2015-06-03 Thread Chiwan Park
There is a terasort implementation with deprecated API.
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/recordJobs/sort/TeraSort.java

AFAIK, there is no implementation with current API.

Regards,
Chiwan Park



 On Jun 4, 2015, at 12:17 AM, Bill Sparks jspa...@cray.com wrote:
 
 Just asking, is there an implementation of terasort for flink? 
 
 Regards,
Bill.
 -- 
 Jonathan (Bill) Sparks
 Software Architecture
 Cray Inc.



Re: why when use groupBy(2).sortGroup(0, Order.DESCENDING); not group by and not sort

2015-06-02 Thread Chiwan Park
Note that sortPartition is implemented in 0.9. Following link shows the example 
of sortPartition.
http://ci.apache.org/projects/flink/flink-docs-master/apis/dataset_transformations.html#sort-partition

Regards,
Chiwan Park


 On Jun 2, 2015, at 5:51 PM, hagersaleh loveallah1...@yahoo.com wrote:
 
 I want example for use sortPartition()
 
 
 
 
 --
 View this message in context: 
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/why-when-use-groupBy-2-sortGroup-0-Order-DESCENDING-not-group-by-and-not-sort-tp1436p1439.html
 Sent from the Apache Flink User Mailing List archive. mailing list archive at 
 Nabble.com.



Re: why when use groupBy(2).sortGroup(0, Order.DESCENDING); not group by and not sort

2015-06-01 Thread Chiwan Park
Hi. The sortGroup API returns a SortedGrouping object and but you don’t use the 
result. I think that you are confused with groupBy and sortGroup API. You 
should use this API such as following (I assumed you are using 0.8 or 
0.9-milestone-1):

// select the first 10 data for each group.
DataSetCustomer sorted = customers.groupBy(2).sortGroup(0, 
Order.DESCENDING).first(10);
System.out.println(sorted.print());

Note that Flink does not support global sort (FLINK-598) but only support local 
sort currently. The sortGroup API means that sorting for each group.


Regards,
Chiwan Park

 On Jun 2, 2015, at 5:02 AM, hagersaleh loveallah1...@yahoo.com wrote:
 
 why when use groupBy(2).sortGroup(0, Order.DESCENDING); not group by and not
 sort
 
 I want sort DataSet How can I do that?
 
 customers = customers.filter(
new FilterFunctionCustomer() {
@Override
public boolean filter(Customer c) {
 
 
return
 Integer.parseInt(c.getField(0).toString())=5 ;
 
}
});
 
   customers.groupBy(2).sortGroup(0, Order.DESCENDING);
   System.out.println(customers.print()); 
   customers.writeAsCsv(/home/hadoop/Desktop/Dataset/output.csv, \n,
 |);
   env.execute();  
 
 
 public static class Customer extends
 Tuple5Long,String,String,String,String {
   
   }
private static DataSetCustomer
 getCustomerDataSet(ExecutionEnvironment env) {
   return 
 env.readCsvFile(/home/hadoop/Desktop/Dataset/customer.csv)
   .fieldDelimiter('|')
 
 .includeFields(11100110).ignoreFirstLine()
.tupleType(Customer.class);
   }
 
 the result not sort
 2 (1,Customer#1,IVhzIApeRb otcE,711.56,BUILDING)
 2 (2,Customer#2,XSTf4NCwDVaWNe6tEgvwfmRchLXak,121.65,AUTOMOBILE)
 2 (3,Customer#3,MG9kdTD2WBHm,7498.12,AUTOMOBILE)
 2 (4,Customer#4,XxVSJsLAGtn,2866.83,MACHINERY)
 2 (5,Customer#5,KvpyuHCplrB84WgAiGV6sYpZq7Tj,794.47,HOUSEHOLD)
 
 
 
 --
 View this message in context: 
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/why-when-use-groupBy-2-sortGroup-0-Order-DESCENDING-not-group-by-and-not-sort-tp1436.html
 Sent from the Apache Flink User Mailing List archive. mailing list archive at 
 Nabble.com.