Hi All,

I have the following classes. *All my questions are in bold below*, but I will also add more context here. I just want to fire and forget a streaming job. The job reads from Kafka, applies some transformation, and writes back to Kafka; for the sake of testing, the code below writes the output to the console instead.
I want my streaming job to run forever, but the problem is that when I submit the jar using livyClient and call .get(), it blocks there forever. The goal is to launch as many streaming jobs as possible, each of which should in theory run forever.

public class StreamingJob implements Job<Void> {

    public StreamingJob() {}

    @Override
    public Void call(JobContext ctx) throws Exception {
        Config config = ConfigFactory.load();
        SparkSession sparkSession = ctx.sparkSession();

        // *This app name is not getting set when I go to http://localhost:8998/sessions.*
        // *I can see my query, but the appId is always set to null.*
        ctx.sc().getConf().setAppName("MyJob");

        System.out.println("READING STREAM");

        Dataset<Row> df = sparkSession.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
            .load();

        df.printSchema(); // *Where do these print statements go?*

        Dataset<Row> resultSet = df.map(/* some code */);

        StreamingQuery streamingQuery = resultSet.writeStream()
            .format("console")
            .start();

        // *This blocks forever, and I don't want to set a timeout,*
        // *so what should I do to fire and forget a streaming job?*
        streamingQuery.awaitTermination();
        return null;
    }
}

public class JobEngine {

    private Config config = ConfigFactory.load();
    private LivyClient livy;

    public JobEngine() throws IOException, URISyntaxException {
        StringBuilder sb = new StringBuilder("http://");
        sb.append(this.config.getString("livy.host"));
        sb.append(":");
        sb.append(this.config.getString("livy.port"));
        String livyUrl = sb.toString();
        this.livy = new LivyClientBuilder()
            .setURI(new URI(livyUrl))
            .build();
    }

    public static void main(String[] args)
            throws ExecutionException, InterruptedException, IOException, URISyntaxException {
        JobEngine engine = new JobEngine();
        for (String jarPath : args) {
            engine.livy.uploadJar(new File(jarPath)).get();
            try {
                engine.livy.submit(new StreamingJob()).get(); // *This will block forever.*
                // *Control never gets here, so I can't submit another job.*
                System.out.println("SUBMITTED JAR!");
            } finally {
                engine.livy.stop(true);
            }
        }
    }
}
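
To make the behaviour I am after concrete, here is a minimal plain-Java sketch that uses java.util.concurrent in place of Livy. It is only an illustration under assumptions, not tested against Livy: the class FireAndForgetSketch and the methods submitBlockingJob and submitAll are hypothetical names, the executor stands in for the Livy session, and the latch stands in for a streaming query that never terminates. The point is that the submitting loop keeps each returned Future and never calls get() on it, so it can move on to the next job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FireAndForgetSketch {

    // Hypothetical stand-in for livy.submit(new StreamingJob()): starts a job
    // that blocks until the latch is released and returns its Future right away.
    public static Future<Void> submitBlockingJob(ExecutorService pool, CountDownLatch stopSignal) {
        return pool.submit(() -> {
            stopSignal.await(); // plays the role of streamingQuery.awaitTermination()
            return null;
        });
    }

    // Submits `count` jobs back to back. get() is never called on the handles,
    // so a job that runs forever does not block the loop.
    public static List<Future<Void>> submitAll(ExecutorService pool, CountDownLatch stopSignal, int count) {
        List<Future<Void>> handles = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            handles.add(submitBlockingJob(pool, stopSignal));
        }
        return handles;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        CountDownLatch stopSignal = new CountDownLatch(1);

        List<Future<Void>> handles = submitAll(pool, stopSignal, 3);
        System.out.println("Submitted " + handles.size() + " jobs without waiting on any of them");

        // Release the jobs so that this demo program can exit.
        stopSignal.countDown();
        pool.shutdown();
    }
}
```

In the sketch the handles are kept in a list so that the jobs could still be checked with isDone() or cancelled later; whether the equivalent works with a Livy session that has to stay alive is exactly what I am unsure about.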