Hi Chesnay,

> On Jun 19, 2019, at 6:05 AM, Chesnay Schepler <ches...@apache.org> wrote:
>
> A (Rich)SourceFunction that does not implement RichParallelSourceFunction is
> always run with a parallelism of 1.

RichSourceFunction <https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/source/RichSourceFunction.html> says "Base class for implementing a parallel data source…" and also talks about using getRuntimeContext() to determine the sub-task index, in a similar (but not identical) way to RichParallelSourceFunction <https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/source/RichSourceFunction.html>.

But you'd always want to extend RichParallelSourceFunction to create a parallel data source, yes? This seems confusing.

Thanks,

-- Ken

>
> On 19/06/2019 14:36, Flavio Pompermaier wrote:
>> My source function is intrinsically single-threaded. Is there a way to force
>> this?
>> I can't find a real difference between a RichParallelSourceFunction and a
>> RichSourceFunction.
>> Does the latter (RichSourceFunction) implicitly use parallelism = 1?
>>
>> On Wed, Jun 19, 2019 at 2:25 PM Chesnay Schepler <ches...@apache.org
>> <mailto:ches...@apache.org>> wrote:
>> It returns a list of states so that state can be re-distributed if the
>> parallelism changes.
>>
>> If you hard-code the interface to return a single value, then you're
>> implicitly locking the parallelism.
>> When you reduce the parallelism you'd no longer be able to restore all
>> state, since you have fewer instances than stored state entries.
>>
>> On 19/06/2019 14:19, Flavio Pompermaier wrote:
>>> It's not clear to me why the source checkpoint returns a list of
>>> objects. When would it be useful to use a list instead of a single value?
>>> The documentation says "The returned list should contain one entry for
>>> redistributable unit of state", but this is not very clear to me.
>>>
>>> Best,
>>> Flavio
>>>
>>> On Wed, Jun 19, 2019 at 12:40 PM Chesnay Schepler <ches...@apache.org
>>> <mailto:ches...@apache.org>> wrote:
>>> This looks fine to me.
>>>
>>> What exactly were you worried about?
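
Chesnay's point about why snapshotState returns a list can be made concrete with a small plain-Java model. It assumes the even-split redistribution that the Flink state documentation describes for list-style operator state: the lists from all old subtasks are concatenated, then divided into as many sublists as there are new parallel instances (some possibly empty). The exact order in which entries are assigned is a simplifying assumption, and ListStateRedistribution/redistribute are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (NOT Flink's actual repartitioner) of even-split
// redistribution of list-style operator state on a parallelism change.
class ListStateRedistribution {

    static <T> List<List<T>> redistribute(List<List<T>> oldStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        // Logically, the whole state is the concatenation of every subtask's list.
        List<T> all = new ArrayList<>();
        for (List<T> subtaskState : oldStates) {
            all.addAll(subtaskState);
        }
        // Split into newParallelism contiguous chunks whose sizes differ by at most 1.
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism;
        int pos = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0); // first `extra` subtasks get one more
            result.add(new ArrayList<>(all.subList(pos, pos + size)));
            pos += size;
        }
        return result;
    }

    public static void main(String[] args) {
        // Four subtasks, each snapshotting a singleton list (like Flavio's snapshotState).
        List<List<String>> old = List.of(
                List.of("2019-06-01"), List.of("2019-06-02"),
                List.of("2019-06-03"), List.of("2019-06-04"));
        // Scale down to 2: each new subtask receives two entries, nothing is dropped.
        System.out.println(redistribute(old, 2));
        // prints [[2019-06-01, 2019-06-02], [2019-06-03, 2019-06-04]]
        // Scale up to 6: the last two subtasks receive empty lists.
        System.out.println(redistribute(old, 6));
        // prints [[2019-06-01], [2019-06-02], [2019-06-03], [2019-06-04], [], []]
    }
}
```

Scaling four singleton lists down to two subtasks hands each new subtask two entries, which is why restoreState receives a List: with a single-value interface, half of the stored positions would have nowhere to go.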
>>>
>>> On 19/06/2019 12:33, Flavio Pompermaier wrote:
>>> > Hi to all,
>>> > in my use case I have to ingest data from a REST service, where I
>>> > periodically poll for data (of course a queue would be a better choice,
>>> > but that's not up to me).
>>> >
>>> > So I wrote a RichSourceFunction that starts a thread that polls for new
>>> > data.
>>> > However, I'd like to restart from the last "from" value (in case
>>> > the job is stopped).
>>> >
>>> > My initial thought was to write the last used date somewhere and, on
>>> > job restart, read that date back (from a file, for example). However, a
>>> > Flink stateful source should be a better choice here... am I wrong? So I
>>> > made my source function implement ListCheckpointed<String>:
>>> >
>>> > @Override
>>> > public List<String> snapshotState(long checkpointId, long timestamp)
>>> >         throws Exception {
>>> >     return Collections.singletonList(pollingThread.getDateFromAsString());
>>> > }
>>> >
>>> > @Override
>>> > public void restoreState(List<String> state) throws Exception {
>>> >     // with parallelism 1 there is at most one entry; keep the last one
>>> >     for (String dateFrom : state) {
>>> >         startDateStr = dateFrom;
>>> >     }
>>> > }
>>> >
>>> > @Override
>>> > public void run(SourceContext<MyEvent> ctx) throws Exception {
>>> >     final Object lock = ctx.getCheckpointLock();
>>> >     Client httpClient = getHttpClient();
>>> >     try {
>>> >         pollingThread = new MyPollingThread.Builder(baseUrl, httpClient)
>>> >                 .setStartDate(startDateStr, datePatternStr)
>>> >                 .build();
>>> >         // start the polling thread
>>> >         new Thread(pollingThread).start();
>>> >         .... (etc)
>>> > }
>>> >
>>> > Is this the correct approach, or did I misunderstand how stateful
>>> > source functions work?
>>> >
>>> > Best,
>>> > Flavio

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
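
Flavio's run() obtains ctx.getCheckpointLock(), but the elided part doesn't show how the lock is used. The SourceFunction Javadoc recommends emitting records and updating the source's position inside one synchronized block on that lock, so a checkpoint never observes a record that was emitted without the "from" date having moved (or vice versa). The plain-Java model below sketches that pattern without a Flink dependency; PollingCursorModel, pollOnce, emittedCount and the "keep the latest date" restore rule are hypothetical illustrations, standing in for the polling thread, ctx.collect(...) and Flavio's restoreState loop.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model (no Flink dependency). `lock` stands in for
// ctx.getCheckpointLock(), `emitted` for ctx.collect(...), and `nextDate`
// for the polling thread's "from" date. All names are hypothetical.
class PollingCursorModel {
    private final Object lock = new Object();
    private final List<String> emitted = new ArrayList<>();
    private LocalDate nextDate;

    PollingCursorModel(LocalDate start) {
        this.nextDate = start;
    }

    // One polling round. The HTTP call (not modeled) would happen outside the
    // lock; only emission plus the cursor update are atomic w.r.t. snapshots.
    void pollOnce(List<String> fetched, LocalDate newFrom) {
        synchronized (lock) {
            emitted.addAll(fetched);
            nextDate = newFrom;
        }
    }

    // Counterpart of snapshotState(): a single redistributable entry.
    List<String> snapshot() {
        synchronized (lock) {
            return Collections.singletonList(nextDate.toString());
        }
    }

    int emittedCount() {
        synchronized (lock) {
            return emitted.size();
        }
    }

    // Counterpart of restoreState(): if several entries arrive, keep the latest
    // date (an assumption; Flavio's loop keeps the last entry). Fall back to
    // defaultStart only when there is no stored state at all.
    static PollingCursorModel restore(List<String> state, LocalDate defaultStart) {
        LocalDate start = null;
        for (String s : state) {
            LocalDate d = LocalDate.parse(s);
            if (start == null || d.isAfter(start)) {
                start = d;
            }
        }
        return new PollingCursorModel(start != null ? start : defaultStart);
    }

    public static void main(String[] args) {
        PollingCursorModel source = new PollingCursorModel(LocalDate.of(2019, 6, 1));
        source.pollOnce(List.of("event-1", "event-2"), LocalDate.of(2019, 6, 19));
        List<String> checkpoint = source.snapshot();
        System.out.println(checkpoint); // prints [2019-06-19]

        // Simulated restart: resume from the checkpointed date.
        PollingCursorModel restored = PollingCursorModel.restore(checkpoint, LocalDate.of(2019, 6, 1));
        System.out.println(restored.snapshot()); // prints [2019-06-19]
    }
}
```

If records are emitted from the separate polling thread, as in Flavio's design, that thread would presumably need to take the same lock around ctx.collect(...) and the date update.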