Hi All,

I have the following classes. *All of my questions are in bold below*,
but here is some more context first. I just want to fire and forget a
streaming job. The job reads from Kafka, applies some transformations, and
writes back to Kafka; for the sake of testing, the code below writes the
output to the console instead.

I want my streaming job to run forever, but the problem is that when I
submit the jar using LivyClient and call .get(), the call is stuck there
forever. The goal is to launch as many streaming jobs as possible, each of
which should in theory run forever.

public class StreamingJob implements Job<Void> {

  public StreamingJob() {}

  @Override
  public Void call(JobContext ctx) throws Exception {
    Config config = ConfigFactory.load();
    SparkSession sparkSession = ctx.sparkSession();
    // *This app name is not getting set when I go to
    // http://localhost:8998/sessions. I can see my query but
    // Appid is always set to null.*
    ctx.sc().getConf().setAppName("MyJob");

    System.out.println("READING STREAM");

    Dataset<Row> df = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers",
            config.getString("kafka.consumer.settings.bootstrapServers"))
        .load();

    df.printSchema(); // *Where do these print statements go?*


    Dataset<Row> resultSet = df.map(/* some code */);
    StreamingQuery streamingQuery = resultSet.writeStream()
        .format("console")
        .start();
    // *This thing blocks forever and I don't want to set a timeout,
    // so what should I do to fire and forget a streaming job?*
    streamingQuery.awaitTermination();

    return null;
  }
}



public class JobEngine {
  private Config config = ConfigFactory.load();
  private LivyClient livy;

  public JobEngine() throws IOException, URISyntaxException {
    StringBuilder sb = new StringBuilder("http://");
    sb.append(this.config.getString("livy.host"));
    sb.append(":");
    sb.append(this.config.getString("livy.port"));
    String livyUrl = sb.toString();
    this.livy = new LivyClientBuilder()
        .setURI(new URI(livyUrl))
        .build();
  }

  public static void main(String[] args)
      throws ExecutionException, InterruptedException, IOException, URISyntaxException {

    JobEngine engine = new JobEngine();

    try {
      for (String jarPath : args) {
        // livy is an instance field, so it is reached through the engine.
        engine.livy.uploadJar(new File(jarPath)).get();

        // *This will block forever.*
        engine.livy.submit(new StreamingJob()).get();

        // *The control will never get here, so I can't submit another job.*
        System.out.println("SUBMITTED JAR!");
      }
    } finally {
      // Stop the client once, after all jars have been handled.
      engine.livy.stop(true);
    }
  }
}
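
For reference, here is a minimal sketch of the non-blocking pattern I am
after, written with plain java.util.concurrent so it runs without Livy or
Spark. The class FireAndForgetSketch and the helpers foreverJob and
submitAll are hypothetical names made up for this illustration. The idea is
that submit() hands back a Future immediately and only get() blocks, so the
loop keeps the handles and moves on. Livy's JobHandle extends Future, so
dropping the .get() on livy.submit(...) should presumably behave the same
way, but that is an assumption and not something verified here. Note also
that livy.stop(true) would shut the session down, which would likely end
any running queries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FireAndForgetSketch {

  // Hypothetical stand-in for a job whose call() never returns on its own,
  // like StreamingJob with awaitTermination(). It ends only when interrupted.
  static Callable<Void> foreverJob(String name) {
    return () -> {
      while (!Thread.currentThread().isInterrupted()) {
        Thread.sleep(100);
      }
      return null;
    };
  }

  // Submits one job per name and returns the handles without calling get(),
  // so the loop is never blocked by a job that runs forever.
  static List<Future<Void>> submitAll(ExecutorService pool, List<String> names) {
    List<Future<Void>> handles = new ArrayList<>();
    for (String name : names) {
      handles.add(pool.submit(foreverJob(name)));
      System.out.println("SUBMITTED " + name);
    }
    return handles;
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newCachedThreadPool();
    List<Future<Void>> handles =
        submitAll(pool, Arrays.asList("job-a", "job-b", "job-c"));

    // The handles can still be inspected later without blocking.
    long running = handles.stream().filter(h -> !h.isDone()).count();
    System.out.println("still running: " + running);

    // Interrupt the worker threads so this demo exits.
    pool.shutdownNow();
  }
}
```

In this sketch the handles are kept in a list so that a job could still be
checked with isDone() or stopped with cancel(true) later, instead of being
dropped entirely.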
