To unit test your function, have it accept a supplier: the default supplier returns a reference to the static instance, while tests inject a different supplier.
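Roughly something like this (untested; it reuses the BlobId/TypesCache types and the static CACHE from your snippet below, and the CacheSupplier/ParseFn names are just illustrative):

import java.io.Serializable;
import java.util.function.Supplier;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.storage.BlobId; // assuming GCS's BlobId, as in your cache
import com.google.common.cache.LoadingCache;
import org.apache.beam.sdk.transforms.DoFn;

// Serializable so the supplier can be shipped to the workers with the DoFn.
interface CacheSupplier
    extends Supplier<LoadingCache<BlobId, TypesCache>>, Serializable {}

class ParseFn extends DoFn<String, TableRow> {
  private final CacheSupplier cacheSupplier;

  // Production constructor: default to the shared static instance.
  ParseFn() {
    this(() -> CACHE);
  }

  // Test constructor: inject a cache backed by a local file instead of
  // cloud storage.
  ParseFn(CacheSupplier cacheSupplier) {
    this.cacheSupplier = cacheSupplier;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    LoadingCache<BlobId, TypesCache> cache = cacheSupplier.get();
    // ... look up the schema through the cache and parse c.element() ...
  }
}

Since the lambda targets a Serializable interface and only references a static field, it serializes cleanly along with the DoFn.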
On Fri, Jun 23, 2017 at 8:23 AM, Kevin Peterson <[email protected]> wrote:

> Hey Lukasz,
>
> I tried using the setup function, but since this is a streaming pipeline,
> the batches tend to be pretty small. I could force the pipeline to batch
> things up, but that feels like something that shouldn't be needed. I was
> already caching between elements within a thread; the problem was at
> pipeline start, or when a new instance was started, since each thread has
> its own cache.
>
> Using a static cache worked!
>
> private static final LoadingCache<BlobId, TypesCache> CACHE =
>     CacheBuilder.newBuilder()
>         .refreshAfterWrite(30, TimeUnit.MINUTES)
>         .build(new CacheLoader());
>
> This has gotten me unblocked, but isn't a perfect solution. Because the
> cache is static, I can't set any of its parameters, which makes it very
> hard to unit test: it is hard-coded to access cloud storage instead of a
> local file.
>
> I tried using a Singleton to hold the cache and fetch it from the DoFn,
> but it seems like the Singleton isn't shared amongst all of the threads.
> I can see from the logs that all of the DoFn calls are on the same worker
> instance and different threads: I see a log statement from inside my
> synchronized block for each thread, which shouldn't be possible.
>
> Thoughts?
>
>
> On Thu, Jun 15, 2017 at 6:26 AM, Lukasz Cwik <[email protected]> wrote:
>
>> Take a look at DoFn setup/teardown, called only once per DoFn instance
>> and not per element, which makes it easier to write initialization code.
>>
>> Also, if the schema map is shared, have you thought of using a single
>> static instance of Guava's LoadingCache shared amongst all the DoFn
>> instances?
>>
>> You can also refresh the data stored within the cache periodically.
>>
>> On Wed, Jun 14, 2017 at 10:39 PM, Kevin Peterson <[email protected]>
>> wrote:
>>
>>> Still gets stuck at the same place :/
>>>
>>> On Wed, Jun 14, 2017 at 9:45 PM, Tang Jijun(上海_中台研发部_数据平台部_基础数据部_唐觊隽)
>>> <[email protected]> wrote:
>>>
>>>> .triggering(
>>>>     AfterProcessingTime.pastFirstElementInPane()
>>>>         .plusDelayOf(Duration.standardSeconds(1)))
>>>> .discardingFiredPanes()
>>>> .withAllowedLateness(Duration.ZERO));
>>>>
>>>> Try the trigger above
>>>>
>>>> From: Kevin Peterson [mailto:[email protected]]
>>>> Sent: June 15, 2017 2:39
>>>> To: [email protected]
>>>> Subject: Fwd: Creating side input map with global window
>>>>
>>>> Hi all,
>>>>
>>>> I am working on a streaming pipeline which reads elements from Pubsub,
>>>> and schemas for those elements from a separate Pubsub topic. I'd like
>>>> to be able to create a side input map from the schema topic, and have
>>>> that available to the main pipeline for parsing. Each message on the
>>>> schema topic contains all schemas I care about, so for every new
>>>> message I want to generate a new map that will be available to the
>>>> main pipeline (eventual consistency is fine). I don't have any windows
>>>> or triggers on the main flow, since I really just want each element to
>>>> be processed as it arrives, using whatever the latest schema available
>>>> is.
>>>>
>>>> I am currently trying this with:
>>>>
>>>> PCollection<KV<String, String>> schema = pipeline
>>>>     .apply("Read Schema",
>>>>         PubsubIO.readStrings().fromTopic("topic_for_schema"))
>>>>     .apply(Window.<String>into(new GlobalWindows())
>>>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>>         .discardingFiredPanes())
>>>>     // outputs around 100 elements for each input
>>>>     .apply("Create Schema", ParDo.of(new SchemaDirectory.GenerateSchema()));
>>>>
>>>> PCollectionView<Map<String, String>> schemaView =
>>>>     schema.apply(View.<String, String>asMap());
>>>>
>>>> pipeline
>>>>     .apply("Read Elements",
>>>>         PubsubIO.readStrings().fromTopic("topic_for_elements"))
>>>>     .apply("Parse Elements", ParDo.of(new DoFn<String, TableRow>() {
>>>>         @ProcessElement
>>>>         public void processElement(ProcessContext c) {
>>>>             String name = getNameFromElement(c.element());
>>>>             String schema = c.sideInput(schemaView).get(name);
>>>>             c.output(parse(c, schema));
>>>>         }
>>>>     }).withSideInputs(schemaView))
>>>>     .apply("Write to Table", BigQueryIO.writeTableRows()); // Other BQ options not copied.
>>>>
>>>> When running this pipeline, the
>>>> View.AsMap/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)/GroupByKey
>>>> stage never emits any elements, and so the pipeline never progresses.
>>>> I can see the messages at the input stage, but nothing appears on the
>>>> output.
>>>>
>>>> Any advice?
>>>>
>>>> Thanks,
>>>> -Kevin