Structured Streaming With Kafka - processing each event

2021-02-24 Thread Sachit Murarka
Hello Users,

I am using Spark 3.0.1 Structured Streaming with PySpark.

My use case:
I receive a large number of records in Kafka (essentially metadata carrying the
location of the actual data). I have to take that metadata from Kafka and apply
some processing.
Processing includes: reading the location of the actual data from the metadata,
fetching the actual data, and applying some operation to it.

What I have tried:

def process_events(event):
    fetch_actual_data()
    # many more steps

def fetch_actual_data():
    # applying operation on actual data
    ...

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_URL) \
    .option("subscribe", KAFKA_TOPICS) \
    .option("startingOffsets", START_OFFSET) \
    .load() \
    .selectExpr("CAST(value AS STRING)")

query = df.writeStream.foreach(process_events) \
    .option("checkpointLocation", "/opt/checkpoint") \
    .trigger(processingTime="30 seconds") \
    .start()


My Queries:

1. Will this foreach run across different executor processes? My understanding
is that, generally in Spark, foreach runs on a single executor.

2. I receive a very large number of records from Kafka, and the above code runs
once for every single message. If I change it to foreachBatch, will that
optimize it?
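
For reference, a minimal sketch of the foreachBatch variant (process_batch is
just an illustrative name; it reuses process_events from above):

def process_batch(batch_df, batch_id):
    # batch_df is a regular DataFrame holding one micro-batch; operations on it
    # are distributed across executors, and per-record logic can still use foreach.
    batch_df.foreach(process_events)

query = df.writeStream.foreachBatch(process_batch) \
    .option("checkpointLocation", "/opt/checkpoint") \
    .trigger(processingTime="30 seconds") \
    .start()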


Kind Regards,
Sachit Murarka


Re: Spark on the cloud deployments

2021-02-24 Thread Mich Talebzadeh
Hi Stephane,


If you are currently running on-premises then you should also consider
Google Cloud Platform (GCP). As a practitioner I see a number of customers
migrating to GCP from other platforms.


Databricks on GCP will be available (if I am correct) in April this year. GCP
already offers Google Compute Engine as IaaS, which supports Spark with
YARN. In addition, there are cost-saving 'preemptible instances' that can run
Spark on affordable tin boxes, so to speak. GCP also offers BigQuery as a
Data Warehouse (DW) with ML models built in, so there is a fair bit of
'either/or' choice here. There is also the question of the migration path
from GCP artifacts to Databricks. Will Databricks provide all of these as a
service? For example, BigQuery is a fully managed serverless warehouse;
will Lakehouse provide the same on GCP? Besides ML, BigQuery provides
Oracle PL/SQL-style functions and procedures, so some customers are migrating
from classic on-premises Oracle to BigQuery.


However, neither BigQuery nor Compute Engine is cheap. Personally I
believe the cloud landscape is getting congested, and unless there is a
clear motivation to move from one provider to another, many will choose to
stay where they are. If you are already using Spark on a private cloud, then
the journey to GCP should be pretty smooth. As ever, your mileage will vary.
You may also decide to go for a multi-cloud mixture with the best of breed.


HTH,


Mich

LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 24 Feb 2021 at 15:25, Stephane Verlet  wrote:

> Hello,
>
> We have been using Spark on an on-premises cluster for several years and
> are looking at moving to a cloud deployment.
>
> I was wondering what your current favorite cloud setup is. Just plain
> AWS / Azure, or something on top like Databricks?
>
> This would support an on-demand reporting application, so usage would be
> sporadic with spikes during the day. The current deployment is Spark with
> Hive data.
>
> Thanks for sharing,
>
> Stephane
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Unsubscribe

2021-02-24 Thread Roland Johann
unsubscribe
--
Roland Johann
Data Architect/Data Engineer

phenetic GmbH
Lütticher Straße 10, 50674 Köln, Germany

Mobil: +49 172 365 26 46
Mail: roland.joh...@phenetic.io
Web: phenetic.io

Handelsregister: Amtsgericht Köln (HRB 92595)
Geschäftsführer: Roland Johann, Uwe Reimann


Re: How to control count / size of output files for

2021-02-24 Thread Attila Zsolt Piros
Hi!

It is because of "spark.sql.shuffle.partitions". See the value 200 at the
rangepartitioning exchange in the physical plan:


scala> val df = sc.parallelize(1 to 1000, 10).toDF("v").sort("v")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [v: int]

scala> df.explain()
== Physical Plan ==
*(2) Sort [v#300 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(v#300 ASC NULLS FIRST, 200), true, [id=#334]
   +- *(1) Project [value#297 AS v#300]
      +- *(1) SerializeFromObject [input[0, int, false] AS value#297]
         +- Scan[obj#296]

scala> df.rdd.getNumPartitions
res13: Int = 200
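
So to end up with only a few output files, lower that setting before the sort.
A minimal sketch in PySpark, where dataset and outputPath stand in for the
DataFrame and path from the original question:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Cap the partitions used by the range-partitioning exchange that sort() introduces.
spark.conf.set("spark.sql.shuffle.partitions", "5")

# 'dataset' and 'outputPath' are assumed from the original question.
dataset \
    .repartition(5) \
    .sort("long_repeated_string_in_this_column") \
    .write \
    .parquet(outputPath)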

Best Regards,
Attila








-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Structured streaming, Writing Kafka topic to BigQuery table, throws error

2021-02-24 Thread Mich Talebzadeh
Thanks Jungtaek.

I am stuck on how to add rows to BigQuery. The batch Spark API in PySpark does
it fine; however, here we are talking about Structured Streaming with PySpark.

This is my code, which reads the data and displays it on the console fine:

class MDStreaming:
    def __init__(self, spark_session, spark_context):
        self.spark = spark_session
        self.sc = spark_context
        self.config = config

    def startStreaming(self):
        self.sc.setLogLevel("ERROR")
        # {"rowkey":"c9289c6e-77f5-4a65-9dfb-d6b675d67cff","ticker":"MSFT", "timeissued":"2021-02-23T08:42:23", "price":31.12}
        schema = StructType().add("rowkey", StringType()).add("ticker", StringType()) \
                             .add("timeissued", TimestampType()).add("price", FloatType())
        try:
            # construct a streaming dataframe that subscribes to topic config['MDVariables']['topic'] -> md (market data)
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "earliest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
            return streamingDataFrame
        except Exception as e:
            print(f"""{e}, quitting""")
            sys.exit(1)

    def processData(self, streamingDataFrame):
        result = streamingDataFrame \
            .writeStream \
            .foreach(ForeachWriter()) \
            .start()

        result.awaitTermination()


if __name__ == "__main__":
    appName = config['common']['appName']
    spark_session = s.spark_session(appName)
    spark_session = s.setSparkConfBQ(spark_session)
    spark_context = s.sparkcontext()
    mdstreaming = MDStreaming(spark_session, spark_context)
    streamingDataFrame = mdstreaming.startStreaming()
    mdstreaming.processData(streamingDataFrame)

That ForeachWriter class is supposed to add the data (batch size 10 rows) to
the GCP BigQuery table. My code is as follows. However, it does not seem to
invoke the methods in this class, even though a batch of 10 rows is passed to
it every 2 seconds. Specifically, in the method process(self, row), what does
the rows_to_insert = [...] block signify?


class ForeachWriter:
    '''
    Class to send a set of rows to BigQuery.

    When used with `foreach`, copies of this class are going to be used to write
    multiple rows in the executor. See the python docs for `DataStreamWriter.foreach`
    for more details.
    '''

    def open(self):
        # This is called first when preparing to send multiple rows.
        # Put all the initialization code inside open() so that a fresh
        # copy of this class is initialized in the executor where open()
        # will be called.
        self.config = config
        self.table_id = self.config['MDVariables']['fullyQualifiedoutputTableId']
        self.client = bigquery.Client(self.config['MDVariables']['projectId'])
        self.table_ref = client.dataset(self.config['MDVariables']['targetDataset']).table(self.table_id)
        return True

    def process(self, row):
        # This is called for each row after open() has been called.
        # This implementation sends one row at a time.
        # A more efficient implementation can be to send batches of rows at a time.
        rows_to_insert = [
            {u"full_name": u"Phred Phlyntstone", u"age": 32},
            {u"full_name": u"Wylma Phlyntstone", u"age": 29},
        ]

        errors = client.insert_rows_json(
            self.table_id,
            self.config['MDVariables']['rows_to_insert'],
            row_ids=[None] * len(self.config['MDVariables']['rows_to_insert'])
        )  # Make an API request.

        if errors == []:
            print("New rows have been added.")
        else:
            print("Encountered errors while inserting rows: {}".format(errors))

    def close(self, err):
        # This is called after all the rows have been processed.
        if err:
            raise err
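
For comparison, a minimal sketch of the writer contract as described in the
python docs for `DataStreamWriter.foreach` (note that open() takes partition_id
and epoch_id); the BigQuery insert here is illustrative only and reuses the same
module-level config keys as my code above:

from google.cloud import bigquery

class RowWriter:

    def open(self, partition_id, epoch_id):
        # Called once per partition of every trigger; return True to receive rows.
        self.client = bigquery.Client(config['MDVariables']['projectId'])
        self.table_id = config['MDVariables']['fullyQualifiedoutputTableId']
        return True

    def process(self, row):
        # Called once per row; convert the Row to a dict and send it to BigQuery.
        errors = self.client.insert_rows_json(self.table_id, [row.asDict(recursive=True)])
        if errors:
            print("Encountered errors while inserting rows: {}".format(errors))

    def close(self, error):
        # Called when the partition is finished; re-raise so failures are visible.
        if error:
            raise error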






Re: Spark on the cloud deployments

2021-02-24 Thread Lalwani, Jayesh
AWS has two offerings built on top of Spark: EMR and Glue. You can, of course,
spin up your own EC2 instances and deploy Spark on them. The three offerings let
you trade off flexibility against infrastructure management. EC2 gives you the
most flexibility, because it's basically a bunch of nodes and you can configure
Spark any way you want; the con is that you need to manage your EC2 instances.
EMR is a step up: you still manage your EC2 instances, but you don't need to
manage Spark. With Glue, you don't need to manage infrastructure at all; Glue is
serverless (for you).

Besides those, you also get other choices. For example, if your usage is spiky,
you could implement this with Kinesis, or you could have your reporting
application query Athena.

On 2/24/21, 10:25 AM, "Stephane Verlet"  wrote:




Hello,

We have been using Spark on an on-premises cluster for several years and
are looking at moving to a cloud deployment.

I was wondering what your current favorite cloud setup is. Just plain
AWS / Azure, or something on top like Databricks?

This would support an on-demand reporting application, so usage would be
sporadic with spikes during the day. The current deployment is Spark with
Hive data.

Thanks for sharing,

Stephane



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




How to control count / size of output files for

2021-02-24 Thread Ivan Petrov
Hi, I'm trying to control the size and/or the number of the files Spark writes.

Here is my code. I expect to get 5 files, but I get dozens of small files. Why?

dataset
  .repartition(5)
  .sort("long_repeated_string_in_this_column") // long repeated values should compress better with snappy
  .write
  .parquet(outputPath)
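
For comparison, a minimal sketch (PySpark, not from the original post) that
keeps the output at five files by range-partitioning and sorting within
partitions, so the global sort's extra shuffle into spark.sql.shuffle.partitions
pieces is avoided:

# 'dataset' and 'outputPath' stand in for the DataFrame and path above.
dataset \
    .repartitionByRange(5, "long_repeated_string_in_this_column") \
    .sortWithinPartitions("long_repeated_string_in_this_column") \
    .write \
    .parquet(outputPath)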


Spark on the cloud deployments

2021-02-24 Thread Stephane Verlet

Hello,

We have been using Spark on an on-premises cluster for several years and
are looking at moving to a cloud deployment.

I was wondering what your current favorite cloud setup is. Just plain
AWS / Azure, or something on top like Databricks?

This would support an on-demand reporting application, so usage would be
sporadic with spikes during the day. The current deployment is Spark with
Hive data.

Thanks for sharing,

Stephane



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org