Hi all,

I'm seeing odd behavior: when I switch from a regular context to a
checkpointed one, the system properties are no longer automatically carried
over to the workers / executors and turn out to be null there.

This is in Java, using spark-streaming_2.10, version 1.5.0.

As a workaround, I'm placing the properties into a Properties object and
passing it into the worker logic.  I wouldn't expect to have to do this:
with a regular context, the properties I set are automatically carried over
to the worker side.
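
To make the workaround concrete, here is a minimal, Spark-free sketch of the
idea: a serializable object that carries its own Properties copy keeps the
values after a serialize/deserialize round trip, which is roughly what
happens when a function is shipped to another JVM. The names
PropsShippingDemo, Worker, and the key "example.key" are hypothetical and
only stand in for the real worker function; this is not how Spark itself
ships closures, just plain Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

class PropsShippingDemo {

  // Hypothetical stand-in for the worker function: it holds its own
  // copy of the properties instead of reading System properties.
  static class Worker implements Serializable {
    private static final long serialVersionUID = 1L;
    private final Properties props;

    Worker(Properties props) {
      this.props = props;
    }

    String lookup(String key) {
      return props.getProperty(key);
    }
  }

  // Serializes the worker to bytes and reads it back, as a rough model of
  // sending it from the driver JVM to an executor JVM.
  static Worker roundTrip(Worker worker) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(worker);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (Worker) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty("example.key", "example-value"); // hypothetical key/value
    Worker received = roundTrip(new Worker(props));
    System.out.println(received.lookup("example.key")); // prints "example-value"
  }
}
```

The point of the sketch is only that values travelling inside the object do
not depend on what System properties happen to be set in the receiving JVM.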

Has this been fixed or changed in later versions of Spark?

This is what I ended up doing in the driver program:

  private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf, Parameters params) {
    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        return createContext(sparkConf, params);
      }
    };
    return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
  }


  private JavaStreamingContext createContext(SparkConf sparkConf, Parameters params) {
    // Create context with the specified batch interval, in milliseconds.
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
        Durations.milliseconds(params.getBatchDurationMillis()));

    // Set the checkpoint directory, if we're checkpointing.
    if (params.isCheckpointed()) {
      jssc.checkpoint(params.getCheckpointDir());
    }

    // Copy the executor environment settings into a Properties object.
    // forEach is used here because a bare stream().map(...) has no terminal
    // operation, so its lambda would never run and props would stay empty.
    Properties props = new Properties();
    JavaConverters.seqAsJavaListConverter(sparkConf.getExecutorEnv()).asJava()
        .forEach(x -> props.setProperty(x._1, x._2));


.............

    // ... Create Direct Stream from Kafka...

    messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
        ProcessPartitionFunction func = new ProcessPartitionFunction(
          props, // <-- Had to pass that through, so this works in a checkpointed scenario
          params.getAppName(),
          params.getTopic(),
          ......
        rdd.foreachPartition(func);
        return null;
      }
    });
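
For reference, here is a minimal, Spark-free sketch of copying key/value
pairs into a Properties object. It assumes a plain list of Map.Entry pairs
standing in for the result of sparkConf.getExecutorEnv(); the class name
PropsCopyDemo and the keys are hypothetical. It also shows why the choice of
stream operation matters: Stream.map is lazy, so without a terminal
operation its lambda never runs.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

class PropsCopyDemo {

  // Copies every pair into a fresh Properties object. forEach runs
  // immediately for each element.
  static Properties copyEager(List<Map.Entry<String, String>> pairs) {
    Properties props = new Properties();
    pairs.forEach(e -> props.setProperty(e.getKey(), e.getValue()));
    return props;
  }

  // Same intent, but with a bare map(): there is no terminal operation,
  // so the lambda is never invoked and props stays empty.
  static Properties copyLazy(List<Map.Entry<String, String>> pairs) {
    Properties props = new Properties();
    pairs.stream().map(e -> props.setProperty(e.getKey(), e.getValue()));
    return props;
  }

  public static void main(String[] args) {
    // Hypothetical pairs standing in for the executor environment.
    List<Map.Entry<String, String>> env = new ArrayList<>();
    env.add(new SimpleEntry<String, String>("example.key.one", "a"));
    env.add(new SimpleEntry<String, String>("example.key.two", "b"));

    System.out.println(copyEager(env).size()); // prints 2
    System.out.println(copyLazy(env).size());  // prints 0
  }
}
```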

Would appreciate any recommendations/clues,
Thanks,
- Dmitry
