To unit test your function, have it accept a supplier with the default
supplier being the one that gives you a reference to the static instance
and another supplier for testing purposes.

On Fri, Jun 23, 2017 at 8:23 AM, Kevin Peterson <[email protected]> wrote:

> Hey Lukasz,
>
> I tried using the setup function, but since this a streaming pipeline, the
> batches tend to be pretty small. I could force the pipeline to batch things
> up, but that feels like something that shouldn't be needed. I was already
> caching between elements within a thread, the problem was at pipeline
> start, or when a new instance was started, since each thread has its own
> cache.
>
> Using a static cache worked!
>
> private static final LoadingCache<BlobId, TypesCache> CACHE =
>         CacheBuilder.newBuilder()
>                     .refreshAfterWrite(30, TimeUnit.MINUTES)
>                     .build(new CacheLoader());
>
> This has gotten me unblocked, but isn't a perfect solution. Because the
> cache is static, I can't set any parameters of it, meaning that it is very
> hard to unit test because it is hard coded to access cloud storage instead
> of a local file.
>
> I tried using a Singleton to hold the cache and fetch it from the DoFn,
> but it seems like the Singleton isn't shared amongst all of the threads. I
> can see from the logs that all of the DoFn calls are on the same worker
> instance and different threads, I see a log statement from inside my
> synchronized block for each thread, which shouldn't be possible.
>
> Thoughts?
>
>
> On Thu, Jun 15, 2017 at 6:26 AM, Lukasz Cwik <[email protected]> wrote:
>
>> Take a look at DoFn setup/teardown, called only once per DoFn instance
>> and not per element so it makes easier to write initialization code.
>>
>> Also if the schema map is shared, have you thought of using a single
>> static instance of Guava's LoadingCache shared amongst all the DoFn
>> instances?
>>
>> You can also refresh the data stored within the cache periodically.
>>
>> On Wed, Jun 14, 2017 at 10:39 PM, Kevin Peterson <[email protected]>
>> wrote:
>>
>>> Still gets stuck at the same place :/
>>>
>>> On Wed, Jun 14, 2017 at 9:45 PM, Tang Jijun(上海_中台研发部_数据平台部_基础数据部_唐觊隽) <
>>> [email protected]> wrote:
>>>
>>>>
>>>>
>>>> .triggering(
>>>>         
>>>> AfterProcessingTime.*pastFirstElementInPane*().plusDelayOf(Duration.*standardSeconds*(1)))
>>>>         .discardingFiredPanes().withAllowedLateness(Duration.*ZERO*));
>>>>
>>>>
>>>>
>>>> Try the trigger above
>>>>
>>>>
>>>>
>>>> *发件人:* Kevin Peterson [mailto:[email protected]]
>>>> *发送时间:* 2017年6月15日 2:39
>>>> *收件人:* [email protected]
>>>> *主题:* Fwd: Creating side input map with global window
>>>>
>>>>
>>>>
>>>> Hi all,
>>>>
>>>>
>>>>
>>>> I am working on a (streaming) pipeline which reads elements from
>>>> Pubsub, and schemas for those elements from a separate pubsub topic. I'd
>>>> like to be able to create a side input map from the schema topic, and have
>>>> that available to the main pipeline for parsing. Each message on the schema
>>>> pubsub topic contains all schemas I care about, so for every new message, I
>>>> want to generate a new map that will be available to the main pipeline
>>>> (eventual consistency is fine). I don't have any windows or triggers on the
>>>> main flow, since I really just want each element to be processed as it
>>>> arrives, using whatever the latest schema available is.
>>>>
>>>>
>>>>
>>>> I am currently trying this with:
>>>>
>>>>
>>>>
>>>> PCollection<KV<String, String>> schema = pipeline
>>>>         .apply("Read Schema",
>>>>                 PubsubIO.*readStrings*().fromTopic("topic_for_schema"))
>>>>         .apply(Window.<String>*into*(new GlobalWindows()).triggering(
>>>>                 
>>>> Repeatedly.*forever*(AfterPane.*elementCountAtLeast*(1))).discardingFiredPanes())
>>>>         .apply("Create Schema", ParDo.*of*(new 
>>>> SchemaDirectory.GenerateSchema()));  // outputs around 100 elements for 
>>>> each input
>>>>
>>>>
>>>>
>>>> PCollectionView<Map<String, String>> schemaView =
>>>>         schema.apply(View.<String, String>*asMap*());
>>>>
>>>> pipeline
>>>>         .apply("Read Elements", 
>>>> PubsubIO*.readStrings*().fromTopic("topic_for_elements")).apply("Parse 
>>>> Elements",
>>>>
>>>>         ParDo.*of*(new DoFn<String, TableRow>() {
>>>>             @ProcessElement
>>>>             public void processElement(ProcessContext c) {
>>>>
>>>>                 String name = getNameFromElement(c.element());
>>>>
>>>>
>>>>                 String schema = c.sideInput(schemaView).get(name);
>>>>
>>>>
>>>>                 c.output(parse(c, schema));
>>>>
>>>>             }
>>>>         }).withSideInputs(schemaView)).apply("Write to Table", 
>>>> BigQueryIO.*writeTableRows*()) // Other BQ options not copied.
>>>>
>>>> When running this pipeline, the View.AsMap/View.CreatePCol
>>>> lectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)/GroupByKey
>>>> stage never emits any elements, and so the pipeline never progresses.
>>>> I can see the messages at the input stage, but nothing appears on the
>>>> output.
>>>>
>>>>
>>>>
>>>> Any advice?
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> -Kevin
>>>>
>>>>
>>>>
>>>
>>>
>>
>
  • Fwd: Crea... Kevin Peterson
    • Re: ... Eugene Kirpichov
      • ... Kevin Peterson
    • 答复: ... 上海_中台研发部_数据平台部_基础数据部_唐觊隽
      • ... Kevin Peterson
        • ... Lukasz Cwik
          • ... Kevin Peterson
            • ... Lukasz Cwik

Reply via email to