Re: how can i write spark addListener metric to kafka

2020-06-09 Thread Tathagata Das
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis
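For reference, a minimal sketch of that listener-based approach, assuming a
plain Kafka producer is used to forward each batch's progress JSON; the broker
address, the topic name "streaming-metrics", and the serializer settings below
are illustrative, not taken from this thread:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark = SparkSession.builder.appName("ga-metrics").getOrCreate()

// Plain Kafka producer used only to publish metrics (settings are illustrative).
val props = new Properties()
props.put("bootstrap.servers", "metrics_kafka_host:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // progress.json already carries numInputRows and processedRowsPerSecond.
    producer.send(new ProducerRecord("streaming-metrics",
      event.progress.id.toString, event.progress.json))
  }
})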

On Tue, Jun 9, 2020 at 4:42 PM a s  wrote:

> Hi guys,
>
> I am building a Structured Streaming app for Google Analytics data.
>
> I want to capture the number of rows read and processed.
>
> I can see it in the logs; how can I send it to Kafka?
>
>
> Thanks
> Alis
>


how can i write spark addListener metric to kafka

2020-06-09 Thread a s
Hi guys,

I am building a Structured Streaming app for Google Analytics data.

I want to capture the number of rows read and processed.

I can see it in the logs; how can I send it to Kafka?


Thanks
Alis


Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread Srinivas V
OK, thanks for confirming. I will do it this way.

Regards
Srini

On Tue, Jun 9, 2020 at 11:31 PM Gerard Maas  wrote:

> Hi Srinivas,
>
> Reading from different brokers is possible, but you need to connect to each
> Kafka cluster separately.
> Mixing connections to two different Kafka clusters in one subscriber is not
> supported (I'm sure it would give all kinds of weird errors).
> The "kafka.bootstrap.servers" option is there to list the potentially many
> brokers of the *same* Kafka cluster.
>
> The way to address this is to follow German's suggestion and create a
> subscription for each Kafka cluster you are talking to.
>
> val df_cluster1 = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
>   .option("subscribe", "topic1, topic2")
>   .load()
>
> val df_cluster2 = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "cluster2_host:cluster2_port")
>   .option("subscribe", "topic3, topicn, topicn+1")
>   .load()
>
> After acquiring the DataFrames, you can union them and process all the data
> together.
>
> val unifiedData = df_cluster1.union(df_cluster2)
> // apply further transformations on `unifiedData`
>
> kr, Gerard.
>
>
>
>
>
> On Tue, Jun 9, 2020 at 6:30 PM Srinivas V  wrote:
>
>> Thanks for the quick reply. This may work, but I have about 5 topics to
>> listen to right now, so I am trying to keep all topics in an array in a
>> properties file and read them all at once. That way it is dynamic: there is
>> one code block like the one below, and topics can be added to or removed
>> from the config file without changing code. If someone confirms that it does
>> not work, I will have to do something like you have provided.
>>
>> val df_cluster1 = spark
>>   .read
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers",
>>     "cluster1_host:cluster1_port,cluster2_host:port")
>>   .option("subscribe", "topic1,topic2,topic3,topic4,topic5")
>>   .load()
>>
>>


Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread Gerard Maas
Hi Srinivas,

Reading from different brokers is possible, but you need to connect to each
Kafka cluster separately.
Mixing connections to two different Kafka clusters in one subscriber is not
supported (I'm sure it would give all kinds of weird errors).
The "kafka.bootstrap.servers" option is there to list the potentially many
brokers of the *same* Kafka cluster.

The way to address this is to follow German's suggestion and create a
subscription for each Kafka cluster you are talking to.

val df_cluster1 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
  .option("subscribe", "topic1, topic2")
  .load()

val df_cluster2 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster2_host:cluster2_port")
  .option("subscribe", "topic3, topicn, topicn+1")
  .load()

After acquiring the DataFrames, you can union them and process all the data
together.

val unifiedData = df_cluster1.union(df_cluster2)
// apply further transformations on `unifiedData`
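
Note that the snippets in this thread use the batch reader; for a streaming
query the same per-cluster pattern applies with readStream. A rough sketch,
with an illustrative console sink and checkpoint path:

val stream_cluster1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
  .option("subscribe", "topic1,topic2")
  .load()

val stream_cluster2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster2_host:cluster2_port")
  .option("subscribe", "topic3")
  .load()

// Union of two streaming DataFrames is supported; run a single query over it.
val query = stream_cluster1.union(stream_cluster2)
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/multi-cluster")
  .start()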

kr, Gerard.





On Tue, Jun 9, 2020 at 6:30 PM Srinivas V  wrote:

> Thanks for the quick reply. This may work, but I have about 5 topics to
> listen to right now, so I am trying to keep all topics in an array in a
> properties file and read them all at once. That way it is dynamic: there is
> one code block like the one below, and topics can be added to or removed from
> the config file without changing code. If someone confirms that it does not
> work, I will have to do something like you have provided.
>
> val df_cluster1 = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers",
>     "cluster1_host:cluster1_port,cluster2_host:port")
>   .option("subscribe", "topic1,topic2,topic3,topic4,topic5")
>   .load()
>
>


Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread Srinivas V
Thanks for the quick reply. This may work, but I have about 5 topics to
listen to right now, so I am trying to keep all topics in an array in a
properties file and read them all at once. That way it is dynamic: there is
one code block like the one below, and topics can be added to or removed from
the config file without changing code. If someone confirms that it does not
work, I will have to do something like you have provided.

val df_cluster1 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers",
    "cluster1_host:cluster1_port,cluster2_host:port")
  .option("subscribe", "topic1,topic2,topic3,topic4,topic5")
  .load()


Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread German SM
Hello,

I've never tried that. Doesn't this work?

val df_cluster1 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
  .option("subscribe", "topic1")
  .load()

val df_cluster2 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster2_host:port")
  .option("subscribe", "topic2")
  .load()


On Tue, 9 Jun 2020 at 18:10, Srinivas V  wrote:

> Hello,
> In Structured Streaming, is it possible to have one Spark application
> with one query that consumes topics from multiple Kafka clusters?
>
> I am trying to consume two topics, each from a different Kafka cluster, but
> one of the topics is reported as an unknown topic and the job keeps running
> without completing in the Spark UI.
>
> Is this not allowed in Spark 2.4.5?
>
> Regards
> Srini
>
>
>
>


[spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread Srinivas V
Hello,
In Structured Streaming, is it possible to have one Spark application with
one query that consumes topics from multiple Kafka clusters?

I am trying to consume two topics, each from a different Kafka cluster, but
one of the topics is reported as an unknown topic and the job keeps running
without completing in the Spark UI.

Is this not allowed in Spark 2.4.5?

Regards
Srini


Out of memory causing due to high number of spark submissions in FIFO mode

2020-06-09 Thread Sunil Pasumarthi
Hi all,

I have written a small ETL Spark application which takes data from GCS,
transforms it, and saves it into another GCS bucket.
I am trying to run this application for different ids using a Spark cluster
on Google's Dataproc, tweaking only the default configuration to use the
FAIR scheduler with a FIFO queue by configuring these settings:
  in /etc/hadoop/conf/yarn-site.xml
  yarn.resourcemanager.scheduler.class =
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler
  yarn.scheduler.fair.allocation.file = /etc/hadoop/conf/fair-scheduler.xml
  yarn.scheduler.fair.user-as-default-queue = false
  in /etc/hadoop/conf/fair-scheduler.xml, allocations as
  1
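
(A minimal fair-scheduler.xml consistent with the setup described above might
look like the following; this assumes the "1" is a limit of one running
application at a time on the default queue, which is an assumption rather than
something stated explicitly in the message:)

<?xml version="1.0"?>
<!-- Assumed /etc/hadoop/conf/fair-scheduler.xml: single queue, one app at a time. -->
<allocations>
  <queue name="default">
    <maxRunningApps>1</maxRunningApps>
    <schedulingPolicy>fifo</schedulingPolicy>
  </queue>
</allocations>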


The Spark cluster has:
  1 master  - 2 cores, 4 GB RAM
  2 workers - 4 cores, 16 GB RAM each
I tested with 5 Spark submissions and everything worked as expected: all the
applications ran one after the other without any exceptions.

When I tried the same exercise with 100 submissions, some of them failed with
out-of-memory errors. When I re-ran the OOM submissions individually, they
completed without any error.

The log of a submission that hit the out-of-memory error:
'''
20/06/05 19:44:23 INFO org.spark_project.jetty.util.log: Logging
initialized @5463ms
20/06/05 19:44:24 INFO org.spark_project.jetty.server.Server:
jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
20/06/05 19:44:24 INFO org.spark_project.jetty.server.Server: Started
@5599ms
20/06/05 19:44:24 WARN org.apache.spark.util.Utils: Service 'SparkUI' could
not bind on port 4040. Attempting port 4041.
20/06/05 19:44:24 WARN org.apache.spark.util.Utils: Service 'SparkUI' could
not bind on port 4041. Attempting port 4042.
20/06/05 19:44:24 WARN org.apache.spark.util.Utils: Service 'SparkUI' could
not bind on port 4042. Attempting port 4043.
20/06/05 19:44:24 WARN org.apache.spark.util.Utils: Service 'SparkUI' could
not bind on port 4043. Attempting port 4044.
20/06/05 19:44:24 WARN org.apache.spark.util.Utils: Service 'SparkUI' could
not bind on port 4044. Attempting port 4045.
20/06/05 19:44:24 INFO org.spark_project.jetty.server.AbstractConnector:
Started ServerConnector@723f98fa{HTTP/1.1,[http/1.1]}{0.0.0.0:4045}
20/06/05 19:44:24 WARN org.apache.spark.scheduler.FairSchedulableBuilder:
Fair Scheduler configuration file not found so jobs will be scheduled in
FIFO order. To use fair scheduling, configure pools in fairscheduler.xml or
set spark.scheduler.allocation.file to a file that contains the
configuration.
20/06/05 19:44:26 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to
ResourceManager at airf-m-2c-w-4c-4-faff-m/10.160.0.156:8032
20/06/05 19:44:27 INFO org.apache.hadoop.yarn.client.AHSProxy: Connecting
to Application History server at airf-m-2c-w-4c-4-faff-m/10.160.0.156:10200
20/06/05 19:44:29 INFO
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted
application application_1591383928453_0047
20/06/05 19:46:34 WARN org.apache.spark.sql.SparkSession$Builder: Using an
existing SparkSession; some configuration may not take effect.
20/06/05 19:46:41 INFO
com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl:
Repairing batch of 24 missing directories.
20/06/05 19:46:44 INFO
com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl:
Successfully repaired 24/24 implicit directories.
OpenJDK 64-Bit Server VM warning: INFO:
os::commit_memory(0x9820, 46661632, 0) failed; error='Cannot
allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 46661632 bytes for
committing reserved memory.
# An error report file with more information is saved as:
#
/tmp/9e22ca5b-5bf8-47b7-12ee-69cd9e37e7c8_spark_submit_20200605_82b0375c/hs_err_pid9917.log
Job output is complete
'''

Also, when I was test running an application I never saw this log line:
  Service 'SparkUI' could not bind on port 4040. Attempting port 4041.

I am very new to Spark and don't know which configurations might help to
debug this; the log above didn't help either.
I lost the hs_err file when the cluster was deleted.
What can I do to debug this?
Thanks for taking the time to read this.


Re: [SPARK-30957][SQL] Null-safe variant of Dataset.join(Dataset[_], Seq[String])

2020-06-09 Thread Alexandros Biratsis
Hi Enrico and Spark devs,

Since the current plan is not to provide built-in functionality for
dropping repeated/redundant columns, I wrote two helper methods as a
workaround.

The first method accepts multiple Column instances, extending the current
drop, which supports column names only:

implicit class DataframeExt(val df: DataFrame) {
  def drop(cols: Seq[Column]): DataFrame = {
    cols.foldLeft(df) { (tdf, c) => tdf.drop(c) }
  }
}

The second implicit method converts a sequence of column names into Column
instances, optionally binding them to the parent DataFrames:

implicit class SeqExt(val cols: Seq[String]) {
  def toCol(dfs: DataFrame*): Seq[Column] = {
    if (dfs.nonEmpty) {
      dfs.foldLeft(Seq[Column]()) { (acc, df) => acc ++ cols.map(df(_)) }
    } else {
      cols.map(col(_))
    }
  }
}

After adding these two to your library, you can use them as:

import implicits._

val dropCols = Seq("c2", "c3")
val joinCols = Seq("c1")

val weatherDf = dfA.join(dfB, joinCols, "inner")
  .join(dfC, joinCols, "inner")
  .join(dfD, joinCols, "inner")
  .drop(dropCols.toCol(dfB, dfC, dfD))

Cheers,
Alex
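
In the same spirit, a small helper that builds the <=> condition from the
column names and then drops the right-hand duplicates would cover the common
case discussed below until SPARK-30957 lands. A rough sketch; the name
joinNullSafe is made up here, and dropping the right-hand columns this way is
only appropriate for inner joins:

import org.apache.spark.sql.DataFrame

implicit class NullSafeJoinExt(val left: DataFrame) {
  // Null-safe equi-join on the given column names, keeping one copy of each join column.
  def joinNullSafe(right: DataFrame, usingColumns: Seq[String],
                   joinType: String = "inner"): DataFrame = {
    val condition = usingColumns.map(c => left(c) <=> right(c)).reduce(_ && _)
    usingColumns.foldLeft(left.join(right, condition, joinType)) {
      (df, c) => df.drop(right(c))
    }
  }
}

// e.g. df1.joinNullSafe(df2, Seq("a", "b")) reproduces the desired output shown
// in the original question quoted below.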

On Wed, Feb 26, 2020 at 10:07 AM Enrico Minack 
wrote:

> I have created a jira to track this request:
> https://issues.apache.org/jira/browse/SPARK-30957
>
> Enrico
>
> Am 08.02.20 um 16:56 schrieb Enrico Minack:
>
> Hi Devs,
>
> I am forwarding this from the user mailing list. I agree that the <=>
> version of join(Dataset[_], Seq[String]) would be useful.
>
> Does any PMC consider this useful enough to be added to the Dataset API?
> I'd be happy to create a PR in that case.
>
> Enrico
>
>
>  Weitergeleitete Nachricht 
> Betreff: dataframe null safe joins given a list of columns
> Datum: Thu, 6 Feb 2020 12:45:11 +
> Von: Marcelo Valle  
> An: user @spark  
>
> I was surprised I couldn't find a way of solving this in spark, as it must
> be a very common problem for users. Then I decided to ask here.
>
> Consider the code below:
>
> ```
> val joinColumns = Seq("a", "b")
> val df1 = Seq(("a1", "b1", "c1"), ("a2", "b2", "c2"), ("a4", null,
> "c4")).toDF("a", "b", "c")
> val df2 = Seq(("a1", "b1", "d1"), ("a3", "b3", "d3"), ("a4", null,
> "d4")).toDF("a", "b", "d")
> df1.join(df2, joinColumns).show()
> ```
>
> The output is:
>
> ```
> +---+---+---+---+
> |  a|  b|  c|  d|
> +---+---+---+---+
> | a1| b1| c1| d1|
> +---+---+---+---+
> ```
>
> But I want it to be:
>
> ```
> +---+----+---+---+
> |  a|   b|  c|  d|
> +---+----+---+---+
> | a1|  b1| c1| d1|
> | a4|null| c4| d4|
> +---+----+---+---+
> ```
>
> The join syntax of `df1.join(df2, joinColumns)` has some advantages, as it
> doesn't create duplicate columns by default. However, it uses the operator
> `===` to join, not the null safe one `<=>`.
>
> Using the following syntax:
>
> ```
> df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> df2("b")).show()
> ```
>
> Would produce:
>
> ```
> +---+----+---+---+----+---+
> |  a|   b|  c|  a|   b|  d|
> +---+----+---+---+----+---+
> | a1|  b1| c1| a1|  b1| d1|
> | a4|null| c4| a4|null| d4|
> +---+----+---+---+----+---+
> ```
>
> So to get the result I really want, I must do:
>
> ```
> df1.join(df2, df1("a") <=> df2("a") && df1("b") <=>
> df2("b")).drop(df2("a")).drop(df2("b")).show()
> +---+----+---+---+
> |  a|   b|  c|  d|
> +---+----+---+---+
> | a1|  b1| c1| d1|
> | a4|null| c4| d4|
> +---+----+---+---+
> ```
>
> Which works, but is really verbose, especially when you have many join
> columns.
>
> Is there a better way of solving this without needing a utility method?
> This same problem is something I find in every spark project.
>
>
>
> This email is confidential [and may be protected by legal privilege]. If
> you are not the intended recipient, please do not copy or disclose its
> content but contact the sender immediately upon receipt.
>
> KTech Services Ltd is registered in England as company number 10704940.
>
> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE,
> United Kingdom
>
>
>


[PySpark CrossValidator] Dropping column randCol before fitting model

2020-06-09 Thread Ablaye FAYE
Hello,

I have noticed that the _fit method of the CrossValidator class adds a new
column (randCol) to the input dataset in PySpark. This column is used to
split the dataset into k folds.

Is this column removed from the training and test data of each fold
before the model is fitted?

I ask because I've gone through the code but haven't seen a place where
this column is removed before the fit is executed.

Thanks for your help