Do you really need to create a DStream from the original messaging queue?
Can't you just read them in a while loop or something on the driver?
On Fri, Jun 16, 2017 at 1:01 PM, Mike Hugo wrote:
> Hello,
>
> I have a web application that publishes JSON messages on to a messaging
> queue that contain metadata and a link to a CSV document on S3. I'd like
> to iterate over these JSON messages, and for each one pull the CSV document
> into spark SQL to transform it (based on the metadata in the JSON message)
> and output the results to a search index. Each file on S3 has different
> headers, potentially different delimiters, and differing numbers of rows.
>
> Basically what I'm trying to do is something like this:
>
> JavaDStream parsedMetadataAndRows =
> queueStream.map(new Function() {
> @Override
> ParsedDocument call(String metadata) throws Exception {
> Map gson = new Gson().fromJson(metadata, Map.class)
>
> // get metadata from gson
> String s3Url = gson.url
> String delimiter = gson.delimiter
> // etc...
>
> // read s3Url
> Dataset dataFrame = sqlContext.read()
> .format("com.databricks.spark.csv")
> .option("delimiter", delimiter)
> .option("header", true)
> .option("inferSchema", true)
> .load(url)
>
> // process document,
> ParsedDocument docPlusRows = //...
>
> return docPlusRows
> })
>
> JavaEsSparkStreaming.saveToEs(parsedMetadataAndRows,
> "index/type" // ...
>
>
> But it appears I cannot get access to the sqlContext when I run this on
> the spark cluster because that code is executing in the executor not in the
> driver.
>
> Is there a way I can access or create a SqlContext to be able to pull the
> file down from S3 in my map function? Or do you have any recommendations
> as to how I could set up a streaming job in a different way that would
> allow me to accept metadata on the stream of records coming in and pull
> each file down from s3 for processing?
>
> Thanks in advance for your help!
>
> Mike
>