Hi Chesnay,

> On Jun 19, 2019, at 6:05 AM, Chesnay Schepler <ches...@apache.org> wrote:
> 
> A (Rich)SourceFunction that does not implement RichParallelSourceFunction is 
> always run with a parallelism of 1.

RichSourceFunction 
<https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/source/RichSourceFunction.html>
 says "Base class for implementing a parallel data source…" and also talks 
about (in a similar, but not identical, way to RichParallelSourceFunction 
<https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.html>)
 the use of getRuntimeContext() to determine the sub-task index.

But you’d always want to extend RichParallelSourceFunction to create a parallel 
data source, yes?

Seems confusing.

Thanks,

— Ken

> 
> On 19/06/2019 14:36, Flavio Pompermaier wrote:
>> My source function is intrinsically single-threaded. Is there a way to 
>> enforce this?
>> I can't find a real difference between a RichParallelSourceFunction and a 
>> RichSourceFunction.
>> Does the latter (RichSourceFunction) implicitly use parallelism = 1?
>> 
>> On Wed, Jun 19, 2019 at 2:25 PM Chesnay Schepler <ches...@apache.org 
>> <mailto:ches...@apache.org>> wrote:
>> It returns a list of states so that state can be re-distributed if the 
>> parallelism changes.
>> 
>> If you hard-code the interface to return a single value then you're 
>> implicitly locking the parallelism.
>> When you reduce the parallelism you'd no longer be able to restore all 
>> state, since you have fewer instances than stored state entries.
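
A minimal sketch of the redistribution described above, in plain Java with no Flink dependency. The class name ListStateRedistribution, the redistribute helper, and the round-robin assignment are assumptions made for illustration only; Flink's actual assignment of entries to subtasks may differ. It only tries to show why returning a list lets state be spread over a changed number of instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ListStateRedistribution {

    // Hypothetical helper: collect every checkpointed entry and spread the
    // entries over the new number of subtasks, round-robin. This simulates
    // the idea only; it is not Flink's internal code.
    static List<List<String>> redistribute(List<List<String>> snapshots, int newParallelism) {
        List<String> all = new ArrayList<>();
        for (List<String> snapshot : snapshots) {
            all.addAll(snapshot);
        }

        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>());
        }
        for (int i = 0; i < all.size(); i++) {
            restored.get(i % newParallelism).add(all.get(i));
        }
        return restored;
    }

    public static void main(String[] args) {
        // Three subtasks, each of which snapshotted a single entry.
        List<List<String>> snapshots = Arrays.asList(
                Arrays.asList("2019-06-01"),
                Arrays.asList("2019-06-02"),
                Arrays.asList("2019-06-03"));

        // Scale down to 2: one subtask receives two entries.
        System.out.println(redistribute(snapshots, 2));
        // Scale up to 4: one subtask receives an empty list.
        System.out.println(redistribute(snapshots, 4));
    }
}
```

In this simulation no entry is lost when the parallelism drops from 3 to 2, which would not be possible if each instance could restore only a single value. It also suggests that restoreState() should cope with zero, one, or several entries.
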
>> 
>> On 19/06/2019 14:19, Flavio Pompermaier wrote:
>>> It's not clear to me why the source checkpoint returns a list of 
>>> objects... when would it be useful to use a list instead of a single value?
>>> The documentation says "The returned list should contain one entry for 
>>> redistributable unit of state", but this is not very clear to me.
>>> 
>>> Best,
>>> Flavio
>>> 
>>> On Wed, Jun 19, 2019 at 12:40 PM Chesnay Schepler <ches...@apache.org 
>>> <mailto:ches...@apache.org>> wrote:
>>> This looks fine to me.
>>> 
>>> What exactly were you worried about?
>>> 
>>> On 19/06/2019 12:33, Flavio Pompermaier wrote:
>>> > Hi to all,
>>> > in my use case I have to ingest data from a REST service, which I 
>>> > poll periodically (of course a queue would be a better choice, 
>>> > but this doesn't depend on me).
>>> >
>>> > So I wrote a RichSourceFunction that starts a thread that polls for new 
>>> > data.
>>> > However, I'd like to restart from the last "from" value (in case 
>>> > the job is stopped).
>>> >
>>> > My initial thought was to write the last used date somewhere and, on 
>>> > job restart, read that date back (from a file, for example). However, a 
>>> > Flink stateful source should be a better choice here... am I wrong? So I 
>>> > made my source function implement ListCheckpointed<String>:
>>> >
>>> > @Override
>>> > public List<String> snapshotState(long checkpointId, long timestamp) 
>>> > throws Exception {
>>> >     // a single entry: the date the polling thread has reached so far
>>> >     return Collections.singletonList(pollingThread.getDateFromAsString());
>>> > }
>>> >
>>> > @Override
>>> > public void restoreState(List<String> state) throws Exception {
>>> >     // keeps the last entry of the restored list as the start date
>>> >     for (String dateFrom : state) {
>>> >         startDateStr = dateFrom;
>>> >     }
>>> > }
>>> >
>>> > @Override
>>> > public void run(SourceContext<MyEvent> ctx) throws Exception {
>>> >     final Object lock = ctx.getCheckpointLock();
>>> >     Client httpClient = getHttpClient();
>>> >     try {
>>> >         pollingThread = new MyPollingThread.Builder(baseUrl, httpClient)
>>> >             .setStartDate(startDateStr, datePatternStr)
>>> >             .build();
>>> >         // start the polling thread
>>> >         new Thread(pollingThread).start();
>>> >         .... (etc)
>>> > }
>>> >
>>> > Is this the correct approach, or did I misunderstand how stateful 
>>> > source functions work?
>>> >
>>> > Best,
>>> > Flavio
>>> 
>>> 
>>> 
>> 
>> 
> 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
