Writing the streams into some sink (preferably a fault-tolerant, exactly-once
sink; see the docs) and then joining is definitely a possible way, but you
will likely incur higher latency. If you want lower latency, then
stream-stream joins are the best approach, and we are working on them right
now. Spark 2.3 is likely to have stream-stream joins (no release date yet).
For now, the best way would be to use mapGroupsWithState (Spark 2.2,
Scala/Java). The rough idea of how to implement an inner join is as follows.

case class Type1(...)                                   // fields in first stream
case class Type2(...)                                   // fields in second stream
case class CombinedType(first: Type1, second: Type2)    // a combined type that can hold data from both streams

val streamingDataset1 = streamingDF1.as[Type1].map { first => CombinedType(first, null) }     // first stream as common typed dataset
val streamingDataset2 = streamingDF2.as[Type2].map { second => CombinedType(null, second) }   // second stream as common typed dataset

val combinedDataset = streamingDataset1.union(streamingDataset2)
  .groupByKey { x => getKey(x) }                        // group by common id
  .flatMapGroupsWithState { case (key, values, state) =>
    // update state for the key using the values, and possibly output an object
  }
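To make the state-update step concrete, here is a plain-Scala simulation of the buffer-and-match logic that the state function would implement (no Spark needed). The names `User`, `Order`, `Combined`, `JoinState`, and `innerJoin` are illustrative stand-ins, not Spark APIs; each event is assumed to carry exactly one side of the join.

```scala
// Simulates, per key, what the flatMapGroupsWithState function body would do:
// buffer events from each side in state and emit a pair whenever a new event
// matches previously buffered events from the other side.
case class User(id: Int, name: String)                        // stands in for Type1
case class Order(id: Int, item: String)                       // stands in for Type2
case class Combined(user: Option[User], order: Option[Order]) // the common type

// Per-key state: events from each side seen so far for that key.
case class JoinState(users: List[User], orders: List[Order])

def innerJoin(events: Seq[Combined]): Seq[(User, Order)] =
  events
    .groupBy(c => c.user.map(_.id).orElse(c.order.map(_.id)).get) // group by common id
    .toSeq.sortBy(_._1)                                           // deterministic order
    .flatMap { case (_, group) =>
      group.foldLeft((JoinState(Nil, Nil), Vector.empty[(User, Order)])) {
        case ((st, out), Combined(Some(u), None)) =>
          (st.copy(users = u :: st.users), out ++ st.orders.map(o => (u, o)))
        case ((st, out), Combined(None, Some(o))) =>
          (st.copy(orders = o :: st.orders), out ++ st.users.map(u => (u, o)))
        case (acc, _) => acc                                      // ignore malformed events
      }._2
    }
```

For example, a user event for key 7 followed by an order event for key 7 yields one joined pair, while an unmatched order for key 8 is only buffered:

```scala
val events = Seq(
  Combined(Some(User(7, "alice")), None),
  Combined(None, Some(Order(7, "book"))),
  Combined(None, Some(Order(8, "pen")))   // no matching user yet, so no output
)
innerJoin(events)   // Vector((User(7,"alice"), Order(7,"book")))
```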
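For comparison, the sink-based approach mentioned at the top can be sketched with in-memory buffers standing in for the sinks. In a real deployment the sinks would be fault-tolerant storage (e.g. files with checkpointing) read back as static dataframes; the buffers and tuple records here are purely a toy illustration of "materialize each stream, then join the snapshots".

```scala
import scala.collection.mutable

// Toy stand-ins for the sinks each stream writes to: (id, payload) records.
val sink1 = mutable.Buffer.empty[(Int, String)]   // materialized stream 1
val sink2 = mutable.Buffer.empty[(Int, String)]   // materialized stream 2

// As micro-batches arrive, each stream appends to its sink...
sink1 += (1 -> "alice") += (2 -> "bob")
sink2 += (1 -> "book")

// ...and a separate batch job joins the static snapshots on the common id.
val joined = for {
  (id1, a) <- sink1.toList
  (id2, b) <- sink2.toList
  if id1 == id2
} yield (id1, a, b)
// joined == List((1, "alice", "book"))
```

The latency cost is visible even in the toy: a pair is only produced once both sides have been persisted and the batch join re-runs, rather than as soon as the matching event arrives.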

On Wed, Aug 9, 2017 at 12:05 AM, Priyank Shrivastava <priy...@asperasoft.com
> wrote:

> I have streams of data coming in from various applications through Kafka.
> These streams are converted into dataframes in Spark. I would like to join
> these dataframes on a common ID they all contain.
> Since joining streaming dataframes is currently not supported, what is
> the current recommended way to join two dataframes in a streaming context?
> Is it recommended to keep writing the streaming dataframes into some sink
> to convert them into static dataframes which can then be joined? Would
> this provide end-to-end exactly-once and fault-tolerance guarantees?
> Priyank
