Re: Compilation Error in WindowStream.fold()

2017-02-24 Thread nsengupta
Hello Aljoscha,

Many thanks for taking this up.

This is the modified code:
--
val uniqueVehicles = envDefault
  .fromCollection(readings)
  .map(e => MITSIMUtils.preparePositionReport(e))
  .assignAscendingTimestamps(e => e.timeOfReport)
  .keyBy(e => (
    e.eWayCoordinates.eWayID,
    e.eWayCoordinates.eWayDir,
    e.eWayCoordinates.eWaySegment,
    e.vehicleDetails.vehicleID))
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
  .fold(
    // Seed
    Map[EWayCoordinates, Set[VehicleID]](),

    // FoldFunction
    folder,

    // WindowFunction
    windower

    // I have taken the TupleTypeInfo out, to see what the compiler says!
    // Satisfying the compiler:
    /*new TupleTypeInfo[Map[EWayCoordinates, Set[VehicleID]]](),
    new TupleTypeInfo[(EWayCoordinates, Int)]*/
  )
--

And, this is what the compiler says:
--
[INFO] Compiling 3 source files to /home/nirmalya/Workspace-Flink/LinearRoad/target/test-classes at 1487991829901
[ERROR] /home/nirmalya/Workspace-Flink/LinearRoad/src/test/scala/org/nirmalya/exercise/StreamStateTest.scala:137: error: overloaded method value fold with alternatives:
[ERROR]   [ACC, R](initialValue: ACC, preAggregator: (ACC, org.nirmalya.exercise.Elements.PositionReport) => ACC, windowFunction: (org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[ACC], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$8: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
[ERROR]   [ACC, R](initialValue: ACC, preAggregator: org.apache.flink.api.common.functions.FoldFunction[org.nirmalya.exercise.Elements.PositionReport,ACC], windowFunction: org.apache.flink.streaming.api.scala.function.AllWindowFunction[ACC,R,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$5: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$6: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
[ERROR]  cannot be applied to (scala.collection.immutable.Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[org.nirmalya.exercise.Elements.VehicleID]], org.apache.flink.api.common.functions.FoldFunction[org.nirmalya.exercise.Elements.PositionReport,Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[Int]]], org.apache.flink.streaming.api.functions.windowing.AllWindowFunction[Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[org.nirmalya.exercise.Elements.VehicleID]],(org.nirmalya.exercise.Elements.EWayCoordinates, Int),org.apache.flink.streaming.api.windowing.windows.Window])
[ERROR]   .fold(
[ERROR]    ^
[ERROR] one error found
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
--
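
A sketch of a call shape that the first alternative above should accept — plain Scala closures, with the accumulator and result types kept consistent throughout, so the implicit TypeInformation evidence can be derived (VehicleID assumed to be an Int alias; untested):

// Sketch only: matches the closure-based fold overload quoted in the error,
// with ACC = Map[EWayCoordinates, Set[VehicleID]] and R = (EWayCoordinates, Int).
// keyBy(...) is dropped here: windowAll operates on the un-keyed stream anyway.
val uniqueVehicles = envDefault
  .fromCollection(readings)
  .map(e => MITSIMUtils.preparePositionReport(e))
  .assignAscendingTimestamps(e => e.timeOfReport)
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
  .fold(
    // Seed (ACC)
    Map[EWayCoordinates, Set[VehicleID]](),

    // Pre-aggregator: (ACC, PositionReport) => ACC
    (acc: Map[EWayCoordinates, Set[VehicleID]], r: PositionReport) =>
      acc + (r.eWayCoordinates ->
        (acc.getOrElse(r.eWayCoordinates, Set.empty[VehicleID]) + r.vehicleDetails.vehicleID)),

    // Window function: (TimeWindow, Iterable[ACC], Collector[R]) => Unit
    (window: TimeWindow, folded: Iterable[Map[EWayCoordinates, Set[VehicleID]]],
     out: Collector[(EWayCoordinates, Int)]) =>
      folded.foreach(_.mapValues(_.size).foreach(out.collect))
  )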





Compilation Error in WindowStream.fold()

2017-02-23 Thread nsengupta
For reasons I cannot grasp, I am unable to move ahead.

Here's the code:
-


import org.apache.flink.api.common.functions.FoldFunction
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
import org.apache.flink.util.Collector
import org.nirmalya.exercise.Elements.{EWayCoordinates, PositionReport, RawMITSIMTuple, VehicleID}

case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int)

case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos: Int)

case class PositionReport(
  // tupletype: Int,
  timeOfReport: Int,
  eWayCoordinates: EWayCoordinates,
  vehicleDetails: VehicleDetails
)

// ...

val envDefault = StreamExecutionEnvironment.createLocalEnvironment(4)
envDefault.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val readings = IndexedSeq[RawMITSIMTuple](
  RawMITSIMTuple(0,0,107,32,0,0,0,10, 53320,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,0,109,20,0,0,0,10,100644,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,0,106,28,0,0,0,10,137745,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,1,107,32,0,0,0,67,354281,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,1,105,30,0,0,1,94,501089,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,1,110,26,0,0,1,95,495898,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,1,102,30,0,0,1,85,453562,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,2,104,32,0,0,1,80,427144,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,2,102,20,0,0,1,73,390383,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,2,111,27,1,0,1,69,369135,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,2,112,28,1,0,0, 1,  5757,-1,-1,-1,-1,-1,-1)
)

val folder = new FoldFunction[PositionReport, Map[EWayCoordinates, Set[Int]]] {
  override def fold(
    t: Map[EWayCoordinates, Set[VehicleID]], o: PositionReport
  ): Map[EWayCoordinates, Set[VehicleID]] = {
    t + (o.eWayCoordinates ->
      (t.getOrElse(o.eWayCoordinates, Set.empty) + o.vehicleDetails.vehicleID))
  }
}

val windower = new AllWindowFunction[Map[EWayCoordinates, Set[VehicleID]], (EWayCoordinates, Int), Window] {
  override def apply(
    w: Window,
    bunch: Iterable[Map[EWayCoordinates, Set[VehicleID]]],
    collector: Collector[(EWayCoordinates, VehicleID)]): Unit = {

    val allVehiclesInLast30Mins = bunch.iterator().next.mapValues(e => e.size)

    allVehiclesInLast30Mins.foreach(e => println(e))

    collector.collect((EWayCoordinates(-1,-1,-1,-1), 0))
  }
}

val uniqueVehicles = envDefault
  .fromCollection(readings)
  .map(e => MITSIMUtils.preparePositionReport(e))
  .assignAscendingTimestamps(e => e.timeOfReport)
  .keyBy(e => (
    e.eWayCoordinates.eWayID,
    e.eWayCoordinates.eWayDir,
    e.eWayCoordinates.eWaySegment,
    e.vehicleDetails.vehicleID))
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
  .fold(
    // Seed
    Map[EWayCoordinates, Set[VehicleID]](),

    // FoldFunction
    folder,

    // WindowFunction
    windower,

    // Satisfying the compiler
    new TupleTypeInfo[Map[EWayCoordinates, Set[VehicleID]]](),
    new TupleTypeInfo[(EWayCoordinates, Int)]
  )

-

The compiler is unhappy:

[ERROR] /home/nirmalya/Workspace-Flink/LinearRoad/src/test/scala/org/nirmalya/exercise/StreamStateTest.scala:136: error: missing argument list for method fold in class AllWindowedStream
[ERROR] Unapplied methods are only converted to functions when a function type is expected.
[ERROR] You can make this conversion explicit by writing `fold _` or `fold(_)(_)(_)` instead of `fold`.
[ERROR]   .fold(
[ERROR]    ^
[ERROR] one error found



I understand why the compiler is unhappy, but I am unsure if I have to go
through all this *devilry*. In no Flink example do I see any such thing
being prescribed. But then, perhaps I am missing an important point.

I have been through this comment.
Re: Clarification: use of AllWindowedStream.apply() function

2017-02-16 Thread nsengupta
Thanks, Aljoscha, for the clarification.

I understand that instead of using a flatMap() in the way I am using it, I am
better off using:
* a fold(init, fold_func, window_func) first, and then
* a map to a different type of my choice, inside the window_func
parameterised above (a rough sketch follows).
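
A minimal sketch of that shape, reusing the pipeline from my earlier post (readRawMITSIMTuplesInjected and preparePositionReport as before); the window function emits PositionReports directly, so the separate flatMap stage disappears:

// Sketch only: fold(seed, fold_func, window_func), with the window function
// turning each folded Vector into individual PositionReports.
val positionReportStream = this
  .readRawMITSIMTuplesInjected(envDefault, args(0))
  .assignAscendingTimestamps(e => e.timeOfReport)
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
  .fold(
    Vector[RawMITSIMTuple](),
    (bin: Vector[RawMITSIMTuple], raw: RawMITSIMTuple) => bin :+ raw,
    (window: TimeWindow, folded: Iterable[Vector[RawMITSIMTuple]],
     out: Collector[PositionReport]) =>
      folded.foreach(_.foreach(raw => out.collect(this.preparePositionReport(raw))))
  )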

I hope I am correct. If so, you don't need to spend time on a comment; ☺
otherwise, please give a hint.

-- Nirmalya

-

On Thu, Feb 16, 2017 at 4:12 PM, Aljoscha Krettek [via Apache Flink User
Mailing List archive.] <ml-node+s2336050n11665...@n4.nabble.com> wrote:

> Hi,
> you would indeed use apply(), or better fold(<initial value>, <fold function>,
> <window function>) to map the result of folding your window to some other
> data type. If you will, a WindowFunction allows "mapping" the result of your
> windowing to a different type.
>
> Best,
> Aljoscha
>
> On Wed, 15 Feb 2017 at 06:14 nsengupta <[hidden email]> wrote:
>
>> I have gone through this  post
>> <http://apache-flink-user-mailing-list-archive.2336050.
>> n4.nabble.com/WindowedStream-operation-questions-td6006.html>
>> , where Aljoscha explains that /mapping/ on WindowedStream is /not/
>> allowed.
>>
>> So, I think I haven't asked the question properly. Here is (hopefully) a
>> better and easier version:
>>
>> 1.I begin with records of type RawMITSIMTuple.
>> 2.When I group them using a Window, I get an
>> AllWindowedStream[RawMITSIMTuple].
>> 3.I /fold/ the tuples obtained in the Window, which gives me a
>> DataStream[Vector[RawMITSIMTuple]].
>> 4.What I need is a DataStream[PositionReport]. So, I need to flatMap
>> the
>> output of previous step, where I first get hold of each of the
>> RawMITSIMTuple and map that to PositionReport.
>>
>> val positionReportStream = this
>>   .readRawMITSIMTuplesInjected(envDefault,args(0))
>>   .assignAscendingTimestamps(e => e.timeOfReport)
>>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
>>   .fold(Vector[RawMITSIMTuple]())((collectorBin, rawRecord) => {
>>     collectorBin :+ rawRecord
>>   })
>>   .flatMap(r => r.map(e => this.preparePositionReport(e)))
>>
>> This gives me what I want, but I feel this is verbose and inefficient. Am I
>> thinking correctly? If so, what is a better idiom to use in such cases?
>>
>> -- Nirmalya
>>
>>
>>
>>
>>
>
>



-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."





Re: Clarification: use of AllWindowedStream.apply() function

2017-02-14 Thread nsengupta
I have gone through this post
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WindowedStream-operation-questions-td6006.html>,
where Aljoscha explains that /mapping/ on WindowedStream is /not/ allowed.

So, I think I haven't asked the question properly. Here is (hopefully) a
better and easier version:

1. I begin with records of type RawMITSIMTuple.
2. When I group them using a Window, I get an
   AllWindowedStream[RawMITSIMTuple].
3. I /fold/ the tuples obtained in the Window, which gives me a
   DataStream[Vector[RawMITSIMTuple]].
4. What I need is a DataStream[PositionReport]. So, I need to flatMap the
   output of the previous step, where I first get hold of each
   RawMITSIMTuple and map it to a PositionReport.

val positionReportStream = this
  .readRawMITSIMTuplesInjected(envDefault, args(0))
  .assignAscendingTimestamps(e => e.timeOfReport)
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
  .fold(Vector[RawMITSIMTuple]())((collectorBin, rawRecord) => {
    collectorBin :+ rawRecord
  })
  .flatMap(r => r.map(e => this.preparePositionReport(e)))

This gives me what I want, but I feel this is verbose and inefficient. Am I
thinking correctly? If so, what is a better idiom to use in such cases?

-- Nirmalya






Clarification: use of AllWindowedStream.apply() function

2017-02-14 Thread nsengupta
I am trying to understand if the AllWindowedStream.apply() function can be
used for creating a DataStream of new types. 

Here is a portion of the code:


case class RawMITSIMTuple(
  tupletype: Int,      timeOfReport: Int,
  vehicleID: Int,      vehicleSpeed: Int,
  expressWayID: Int,   vehicleLane: Int,
  vehicleDir: Int,
  vehicleSegment: Int, vehiclePos: Int,
  queryID: Int,
  segmentInit: Int,    segmentEnd: Int,
  dayOfWeek: Int,      timeOfDay: Int,
  dayID: Int
)

case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int,
                           eWaySegment: Int)

case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos: Int)

case class PositionReport(
  tupletype: Int, timeOfReport: Int,
  eWayCoordinates: EWayCoordinates,
  vehicleDetails: VehicleDetails
)

val envDefault = StreamExecutionEnvironment.getExecutionEnvironment
envDefault.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// ...

val positionReportStream = this
  .readRawMITSIMTuplesInjected(envDefault,args(0))
  .assignAscendingTimestamps(e => e.timeOfReport)
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))



positionReportStream above is of type *AllWindowedStream*. As such, I cannot
use it as a DataStream[PositionReport]: I cannot segregate it with some kind
of KeySelector and use it further downstream.

I have been thinking of using a FoldFunction on it, but that gives a
collection of PositionReport. So, I get a DataStream[Vector[PositionReport]]
(Vector is just an example).

The other alternative is to use an AllWindowedStream.apply() function, where
I can emit a DataStream[PositionReport]. But, that will mean that I am using
the apply function more as a *mapper*. Is that the right way to use it?
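
Concretely, something like this sketch is what I have in mind (the mapping of raw fields to coordinates is assumed from the case classes above; untested):

// Sketch of apply()-as-mapper: one PositionReport emitted per raw tuple
// in the window. The field-to-coordinate assignment is an assumption.
val positionReports: DataStream[PositionReport] = positionReportStream
  .apply((window: TimeWindow,
          tuples: Iterable[RawMITSIMTuple],
          out: Collector[PositionReport]) =>
    tuples.foreach { t =>
      out.collect(PositionReport(
        t.tupletype, t.timeOfReport,
        EWayCoordinates(t.expressWayID, t.vehicleDir, t.vehicleLane, t.vehicleSegment),
        VehicleDetails(t.vehicleID, t.vehicleSpeed, t.vehiclePos)))
    })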

Could someone please push me to the correct way to deal with it?

-- Nirmalya
 





Re: Table API: java.sql.DateTime is not supported;

2017-02-06 Thread nsengupta
Hello Timo,

Thanks for the clarification.

This means that I *cannot use CsvTableSource*, as I have in the example.
Instead, I should:

 *   Write a custom Scalar function to convert STRINGs to other datatypes
     as required
 *   Read the file as CsvInput, with all fields as STRINGs
 *   Apply the Scalar function as appropriate and Map() to a desired
     *DataSet* type
 *   /Convert/ the DataSet to a Table
 *   Use SQL to access the Table

Is my understanding correct?
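
In code, I imagine the flow would look roughly like this (a sketch: the timestamp is read as a STRING and left for a scalar function to convert later; the parsing inside map() is illustrative, not tested):

import org.apache.flink.api.scala._
import org.apache.flink.api.table.TableEnvironment

case class Trip(timeOfPickUp: String, lat: Double, lon: Double, base: String)

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Read every field as a STRING, then convert explicitly in map().
val trips = env
  .readCsvFile[(String, String, String, String)](
    "/home/nirmalya/Downloads/Traffic/traffic-raw-data-apr14.csv")
  .map(r => Trip(r._1, r._2.toDouble, r._3.toDouble, r._4))

// Register the typed DataSet as a Table and query it with SQL.
tableEnv.registerDataSet("TrafficData", trips)
val result = tableEnv.sql("SELECT timeOfPickUp, lat, lon, base FROM TrafficData")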

-- Nirmalya





Re: Compiler error while using 'CsvTableSource'

2017-02-06 Thread nsengupta
Thanks, Timo.

Do I need to add anything to the ticket? Please let me know. I will do the
needful.

-- N

On Mon, Feb 6, 2017 at 2:25 PM, Timo Walther [via Apache Flink User Mailing
List archive.] <ml-node+s2336050n11452...@n4.nabble.com> wrote:

> I created an issue to make this a bit more user-friendly in the future.
>
> https://issues.apache.org/jira/browse/FLINK-5714
>
> Timo
>
>
> On 05/02/17 at 06:08, nsengupta wrote:
>
> Thanks, Till, for taking time to share your understanding.
>
> -- N
>
> On Sun, Feb 5, 2017 at 12:49 AM, Till Rohrmann [via Apache Flink User
> Mailing List archive.] <[hidden email]> wrote:
>
>> I think the problem is that there are actually two constructors with the
>> same signature. The one is defined with default arguments and the other has
>> the same signature as the one with default arguments when you leave all
>> default arguments out. I assume that this confuses the Scala compiler and
>> only works if you've specified the right types or at least one of the
>> parameters with a default argument.
>>
>> Cheers,
>> Till
>>
>> On Fri, Feb 3, 2017 at 12:49 PM, nsengupta <[hidden email]> wrote:
>>
>>> Till,
>>>
>>> Many thanks. Just to confirm that it is working fine at my end, here's a
>>> screenshot.
>>>
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.na
>>> bble.com/file/n11427/Selection_258.png>
>>>
>>> This is Flink 1.1.4 but Flink-1.2/Flink-1.3 shouldn't be any problem.
>>>
>>> It never struck me that lack of covariance in Scala Arrays was the
>>> source of
>>> the problem. Bravo!
>>>
>>> BTW, I am just curious to know how the Testcases worked: just to add to
>>> my
>>> knowledge of Scala. We didn't pass any /typehint/ to the compiler there!
>>>
>>> Could you please put a hint of a line or two? TIA.
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>
>
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is
> where they should be.
> Now put the foundation under them."
>
>
>
>
>

Re: Compiler error while using 'CsvTableSource'

2017-02-04 Thread nsengupta
Thanks, Till, for taking time to share your understanding.

-- N

On Sun, Feb 5, 2017 at 12:49 AM, Till Rohrmann [via Apache Flink User
Mailing List archive.] <ml-node+s2336050n11441...@n4.nabble.com> wrote:

> I think the problem is that there are actually two constructors with the
> same signature. The one is defined with default arguments and the other has
> the same signature as the one with default arguments when you leave all
> default arguments out. I assume that this confuses the Scala compiler and
> only works if you've specified the right types or at least one of the
> parameters with a default argument.
>
> Cheers,
> Till
>
> On Fri, Feb 3, 2017 at 12:49 PM, nsengupta <[hidden email]> wrote:
>
>> Till,
>>
>> Many thanks. Just to confirm that it is working fine at my end, here's a
>> screenshot.
>>
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/file/n11427/Selection_258.png>
>>
>> This is Flink 1.1.4 but Flink-1.2/Flink-1.3 shouldn't be any problem.
>>
>> It never struck me that lack of covariance in Scala Arrays was the source
>> of
>> the problem. Bravo!
>>
>> BTW, I am just curious to know how the Testcases worked: just to add to my
>> knowledge of Scala. We didn't pass any /typehint/ to the compiler there!
>>
>> Could you please put a hint of a line or two? TIA.
>>
>>
>>
>>
>>
>>
>
>
>



-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."





Table API: java.sql.DateTime is not supported;

2017-02-04 Thread nsengupta
I am reading a bunch of records from a CSV file. A record looks like this:

"4/1/2014 0:11:00",40.769,-73.9549,"B02512"

I intend to treat these records as SQL Rows and then process them.

Here's the code:

package org.nirmalya.exercise

import java.time.LocalDateTime

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.table._
import org.apache.flink.api.table.sources.CsvTableSource
import org.apache.flink.api.scala.table.TableConversions
import org.apache.flink.api.scala._
/**
  * Created by nirmalya on 4/2/17.
  */
object TrafficDataTrainer {

  def main(args: Array[String]): Unit = {

    case class Trip(timeOfPickUp: LocalDateTime, lat: Double, lon: Double, base: String)

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val myDataStorePath = "/home/nirmalya/Downloads/Traffic"

    val csvTableSource = new CsvTableSource(
      myDataStorePath + "/traffic-raw-data-apr14.csv",
      Array("timeOfPickUp", "lat", "lon", "base"),
      Array[org.apache.flink.api.common.typeinfo.TypeInformation[_]](
        Types.TIMESTAMP,
        Types.DOUBLE,
        Types.DOUBLE,
        Types.STRING
      )
    )

tableEnv.registerTableSource("TrafficData",csvTableSource)

val trafficTable = tableEnv.scan("TrafficData")

val result = trafficTable.select("timeOfPickUp,lat,lon,base")

val trafficDataSet = new TableConversions(result).toDataSet[Trip]

trafficDataSet.collect().take(10).foreach(println)
  }
}


At run time, the exception that is thrown is:

--
Exception in thread "main" java.lang.IllegalArgumentException: The type 'java.sql.Date' is not supported for the CSV input format.
at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:306)
at org.apache.flink.api.table.runtime.io.RowCsvInputFormat.<init>(RowCsvInputFormat.scala:52)
at org.apache.flink.api.table.sources.CsvTableSource.createCsvInput(CsvTableSource.scala:99)
at org.apache.flink.api.table.sources.CsvTableSource.getDataSet(CsvTableSource.scala:78)
at org.apache.flink.api.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:55)
at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:274)
at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
at org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41)
at org.nirmalya.exercise.UberDataTrainer$.main(UberDataTrainer.scala:45)
at org.nirmalya.exercise.UberDataTrainer.main(UberDataTrainer.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

--

I see that in org.apache.flink.api.common.io.GenericCsvInputFormat:303, the
check fails because the stated type isn't a part of the known types. However,
the constructor of *CsvTableSource* accepts /Types.DATE/ as well as
/Types.TIMESTAMP/ (I tried with both of them, and the exception is the same).

Could someone please point out where I am going wrong?

-- Nirmalya









Re: Compiler error while using 'CsvTableSource'

2017-02-03 Thread nsengupta
Till,

Many thanks. Just to confirm that it is working fine at my end, here's a
screenshot. 


 

This is Flink 1.1.4 but Flink-1.2/Flink-1.3 shouldn't be any problem.

It never struck me that lack of covariance in Scala Arrays was the source of
the problem. Bravo! 

BTW, I am just curious to know how the Testcases worked: just to add to my
knowledge of Scala. We didn't pass any /typehint/ to the compiler there! 

Could you please put a hint of a line or two? TIA.







Re: Compiler error while using 'CsvTableSource'

2017-02-02 Thread nsengupta
Til,

FWIW, I have fired the entire testsuite for the latest Flink snapshot.

Almost all testcases passed, particularly this one:


 

This case uses a built-in loaded CSV (in
org.apache.flink.table.api.scala.batch.TableSourceTest.scala):


 

Because this testcase runs, I presume that the compiler is able to sort out
the TypeInformation while building the Flink libraries but, for some reason,
is failing to do so at my end.

-- Nirmalya







Re: Compiler error while using 'CsvTableSource'

2017-02-02 Thread nsengupta
Hello Till,

Many thanks for a quick reply. 

I have tried to follow your suggestion, with no luck:

 

Just to give it a shot, I have tried this too (following Flink
Documentation):


 


I took a cursory look at the source code and stumbled upon this file:
./flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/*CommonTestData.scala*

Here, I see that you are using *BasicTypeInfo* while creating a
*CsvTableSource* and I have tried that too:


 

Because I ordered maven to skipTests while building the Flink binaries, I don't
know if this particular testcase passed or not. If it did, then I think we
can conclude that it is a case of Scala's failure to resolve a type, perhaps
due to an implicit. If it didn't, then we perhaps have an easier problem to
solve.

Do you think I should run all the tests in my environment? It may take time,
but I can do that if it helps us.

-- Nirmalya









Compiler error while using 'CsvTableSource'

2017-02-02 Thread nsengupta
I am using *flink-shell*, available with flink-1.2-SNAPSHOT.

While loading a CSV file into a CsvTableSource - following the example given
with the documents - I get this error.


 

I am not sure what the reason is! Am I missing an import of an *implicit*
somewhere?

Any help, appreciated.

-- Nirmalya















Sharing State between Operators

2016-05-13 Thread nsengupta
Hello Flinksters


Alright. So, I had a fruitful exchange of messages with Balaji earlier
today, on this topic. I moved ahead with the understanding derived from the
exchange (thanks, Balaji) at the time. But, now I am back because I think my
approach is unclean, if not incorrect. There probably is a smarter way to
achieve the same but I can't figure it out.

Here's the problem:

A building has 4 walls (0,1,2,3). On each wall, a number of devices have been
planted to capture some physical attribute: let's say, the temperature at that
spot. Every device has a unique ID.

A typical tuple looks like this (Reading ==> Temperature as an Integer):
(TupleType,Time,WallID,DeviceID,Reading) 

The system works on the basis of records arriving in a time-window of 60
seconds. We can consider this to be a Tumbling Window. The time (and Window
assignment etc.) is not the issue here. The 'Time' field increases
monotonically.

If TupleType == 0, I need to compute and update my data structures from the
stream

If TupleType == 1, I need to emit the maximum temperature recorded by the
DeviceID out of the last 5 readings.

If TupleType == 2, I need to emit the number of readings that have arrived so
far from the particular wall. Obviously, in this case, we ignore the values
of the 'DeviceID' and 'Reading' fields in the tuple.

The Application generates output for TupleType 1 and TupleType 2. 

The TupleTypes can arrive in any order. For example, TupleType 1 may arrive
with a DeviceID which the application hasn't seen before (no corresponding
TupleType 0 has arrived earlier with that DeviceID). Let us assume that we
have a fallback value to be emitted for such cases, to keep things simple.

In my mind, the implementation should be along this line:

- Split the incoming Stream into three separate substreams using SplitStream,
based upon TupleType (a rough sketch of this step follows the outline)
- For StreamOFTupleType0,
  - KeyBy(DeviceID)
  - Apply a Mapper
    - Update a Map[DeviceID, Tuple2(MaxReadingSoFar, FixedSizeList[Reading])]
      somewhere
  - Apply the (next) Mapper
    - Calculate the total count of readings from the Wall so far
    - Update a Map[WallID, Count]

- For StreamOFTupleType1
  - Access the Map created/updated through the first Mapper above
  - Emit

- For StreamOFTupleType2
  - Access the Map created/updated through the second Mapper above
  - Emit
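
A rough sketch of the split step (with 'Reading' as a stand-in case class for the tuple layout above, and 'readings' assumed to be the incoming DataStream[Reading]):

// Sketch only: route each tuple to a named substream, based upon TupleType.
case class Reading(tupleType: Int, time: Long, wallID: Int, deviceID: Int, temperature: Int)

val substreams = readings.split((r: Reading) => r.tupleType match {
  case 0 => List("update")
  case 1 => List("deviceQuery")
  case 2 => List("wallQuery")
})

val updates       = substreams.select("update").keyBy(_.deviceID)
val deviceQueries = substreams.select("deviceQuery").keyBy(_.deviceID)
val wallQueries   = substreams.select("wallQuery").keyBy(_.wallID)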

I have hit a wall in deciding how the live data structures should be created,
updated and accessed, correctly and efficiently, in a situation like the
above. More importantly, how will they be shared between operators, across
partitions (nodes)?

I can't broadcast the Maps because they are not READONLY (/aka/ LookUp
only).

I can't create RichMapFunction local data structures because they are not
shared between partitions (my understanding). They will be blind to the
effect of accumulation. Each will begin with an empty Map.

I have done a bit of exploration and I have found a thread in this forum on
the topic. I have understood what Stephan is suggesting there ('..State is
moved along the pipeline..'), but then failed to figure out how to apply it
in my case, if at all possible.

I have been thinking about using an external DB-like datastore, but I want to
be sure about the inevitability of that decision. If I use a DB, then the
focus may shift to the INSERT/SELECT-like queries. My application then becomes
more of a distributed DB application than a lean Streaming application. That
thought doesn't make me happy! :-)

Please make me wiser (by pointing out gaps in understanding where they
exist). If any more specific information helps you, please ask me.

My primary aim is to have a clarity of the recipe of a UseCase like this.

-- Nirmalya
  








Availability of OrderedKeyedDataStream

2016-05-13 Thread nsengupta
Hello Flinksters,

Have you decided to do away with the 'OrderedKeyedDataStream' type
altogether? I didn't find it in the API documents. It is mentioned and
elaborated elsewhere, and I think it can be a good fit for a UseCase I am
trying to implement.

Could someone please confirm? If it exists but with a different name or if
the same behaviour can be achieved using other methods, I would like to be
wiser.

-- Nirmalya






Re: Count of Grouped DataSet

2016-05-01 Thread nsengupta
Hello all,

This is how I have moved ahead with the implementation of finding the count
of a GroupedDataSet:

val k = envDefault
  .fromElements((1,1,2,"A"),(1,1,2,"B"),(2,1,3,"B"),(3,1,4,"C"))
  .groupBy(1,2)
  .reduceGroup(nextGroup => {
    val asList = nextGroup.toList
    (asList.head._2, asList.head._3, asList.size)
  })

k.print()

While this produces the expected output alright, I am not sure if this is the
ideal, idiomatic way to implement what I need. Could you please confirm? If
there is a better way, I would like to be wiser, of course.

-- Nirmalya





Re: Discarding header from CSV file

2016-04-29 Thread nsengupta
Hello Chiwan,

Sorry for the late reply. I have been into other things for some time.

Yes, you are right. I have been wrongly assuming that field to be an Integer.

I will fix it and give it a go again.

Many thanks again.

-- Nirmalya





Problem in creating quickstart project using archetype (Scala)

2016-04-28 Thread nsengupta
Hello all,

I don't know if anyone else has faced this; I haven't so far.

When I try to create a new project template following the instructions here,
it fails.

This is what happens (along with the command I give):

nirmalya@Cheetah:~/Workspace-Flink$ mvn archetype:generate \
>   -DarchetypeGroupId=org.apache.flink \
>   -DarchetypeArtifactId=flink-quickstart-scala \
>   -DarchetypeVersion=1.1-SNAPSHOT
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:2.3:generate (default-cli) > generate-sources @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:2.3:generate (default-cli) < generate-sources @ standalone-pom <<<
[INFO]
[INFO] --- maven-archetype-plugin:2.3:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Interactive mode
[INFO] Archetype repository not defined. Using the one from [org.apache.flink:flink-quickstart-scala:1.0.2] found in catalog remote
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:15 min
[INFO] Finished at: 2016-04-28T22:22:57+05:30
[INFO] Final Memory: 14M/226M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-archetype-plugin:2.3:generate (default-cli) on project standalone-pom: The desired archetype does not exist (org.apache.flink:flink-quickstart-scala:1.1-SNAPSHOT) -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
nirmalya@Cheetah:~/Workspace-Flink$


Could someone please point out the mistake? 

-- Nirmalya





Re: Discarding header from CSV file

2016-04-27 Thread nsengupta
Hello Chiwan,

Yes, that's an oversight on my part. In my hurry, I didn't even try to
explore the source of that /Exception/. Thanks, again.

However, I still don't know why I am not able to read the CSV file. As the
output shows, I can read the same file using standard IO routines.

Could you spot my mistake?

-- Nirmalya





Re: Discarding header from CSV file

2016-04-27 Thread nsengupta
Till,

Thanks for looking into this.

I have removed the toList() from the collect() function, to align the code
with what I generally do in a Flink application. It throws an Exception, and
I can't figure out why.

*Here's my code (shortened for brevity):*

import org.apache.flink.api.scala._
import scala.io.Source

case class BuildingInformation(buildingID: Int, buildingManager: Int,
                               buildingAge: Int, productID: String, country: String)

object HVACReadingsAnalysis {

  def main(args: Array[String]): Unit = {

    val envDefault = ExecutionEnvironment.getExecutionEnvironment

    val buildings = readBuildingInfo(envDefault, "./SensorFiles/building.csv")

    buildings.print

    envDefault.execute("HVAC Simulation")
  }

  private def readBuildingInfo(env: ExecutionEnvironment, inputPath: String) = {

    // [NS]: I can see the lines, read correctly from the CSV file here
    println("As read from CSV file")
    println(Source.fromFile(inputPath).getLines.toList.mkString("#\n"))

    // [NS]: Then, I read the same file using the library function
    env.readCsvFile[BuildingInformation](
      inputPath,
      ignoreFirstLine = true,
      pojoFields = Array("buildingID", "buildingManager", "buildingAge", "productID", "country")
    )
  }
}


*Relevant portion of the output:*
As read from CSV file
BuildingID,BuildingMgr,BuildingAge,HVACproduct,Country#
1,M1,25,AC1000,USA#
2,M2,27,FN39TG,France#
3,M3,28,JDNS77,Brazil#
4,M4,17,GG1919,Finland#
5,M5,3,ACMAX22,Hong Kong#
6,M6,9,AC1000,Singapore#
7,M7,13,FN39TG,South Africa#
8,M8,25,JDNS77,Australia#
9,M9,11,GG1919,Mexico#
10,M10,23,ACMAX22,China#
11,M11,14,AC1000,Belgium#
12,M12,26,FN39TG,Finland#
13,M13,25,JDNS77,Saudi Arabia#
14,M14,17,GG1919,Germany#
15,M15,19,ACMAX22,Israel#
16,M16,23,AC1000,Turkey#
17,M17,11,FN39TG,Egypt#
18,M18,25,JDNS77,Indonesia#
19,M19,14,GG1919,Canada#
20,M20,19,ACMAX22,Argentina
15:34:18,914 INFO  org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
15:34:19,104 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster - Starting FlinkMiniCluster.
15:34:19,912 INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started


// ..
// ... more log statements
// ..

Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:979)
at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:84)
at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
at main.scala.hortonworks.tutorial.HVACReadingsAnalysis$.main(HVACReadingsAnalysis.scala:60)
at main.scala.hortonworks.tutorial.HVACReadingsAnalysis.main(HVACReadingsAnalysis.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

Process finished with exit code 1
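
The exception text itself hints at the cause: in the batch API, print() is one of the calls that triggers execution, so the envDefault.execute(...) that follows finds no new sinks. A sketch of the driver with the redundant call dropped:

// Sketch: DataSet.print() executes the plan eagerly, so either drop the
// explicit execute() call, or replace print() with a real sink and keep it.
def main(args: Array[String]): Unit = {

  val envDefault = ExecutionEnvironment.getExecutionEnvironment

  val buildings = readBuildingInfo(envDefault, "./SensorFiles/building.csv")

  buildings.print // triggers execution; no envDefault.execute() needed after it
}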






Re: Discarding header from CSV file

2016-04-26 Thread nsengupta
Chiwan and other Flinksters,

I am stuck with the following. Somehow, I am unable to spot the error, if
any! Please help.

*I have this case class*:

case class BuildingInformation(buildingID: Int, buildingManager: Int,
                               buildingAge: Int, productID: String, country: String)

*I intend to read from a CSV file which has a one-line header*:

BuildingID,BuildingMgr,BuildingAge,HVACproduct,Country

*I attempt to read the file in this manner*:

private def readBuildingInfo(env: ExecutionEnvironment, inputPath: String) = {
  env.readCsvFile[BuildingInformation](
    inputPath,
    ignoreFirstLine = true,
    pojoFields = Array("buildingID","buildingManager","buildingAge","productID","country")
  )
}

*Then, I use this function in the driver's main()*:

val envDefault = ExecutionEnvironment.getExecutionEnvironment
val buildings =
  readBuildingInfo(envDefault, "./SensorFiles/building.csv").collect().toList

The 'buildings' list is always *empty*! I fail to figure out why! I have
checked that the path of the CSV file is correct and accessible. Also, I can
read the same stuff by following the usual method of reading as a text-line,
parsing the commas and creating the POJOs (case-classes).

-- Nirmalya




Re: Discarding header from CSV file

2016-04-26 Thread nsengupta
Hello Chiwan,

I was just about to post to declare my ignorance, because I searched again
and realized that I had failed to spot readCsvFile! :-) You have been faster
than me!

Yes, I should use readCsvFile so that I get all the facilities built in.

Many thanks for pointing out.
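
For completeness, the zipWithIndex route for readTextFile(), mentioned in the reply quoted below, might look roughly like this sketch (assuming 'env' is the ExecutionEnvironment and the header line receives index 0):

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // brings in zipWithIndex

val lines = env.readTextFile("./SensorFiles/building.csv")

// Drop the single header line by its assigned index.
val withoutHeader = lines.zipWithIndex
  .filter { case (index, _) => index > 0L }
  .map { case (_, line) => line }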

-- N


Nirmalya Sengupta
https://about.me/sengupta.nirmalya


On Wed, Apr 27, 2016 at 7:19 AM, Chiwan Park-2 [via Apache Flink User
Mailing List archive.] <ml-node+s2336050n6475...@n4.nabble.com> wrote:

> Hi, Nirmalya
>
> I recommend readCsvFile() method rather than readTextFile() to read CSV
> file. readCsvFile() provides some features for CSV file such as
> ignoreFirstLine() (what you are looking for), ignoreComments(), and etc.
>
> If you have to use readTextFile() method, I think, you can ignore column
> headers by calling zipWithIndex method and filtering it based on the index.
>
> Regards,
> Chiwan Park
>
> > On Apr 27, 2016, at 10:32 AM, nsengupta <[hidden email]> wrote:
> >
> > What is the recommended way of discarding the Column Header(s) from a
> CSV
> > file, if I am using
> >
> > /environment.readTextFile()
> > /
> > facility? Obviously, we don't know beforehand, which of the nodes will
> read
> > the Header(s)? So, we cannot use usual tricks like drop(1)?
> >
> > I don't recall well: has this been discussed and closed earlier in this
> > forum? If so, can someone point that out to me please?
> >
> > -- Nirmalya
> >
> >
> >
>
>
>
>



-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."





Discarding header from CSV file

2016-04-26 Thread nsengupta
What is the recommended way of discarding the Column Header(s) from a CSV
file, if I am using the

/environment.readTextFile()/

facility? Obviously, we don't know beforehand which of the nodes will read
the Header(s); so, we cannot use the usual tricks like drop(1).

I don't recall well: has this been discussed and closed earlier in this
forum? If so, can someone point that out to me please?

-- Nirmalya


