I am reading data from Kafka using structured streaming and I need to save
the data to InfluxDB. In the regular Dstreams based approach I did this as
follows:

      val messages:DStream[(String, String)] =  kafkaStream.map(record =>
(record.topic, record.value))
      messages.foreachRDD { rdd =>
        rdd.foreachPartition { partitionOfRecords =>
          val influxService = new InfluxService()
          val connection = influxService.createInfluxDBConnectionWithParams(
              host,
              port,
              username,
              password,
              database
              )
          partitionOfRecords.foreach(record => {
            ABCService.handleData(connection, record._1, record._2)
          }
          )
        }
      }
      ssc.start()
      logger.info("Started Spark-Kafka streaming session")
      ssc.awaitTermination()

Note: I create connection object inside foreachpartition. How do I do this
in Structured Streaming ? I tried connection pooling approach (where I
create a pool of connections on the master node and pass it to worker nodes
)  here
<https://stackoverflow.com/questions/50205650/spark-connection-pooling-is-this-the-right-approach>
  
and the workers could not get the connection pool object. Anything obvious
that I am missing here ?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to