Grouping and counting occurrences of specific column rows

2022-04-22 Thread marc nicole
Hi all,
Sorry for posting this twice,

I need to know how to group a dataset (dataset) by several column attributes
(e.g., List groupByAttributes) and then count the occurrences of the
associated grouped rows. How do I achieve that?
I tried the following code:

> Dataset<Row> groupedRows = dataset.withColumn("freqs",
> count("*").over(Window.partitionBy(groupByAttributes.toArray(new
> Column[groupByAttributes.size()]))));

Would that do it?
Note: I want the "grouped" rows to remain separate for the subsequent
transformations, so a groupBy is not adequate in this case.
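
To illustrate the semantics being asked about, here is a minimal plain-Java
sketch (no Spark involved) of what a count over a partition window is meant to
compute: every input row is kept, and each row is annotated with the size of
its group. The class name WindowCountSketch, the methods withFreqs and row, and
the sample rows are hypothetical and exist only for this illustration; this is
not the Spark API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowCountSketch {

    // Returns one output row per input row: the original values plus a trailing
    // "freqs" value holding how many rows share the same values at keyIndexes.
    static List<List<Object>> withFreqs(List<List<Object>> rows, int[] keyIndexes) {
        // First pass: count the rows that fall under each grouping key.
        Map<List<Object>, Long> counts = new HashMap<>();
        for (List<Object> row : rows) {
            counts.merge(keyOf(row, keyIndexes), 1L, Long::sum);
        }
        // Second pass: keep every row and append the count of its group.
        List<List<Object>> result = new ArrayList<>();
        for (List<Object> row : rows) {
            List<Object> out = new ArrayList<>(row);
            out.add(counts.get(keyOf(row, keyIndexes)));
            result.add(out);
        }
        return result;
    }

    // Extracts the grouping key (the values of the selected columns) from a row.
    private static List<Object> keyOf(List<Object> row, int[] keyIndexes) {
        List<Object> key = new ArrayList<>();
        for (int i : keyIndexes) {
            key.add(row.get(i));
        }
        return key;
    }

    // Small helper to build a sample row.
    static List<Object> row(Object... values) {
        return Arrays.asList(values);
    }

    public static void main(String[] args) {
        List<List<Object>> rows = Arrays.asList(
            row("a", 1, "x"),
            row("a", 1, "y"),
            row("b", 2, "z"));
        // Group by the first two columns; all three rows survive.
        for (List<Object> r : withFreqs(rows, new int[] {0, 1})) {
            System.out.println(r);
        }
    }
}
```

With the sample rows above, the two rows sharing ("a", 1) each carry a count
of 2 and the ("b", 2) row carries a count of 1. A groupBy aggregation, by
contrast, would return a single row per group.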


Streaming write to orc problem

2022-04-22 Thread hsy...@gmail.com
Hello all,

I’m trying to build a pipeline that reads data from a streaming source
and writes it to ORC files. But I don’t see any file written to the file
system, nor any exceptions.

Here is an example

import java.util.concurrent.TimeUnit
import org.apache.spark.sql.streaming.Trigger

val df = spark.readStream.format("...")
  .option("Topic", "Some topic")
  .load()
val q = df.writeStream.format("orc")
  .option("path", "gs://testdata/raw")
  .option("checkpointLocation", "gs://testdata/raw_chk")
  .trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
  .start()
q.awaitTermination(120) // the argument is a timeout in milliseconds
q.stop()


I couldn’t find any file until 1200 seconds were over.
Does that mean all the data is cached in memory? If I keep the pipeline
running, I see that no file is flushed to the file system.

How do I control how often Spark Streaming writes to disk?

Thanks!


[Spark Core]: Unexpectedly exiting executor while gracefully decommissioning

2022-04-22 Thread Yeachan Park
Hello all, we are running into some issues while attempting graceful
decommissioning of executors. We are running spark-thriftserver (3.2.0) on
Kubernetes (GKE 1.20.15-gke.2500). We enabled:

   - spark.decommission.enabled
   - spark.storage.decommission.rddBlocks.enabled
   - spark.storage.decommission.shuffleBlocks.enabled
   - spark.storage.decommission.enabled

and set spark.storage.decommission.fallbackStorage.path to a path in our
bucket.

The logs from the driver seem to suggest that the decommissioning process
started but then unexpectedly exited and failed, while the executor logs
seem to suggest that decommissioning was successful.

Attached are the error logs:

https://gist.github.com/yeachan153/9bfb2f0ab9ac7f292fb626186b014bbf


Thanks in advance.