@Jeff Evans
@Sean Owen
Both of these postings are examples of the same object-oriented concept.
They are examples of extracting a child object from a parent object.
The difference is that when a Muslim asked, he was told by Jeff Evans:
"we are not here to handhold you."
“do a simple Google search”
“They're not being paid to handhold you and quickly answer to your every whim.”
BY COMPARISON
when the good Dr Mich Talebzadeh asked the same thing, there were no
humiliating or offensive comments.
No comments at all.
Hi,
Thank you all,
I am just thinking of passing the date 06/04/2020 12:03:43 and getting the
correct format back from the module. In effect, I want to use the date format
yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSZ as the pattern. In other words, rather than
new Date(), pass "06/04/2020 12:03:43" as a string.
Regards,
Dr Mich Talebzadeh
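import java.text.SimpleDateFormat
import java.util.Locale
import org.joda.time.DateTime
val fixedStr = "2020-06-04T12:03:43"
// Joda-Time parses the ISO-8601 string in the default time zone
val dt = new DateTime(fixedStr)
val jdkDate = dt.toDate()
val pattern3 = "dd yyyy MM HH:mm:ss.SSSSSSSSSZ"
// use pattern3 here; pattern2 is not defined in this snippet
// "GB" is the ISO 3166 country code for the United Kingdom
val simpleDateFormat3 = new SimpleDateFormat(pattern3, new Locale("en", "GB"))
val date3 = simpleDateFormat3.format(jdkDate)
System.out.println(date3)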
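For the question above (pass "06/04/2020 12:03:43" as a string rather than using new Date()), here is a minimal sketch using only java.time from the JDK. It rests on two assumptions that are not stated in the thread: the input is month/day/year, chosen so that it matches fixedStr = "2020-06-04T12:03:43", and the offset is UTC because the string carries none. The names DateStringSketch and reformat are made up for illustration.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object DateStringSketch {
  // Assumption: the input is month/day/year (use "dd/MM/yyyy" if it is day-first).
  val inputFormat: DateTimeFormatter =
    DateTimeFormatter.ofPattern("MM/dd/yyyy HH:mm:ss")

  // The target pattern quoted in the message above.
  val outputFormat: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSZ")

  // Parse the plain string, attach an offset, and format with the target pattern.
  def reformat(input: String): String = {
    val local = LocalDateTime.parse(input, inputFormat)
    // Assumption: the string has no zone information, so UTC is attached here.
    local.atOffset(ZoneOffset.UTC).format(outputFormat)
  }

  def main(args: Array[String]): Unit =
    println(reformat("06/04/2020 12:03:43"))
}
```

Unlike SimpleDateFormat, where S means milliseconds, the nine S letters in a DateTimeFormatter pattern stand for the fraction of a second down to nanoseconds.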
On Sat, 28 Mar 2020, 15:50 Jeff Evans, <[hidden email]> wrote:
Dude, you really need to chill. Have you ever worked with a large open source
project before? It seems not. Even so, insinuating there are tons of bugs that
were left uncovered until you came along (despite the fact that the project is
used by millions across many different organizations) is ludicrous. Learn a
little bit of humility.
If you're new to something, assume you have made a mistake rather than that
there is a bug. Lurk a bit more, or even do a simple Google search, and you
will realize Sean is a very senior committer (i.e. expert) in Spark, and has
been for many years. He, and everyone else participating in these lists, is
doing it voluntarily on their own time. They're not being paid to handhold you
and quickly answer to your every whim.
As you can see from the code:
STEP 1: I create an object, the static data frame, which holds all the
information about the data source (the csv files).
STEP 2: Then I create a variable called staticSchema and assign it the schema
of the original static data frame.
STEP 3: Then I create another variable, val streamingDataFrame, from
spark.readStream, and into the .schema function I pass the object staticSchema,
which is meant to hold the information about the csv files, including the
.load(path) function etc.
So when I create val streamingDataFrame and pass it .schema(staticSchema),
the variable streamingDataFrame should have all the information.
I should only have to call .option("maxFilesPerTrigger", 1) and not
.format("csv").option("header","true").load("/data/retail-data/by-day/*.csv").
Otherwise, what is the point of passing .schema(staticSchema) to
streamingDataFrame?
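For reference, a minimal sketch of what the object passed to .schema(...) contains, assuming spark-sql is on the classpath. The columns and types below are hand-written stand-ins for staticDataFrame.schema, not values inferred from the real csv files, and SchemaSketch is a made-up name. A StructType lists column names, types and nullability only; it has no slot for the format, the header option or the load path, which may be why the stream reader still needs those calls.

```scala
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType, TimestampType}

object SchemaSketch {
  // Hypothetical stand-in for staticDataFrame.schema (column types are assumptions).
  val staticSchema: StructType = StructType(Seq(
    StructField("CustomerId", DoubleType, nullable = true),
    StructField("UnitPrice", DoubleType, nullable = true),
    StructField("Quantity", IntegerType, nullable = true),
    StructField("InvoiceDate", TimestampType, nullable = true)
  ))

  def main(args: Array[String]): Unit = {
    // Prints only column names and types: no format, options or path.
    println(staticSchema.fieldNames.mkString(", "))
    println(staticSchema.simpleString)
  }
}
```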
You can replicate it using the complete code below.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{window,column,desc,col}

object RetailData {

  def main(args: Array[String]): Unit = {

    // create spark session
    val spark = SparkSession.builder()
      .master("spark://192.168.0.38:7077")
      .appName("Retail Data")
      .getOrCreate()

    // set spark runtime configuration
    spark.conf.set("spark.sql.shuffle.partitions","5")
    spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation","true")

    // create a static frame
    val staticDataFrame = spark.read.format("csv")
      .option("header","true")
      .option("inferSchema","true")
      .load("/data/retail-data/by-day/*.csv")

    staticDataFrame.createOrReplaceTempView("retail_data")
    val staticSchema = staticDataFrame.schema

    staticDataFrame
      .selectExpr(
        "CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
      .groupBy(col("CustomerId"),
        window(col("InvoiceDate"),
          "1 day"))
      .sum("total_cost")
      .sort(desc("sum(total_cost)"))
      .show(2)

    val streamingDataFrame = spark.readStream
      .schema(staticSchema)
      .format("csv")
      .option("maxFilesPerTrigger", 1)
      .option("header","true")
      .load("/data/retail-data/by-day/*.csv")

    println(streamingDataFrame.isStreaming)

    // lazy operation so we will need to call a streaming action to start the action
    val purchaseByCustomerPerHour = streamingDataFrame
      .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")
      .groupBy(
        col("CustomerId"), window(col("InvoiceDate"), "1 day"))
      .sum("total_cost")

    // stream action to write to console
    purchaseByCustomerPerHour.writeStream
      .format("console")
      .queryName("customer_purchases")
      .outputMode("complete")
      .start()
      .awaitTermination() // block so main does not return while the query runs

  } // main

} // object