Hey Lukasz,
I tried using the setup function, but since this is a streaming pipeline,
the batches tend to be pretty small. I could force the pipeline to batch
things up, but that feels like something that shouldn't be needed. I was
already caching between elements within a thread; the problem was at
pipeline start, or when a new instance was started, since each thread has
its own cache.
Using a static cache worked!
  private static final LoadingCache<BlobId, TypesCache> CACHE =
      CacheBuilder.newBuilder()
          .refreshAfterWrite(30, TimeUnit.MINUTES)
          .build(new CacheLoader());
This has gotten me unblocked, but it isn't a perfect solution. Because the
cache is static, I can't pass any parameters to it, which makes it very
hard to unit test: it is hard-coded to access cloud storage instead of a
local file.
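A minimal sketch of one way around the testability problem, using only the
JDK rather than Guava: keep the cache static so it is shared JVM-wide, but
make the loader replaceable so a test can point it at a local source. The
names SharedSchemaCache, setLoader and get, and the String key and value
types, are hypothetical stand-ins (the real cache is keyed by BlobId and
holds TypesCache), and this version has no equivalent of refreshAfterWrite.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** JVM-wide cache whose loader can be swapped out, for example in a unit test. */
final class SharedSchemaCache {
  // Hypothetical default: fails loudly until a real loader (e.g. one that
  // reads from cloud storage) has been installed.
  private static volatile Function<String, String> loader =
      key -> {
        throw new IllegalStateException("no loader configured for " + key);
      };

  // One map per JVM (per class loader), shared by every thread.
  private static final ConcurrentMap<String, String> CACHE = new ConcurrentHashMap<>();

  private SharedSchemaCache() {}

  /** Installs a new loader and drops everything cached by the previous one. */
  static void setLoader(Function<String, String> newLoader) {
    loader = newLoader;
    CACHE.clear();
  }

  /** Returns the cached value, invoking the loader only when the key is absent. */
  static String get(String key) {
    return CACHE.computeIfAbsent(key, k -> loader.apply(k));
  }
}
```

A test could call SharedSchemaCache.setLoader(...) with a function that
reads a local file before exercising the DoFn, while production code
installs the cloud storage loader once at startup.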
I tried using a Singleton to hold the cache and fetch it from the DoFn, but
it seems like the Singleton isn't shared amongst all of the threads. I can
see from the logs that all of the DoFn calls are on the same worker
instance but on different threads, and I see a log statement from inside
my synchronized block for each thread, which shouldn't be possible.
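For comparison, a minimal sketch of a holder-style singleton that relies on
JVM class initialization instead of an explicit synchronized block. This is
an illustration only, not the code from the pipeline, and the names
SchemaCacheHolder, Lazy and CONSTRUCTIONS are hypothetical. It is worth
noting that a lock taken on a DoFn instance only excludes threads that
share that same instance, so if the runner creates one DoFn instance per
thread, each thread could enter such a block once. Whether that explains
the logs above is an assumption, not something confirmed here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Singleton built with the initialization-on-demand holder idiom. */
final class SchemaCacheHolder {
  // Counts constructor calls so a test or log line can verify there is one instance.
  static final AtomicInteger CONSTRUCTIONS = new AtomicInteger();

  // State shared by every caller of getInstance().
  final ConcurrentMap<String, String> schemas = new ConcurrentHashMap<>();

  private SchemaCacheHolder() {
    CONSTRUCTIONS.incrementAndGet();
  }

  // The JVM initializes Lazy at most once per class loader, on first access,
  // so no explicit locking is needed here.
  private static final class Lazy {
    static final SchemaCacheHolder INSTANCE = new SchemaCacheHolder();
  }

  static SchemaCacheHolder getInstance() {
    return Lazy.INSTANCE;
  }
}
```

The guarantee holds per class loader: if the same class were loaded by more
than one class loader, each would get its own instance.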
Thoughts?
On Thu, Jun 15, 2017 at 6:26 AM, Lukasz Cwik <[email protected]> wrote:
> Take a look at DoFn setup/teardown, which are called only once per DoFn
> instance and not per element, so they make it easier to write
> initialization code.
>
> Also if the schema map is shared, have you thought of using a single
> static instance of Guava's LoadingCache shared amongst all the DoFn
> instances?
>
> You can also refresh the data stored within the cache periodically.
>
> On Wed, Jun 14, 2017 at 10:39 PM, Kevin Peterson <[email protected]>
> wrote:
>
>> Still gets stuck at the same place :/
>>
>> On Wed, Jun 14, 2017 at 9:45 PM, Tang Jijun(上海_中台研发部_数据平台部_基础数据部_唐觊隽) <
>> [email protected]> wrote:
>>
>>>
>>>
>>> .triggering(
>>>     AfterProcessingTime.pastFirstElementInPane()
>>>         .plusDelayOf(Duration.standardSeconds(1)))
>>> .discardingFiredPanes().withAllowedLateness(Duration.ZERO));
>>>
>>>
>>>
>>> Try the trigger above
>>>
>>>
>>>
>>> *From:* Kevin Peterson [mailto:[email protected]]
>>> *Sent:* June 15, 2017 2:39
>>> *To:* [email protected]
>>> *Subject:* Fwd: Creating side input map with global window
>>>
>>>
>>>
>>> Hi all,
>>>
>>>
>>>
>>> I am working on a (streaming) pipeline which reads elements from Pubsub,
>>> and schemas for those elements from a separate pubsub topic. I'd like to be
>>> able to create a side input map from the schema topic, and have that
>>> available to the main pipeline for parsing. Each message on the schema
>>> pubsub topic contains all schemas I care about, so for every new message, I
>>> want to generate a new map that will be available to the main pipeline
>>> (eventual consistency is fine). I don't have any windows or triggers on the
>>> main flow, since I really just want each element to be processed as it
>>> arrives, using whatever the latest schema available is.
>>>
>>>
>>>
>>> I am currently trying this with:
>>>
>>>
>>>
>>> PCollection<KV<String, String>> schema = pipeline
>>>     .apply("Read Schema",
>>>         PubsubIO.readStrings().fromTopic("topic_for_schema"))
>>>     .apply(Window.<String>into(new GlobalWindows())
>>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>         .discardingFiredPanes())
>>>     // Outputs around 100 elements for each input.
>>>     .apply("Create Schema",
>>>         ParDo.of(new SchemaDirectory.GenerateSchema()));
>>>
>>> PCollectionView<Map<String, String>> schemaView =
>>>     schema.apply(View.<String, String>asMap());
>>>
>>> pipeline
>>>     .apply("Read Elements",
>>>         PubsubIO.readStrings().fromTopic("topic_for_elements"))
>>>     .apply("Parse Elements",
>>>         ParDo.of(new DoFn<String, TableRow>() {
>>>           @ProcessElement
>>>           public void processElement(ProcessContext c) {
>>>             String name = getNameFromElement(c.element());
>>>             String schema = c.sideInput(schemaView).get(name);
>>>             c.output(parse(c, schema));
>>>           }
>>>         }).withSideInputs(schemaView))
>>>     .apply("Write to Table",
>>>         BigQueryIO.writeTableRows()); // Other BQ options not copied.
>>>
>>>
>>> When running this pipeline, the
>>> View.AsMap/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)/GroupByKey
>>> stage never emits any elements, and so the pipeline never progresses. I
>>> can see the messages at the input stage, but nothing appears on the output.
>>>
>>>
>>>
>>> Any advice?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> -Kevin
>>>
>>>
>>>
>>
>>
>