Hi Alexey,

Thank you very much for your email.
Here is sample code which I used to create views and this view pass as side
input for pipeline. Wondering same piece of code will work using
FlinkRunner but in SparkRunner it is failed.

private static PCollectionView<Map<Map<String, String>, List<String>>>
      getCompositeRuleExpressions(
          int refreshInSeconds,
          String apiUrl,
          Pipeline p,
          String appMappingUrl)
          throws Exception {

    PCollection<Long> counter =
        p.apply(GenerateSequence.from(0).withRate(1,
Duration.standardSeconds(1L)))
            .apply("Apply window",
Window.<Long>into(FixedWindows.of(Duration.standardSeconds(2))))

.apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());

    return counter
        .apply(
            ParDo.of(
                new DoFn<Long, KV<Map<String, String>, List<String>>>() {
                  @ProcessElement
                  public void process(
                      @Element Long input,
                      OutputReceiver<KV<Map<String, String>, List<String>>>
o) {
                    Map<String, String> rules = new HashMap<>();
                    List<String> listOfSources = new ArrayList<>();
                    try {
                      rules = getListOfcompositeUrl(apiUrl);
                      listOfSources = Utils.getValidSources(appMappingUrl);
                    } catch (Exception e) {
                      LOG.error("Exception occured", e);
                    }
                    o.output(KV.of(rules, listOfSources));
                  }
                }))
        .apply(View.asMap());
  }

PCollectionView<Map<Map<String, String>, List<String>>> ruleList =

        getCompositeRuleExpressions(

            compositeSegmentRefreshInSeconds, apiUrl, p, appMappingUrl);

    record

        .apply("Apply window",
Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))))

        .apply(

            "Process rules for user",

            ParDo.of(new ProcessRulesFn(ruleList, type, index, url))

                .withSideInputs(ruleList))


Exception Message:

19/12/10 22:23:03 INFO SparkRunner$Evaluator: Evaluating
Combine.GroupedValues
19/12/10 22:23:03 INFO SparkRunner$Evaluator: Evaluating
org.apache.beam.sdk.transforms.MapElements$1@79926285
19/12/10 22:23:03 INFO SparkRunner$Evaluator: Evaluating
org.apache.beam.sdk.transforms.View$VoidKeyToMultimapMaterialization$VoidKeyToMultimapMaterializationDoFn@66b5c35c
[WARNING]
java.lang.IllegalStateException: No TransformEvaluator registered for
UNBOUNDED transform View.CreatePCollectionView
    at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState
(Preconditions.java:588)
    at
org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$Translator.translateUnbounded
(StreamingTransformTranslator.java:552)
    at org.apache.beam.runners.spark.SparkRunner$Evaluator.translate
(SparkRunner.java:456)
    at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform
(SparkRunner.java:426)
    at
org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform
(SparkRunner.java:419)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:665)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600
(TransformHierarchy.java:317)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit
(TransformHierarchy.java:251)
    at org.apache.beam.sdk.Pipeline.traverseTopologically
(Pipeline.java:460)
    at
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call
(SparkRunnerStreamingContextFactory.java:88)
    at
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call
(SparkRunnerStreamingContextFactory.java:46)
    at
org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply
(JavaStreamingContext.scala:627)


Regards,

Jitendra

On Tue, Dec 10, 2019 at 11:07 PM Alexey Romanenko <[email protected]>
wrote:

> Hi Jitendra,
>
> Could you give more details about your pipeline? Is it possible to share a
> code of this pipeline?
>
> > On 5 Dec 2019, at 17:50, jitendra sharma <[email protected]>
> wrote:
> >
> > Hi,
> >
> > I am running beam job using Spark Runner and getting below error:
> >
> > 19/12/05 22:14:36 WARN UnboundedDataset: Provided StorageLevel:
> MEMORY_ONLY is ignored for streams, using the default level:
> StorageLevel(memory, 1 replicas)
> > [WARNING]
> > java.lang.IllegalStateException: No TransformEvaluator registered for
> UNBOUNDED transform View.CreatePCollectionView
> >     at
> org.apache.beam.repackaged.beam_runners_spark.com.google.common.base.Preconditions.checkState
> (Preconditions.java:518)
> >     at
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$Translator.translateUnbounded
> (StreamingTransformTranslator.java:553)
> >     at org.apache.beam.runners.spark.SparkRunner$Evaluator.translate
> (SparkRunner.java:464)
> >
> > is any idea or help appreciated?
> >
> >
> > Regards,
> > Jitendra Sharma
>
>

-- 
Jitendra Sharma

Reply via email to