We are using Spark Structured Streaming to join two data streams. Kafka collects the data with startingOffsets set to "earliest" (the sender publishes records in a loop, one message at a time).
These are our Kafka consumer parameters:

<code>
def setKafkaConsumerParam(topic: String,
                          servers: String = Config.data.parseString(Constants.KAFKA_BROKER_LIST)): Map[String, String] = {
  Map[String, String](
    "kafka.bootstrap.servers" -> servers,
    "startingOffsets" -> "earliest",
    // "failOnDataLoss" -> "false",
    "subscribe" -> topic
  )
}
</code>

This is how we read the Kafka messages:

<code>
def onMessage(serverName: String, traceId: String, tableStruct: TableStruct): TableDataFrame = {
  LOG.info("onMessage,DIB")
  val topic = "test"
  val message = spark.readStream
    .format(Constants.KAFKA)
    .options(ParamUtil.setKafkaConsumerParam(topic))
    .load()
  // value looks like: {"id": "1", "name": "DJ"}
  val dataFrame = message
    .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS LONG)").as[(String, Long)]
    .select(
      from_json($"value", SparkUtil.createSchema(tableStruct.structModels: _*)).as(tableStruct.tableName),
      $"timestamp".as(tableStruct.tableName + ":timestamp")
    )
  TableDataFrame(tableStruct.tableName, dataFrame)
}
</code>

The join:

<code>
joinDataFrame = joinDataFrame1.join(joinDataFrame2,
  expr("customer.c_custkey") === expr("orders.o_orderkey"))
</code>

However, the result is wrong. Joining 1,500 customer records with 15,000 order records should yield 15,000 rows, but we only get 375.
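To be explicit about the count we expect: in a hypothetical miniature (15 customers, 150 orders, each order carrying a key that matches exactly one customer; the names and sizes here are illustrative, not our real data), an inner join on the customer key keeps every order exactly once, so the result size equals the number of orders:

<code>
// (c_custkey, c_name) and (o_orderkey, o_custkey), purely illustrative data
val customers = (1 to 15).map(k => (k, s"cust-$k"))
val orders    = (1 to 150).map(o => (o, (o - 1) % 15 + 1))

// inner join: keep each order paired with its single matching customer
val joined = for {
  (cKey, _)      <- customers
  (oKey, oCust)  <- orders
  if oCust == cKey
} yield (cKey, oKey)

println(joined.size) // 150, i.e. orders.size
</code>

By the same reasoning, our 1,500 x 15,000 join should produce 15,000 rows, which is why 375 looks wrong to us.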