How to get past "bad" Kafka message, restart, keep state

2018-06-19 Thread chrisr123
This is my first time trying to get this to work, so bear with me. I'm trying to learn checkpointing with Kafka, handling "bad" messages, and restarting without losing state. Use Case: Use checkpointing. Read a stream of integers from Kafka, keep a running sum. If a "bad" Kafka message is read, restart app,

What does the number in front of ">" mean when I print a DataStream

2018-06-13 Thread chrisr123
What does the number in front of the ">" character mean when I call print() on a DataStream? For example, I may have this in my source where I am reading a socket stream of sensor data: DataStream> simpleStream = env

Debugging window processing: can I output window start/end times, prove correctness?

2018-05-28 Thread chrisr123
I am learning the tumbling and rolling window API and I was wondering what API calls people use to determine if their events are being assigned to windows as they expect? For example, is there a way to print out the window start and end times for windows as they are being processed, and what

Writing Table API results to a csv file

2018-05-27 Thread chrisr123
I'm using Flink 1.4.0. I'm trying to save the results of a Table API query to a CSV file, but I'm getting an error. Here are the details: My input file looks like this: id,species,color,weight,name 311,canine,golden,75,dog1 312,canine,brown,22,dog2 313,feline,gray,8,cat1 I run a query on this to

Re: Debugging window processing: can I output window start/end times, prove correctness?

2018-05-29 Thread chrisr123
I'm not sure if this is a "best practice" for debugging, but I found that if I use apply(), one of the parameters passed into the WindowFunction that I must implement is a TimeWindow object, which has start and end times: private static class MyApplyWindowFunction implements WindowFunction,
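
As a rough illustration of what those start and end times look like, here is a minimal plain-Java sketch (no Flink dependency) of how the bounds of an aligned tumbling window can be derived from an event timestamp. The class name TumblingWindowBounds and its methods are hypothetical. The start formula is meant to mirror the arithmetic used for aligned tumbling windows, with an exclusive end, but that is an assumption to verify against the TimeWindow values your own job prints.

```java
public class TumblingWindowBounds {

    // Start of the window containing timestampMs (hypothetical helper).
    // Assumes sizeMs > 0 and 0 <= offsetMs < sizeMs.
    static long windowStart(long timestampMs, long offsetMs, long sizeMs) {
        return timestampMs - (timestampMs - offsetMs + sizeMs) % sizeMs;
    }

    // Exclusive end of the same window: start plus the window size.
    static long windowEnd(long timestampMs, long offsetMs, long sizeMs) {
        return windowStart(timestampMs, offsetMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long sizeMs = 60_000L; // one-minute tumbling windows, no offset
        long[] timestamps = {0L, 59_999L, 60_000L, 125_000L};
        for (long ts : timestamps) {
            System.out.printf("event=%d window=[%d, %d)%n",
                    ts, windowStart(ts, 0L, sizeMs), windowEnd(ts, 0L, sizeMs));
        }
    }
}
```

Comparing bounds computed this way with the start and end values logged from inside the window function is one way to check that events land in the windows you expect.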

guidelines for setting parallelism in operations/job?

2018-06-05 Thread chrisr123
Hello, I'm trying to get some simple rules or guidelines for what values to set for operator or job parallelism. It would seem to me that it should be a number <= the number of available task slots? For example, suppose I have 2 task manager machines, each with 4 task slots. Assuming no other

Passing type information to JDBCAppendTableSink

2018-07-01 Thread chrisr123
I'm trying to determine if I'm specifying type information properly when doing an INSERT using the JDBCAppendTableSink API. Specifically, how do I specify timestamp and date types? It looks like I need to use Type.SQL_TIMESTAMP for a timestamp but BasicTypeInfo for types like varchar, etc? I

Re: Passing type information to JDBCAppendTableSink

2018-07-01 Thread chrisr123
Full Source except for mapper and timestamp assigner. Sample Input Stream record: 1530447316589,Mary,./home What are the correct parameters to pass for data types in the JDBCAppendTableSink? Am I doing this correctly? // Get Execution Environment

Simple stdout sink for testing Table API?

2018-06-23 Thread chrisr123
Is there a simple way to output the first few rows of a Flink table to stdout when developing an application? I just want to see the first 10-20 rows on screen during development to make sure my logic is correct. There doesn't seem to be something like print(10) in the API to see the first n

Re: Simple stdout sink for testing Table API?

2018-06-24 Thread chrisr123
Thanks Hequn! -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink 1.5 TooLongFrameException in cluster mode?

2018-06-21 Thread chrisr123
This looks related to using the -m option on CLI: This works: $FLINK_HOME/bin/flink run -c $CLASS $JARFILE but this causes the error: $FLINK_HOME/bin/flink run -m jobmanagermachine:6123 -c $CLASS $JARFILE I found this thread here from April 27

Flink 1.5 TooLongFrameException in cluster mode?

2018-06-20 Thread chrisr123
I downloaded Flink 1.5 and I'm trying to run it in standalone mode: 1 job manager, 2 task managers. I can run a Flink job when I run in local mode: 1 machine as both job manager and task manager. But when I add 2 remote machines as slaves and try to run, I am seeing this error in the log and the

env.setStateBackend deprecated in 1.5 for FsStateBackend

2018-06-26 Thread chrisr123
I upgraded from Flink 1.4 to 1.5 and now this call is being flagged as deprecated. What should I change this code to for 1.5 to get rid of the deprecation warning? Thanks // deprecated env.setStateBackend(new FsStateBackend("hdfs://myhdfsmachine:9000/flink/checkpoints"));

Custom Window example (data-based)

2018-07-27 Thread chrisr123
I want to get some experience implementing a custom window assigner, trigger, evictor, etc. Does anyone have an example of a custom window implementation that I could look at, or an idea for one to implement? The goal is to learn the custom window API. I'm looking for something besides a time or

Error on SQL orderBy Error

2018-08-19 Thread chrisr123
Use Case: I have a CSV file with data that I want to do a SELECT with orderBy. I'm getting this error below. What am I doing incorrectly? Thanks! *Expression (('id).asc).asc failed on input check: Sort should only based on field reference * *Input File structure:*

SingleOutputStreamOperator vs DataStream?

2018-07-24 Thread chrisr123
I'm trying to get a list of late elements in my Tumbling Windows application and I noticed that I need to use SingleOutputStreamOperator instead of DataStream to get access to the .sideOutputLateData(...) method. Can someone explain what the difference is between SingleOutputStreamOperator and

specifying prefix for print(), printToErr() ?

2018-07-14 Thread chrisr123
The documentation states that there is a way to specify a prefix msg to distinguish between different calls to print() (see below), but I have not found a way to do this. Can anyone show me how I would code this? What I'd like to do conceptually, and have the prefix msg show up in the output so

understanding purpose of TextInputFormat

2018-07-14 Thread chrisr123
I'm building a streaming app that continuously monitors a directory for new files and I'm confused about why I have to specify a TextInputFormat - see source code below. It seems redundant but it is a required parameter. It makes perfect sense to specify the directory I want to monitor, but what

Data Type of timestamp in Streaming SQL Result? Long instead of timestamp?

2018-06-28 Thread chrisr123
I am trying to understand how to use streaming SQL, very similar to the example from the documentation: count the number of page clicks in a certain period of time for each user. I'm trying to solve the problem using both the SQL API and the Table API. My input sample stream looks like this:

Re: Data Type of timestamp in Streaming SQL Result? Long instead of timestamp?

2018-06-29 Thread chrisr123
Thank you Hequn, I got it working. Here is the tumbling window query, in both SQL and Table API. I'm getting the same results with these: SQL API tableEnvironment.registerDataStream("pageViews", eventStream, "pageViewTime.rowtime, username, url"); String continuousQuery = "SELECT

Reading Data from zip/gzip

2018-10-22 Thread chrisr123
I'm able to read normal txt or csv files using Flink, but what would I need to do in order to read them if they are given to me in zip or gzip format? Assuming I do not want to have to unzip them. Thanks!
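
For the gzip half of the question, a minimal plain-JDK sketch of reading lines straight from a compressed stream, without unpacking the file to disk first, might look like the following. It is not Flink's own file handling and does not cover zip archives. The class name GzipLineReader, its helper methods, and the sample CSV rows are all hypothetical and used only for illustration.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipLineReader {

    // Decompresses a gzip stream on the fly and returns its text lines.
    static List<String> readLines(InputStream gzipped) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(gzipped), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    // Test helper: gzip a string in memory so the example needs no files.
    static byte[] gzip(String text) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buffer)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        // Hypothetical sample data, compressed and read back in memory.
        byte[] compressed = gzip("id,name\n1,first\n2,second\n");
        for (String line : readLines(new ByteArrayInputStream(compressed))) {
            System.out.println(line);
        }
    }
}
```

In a real job the ByteArrayInputStream would be replaced by a stream over the compressed file, for example one opened with java.nio.file.Files.newInputStream.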

Restart from checkpoint after program failure

2018-10-17 Thread chrisr123
Hi Folks, I'm trying to restart my program with restored state from a checkpoint after a program failure (restart strategies tried but exhausted), but I'm not picking up the restored state. What am I doing wrong here? *Summary* I'm using a very simple app on 1 node just to learn checkpointing.