Re: Message getting lost in Kafka + Spark Streaming

2017-05-31 Thread Vikash Pareek
Thanks Sidney for your response,

To check whether all the messages are processed, I used an accumulator and
also added a print statement for debugging.


val accum = ssc.sparkContext.accumulator(0, "Debug Accumulator")
...
...
...
val mappedDataStream = dataStream.map(_._2);
mappedDataStream.foreachRDD { rdd =>
  ...
  ...
  ...
  partition.foreach { row =>
    if (debug) println(row.mkString)
    val keyedMessage = new KeyedMessage[String,
      String](props.getProperty("outTopicUnharmonized"),
      null, row.toString())
    producer.send(keyedMessage)
    println("Messages sent to Kafka: " + keyedMessage.message)
    accum += 1
  }
  // hack, should be done with the flush
  Thread.sleep(1000)
  producer.close()
  print("Accumulator's value is: " + accum)

And "println("Messages sent to Kafka: " + keyedMessage.message)" prints every
message received by the stream, and the accumulator's value matches the number
of incoming messages.
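For reference, a minimal sketch (not the thread's actual code) of how the
Thread.sleep(1000) hack could be replaced: the newer producer client shipped
with Kafka 0.9 (org.apache.kafka.clients.producer.KafkaProducer) exposes
flush() and per-record send callbacks, so failed sends surface as errors
instead of disappearing silently. mappedDataStream, props and the property
names are taken from the code above; the serializer settings are assumptions.

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

mappedDataStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Build a config for the new producer client (placeholder values).
    val producerProps = new Properties()
    producerProps.put("bootstrap.servers", props.getProperty("metadata.broker.list"))
    producerProps.put("acks", "all")
    producerProps.put("retries", "5")
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](producerProps)
    partition.foreach { row =>
      val record = new ProducerRecord[String, String](
        props.getProperty("outTopicUnharmonized"), null, row.toString)
      // The callback reports any failed send instead of dropping it silently.
      producer.send(record, new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
          if (exception != null) println("Send failed: " + exception)
        }
      })
    }
    producer.flush()  // blocks until buffered records are sent; replaces Thread.sleep(1000)
    producer.close()
  }
}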



Best Regards,


Vikash Pareek
Team Lead  *InfoObjects Inc.*
Big Data Analytics

m: +91 8800206898 a: E5, Jhalana Institutional Area, Jaipur, Rajasthan
302004
w: www.linkedin.com/in/pvikash e: vikash.par...@infoobjects.com




On Thu, Jun 1, 2017 at 11:24 AM, Sidney Feiner 
wrote:

> Are you sure that every message gets processed? It could be that some
> messages failed to pass the decoder.
> And during the processing, are you maybe putting the events into a map?
> That way, events with the same key could override each other, and you'll
> end up with fewer final events.
>
> -Original Message-
> From: Vikash Pareek [mailto:vikash.par...@infoobjects.com]
> Sent: Tuesday, May 30, 2017 4:00 PM
> To: user@spark.apache.org
> Subject: Message getting lost in Kafka + Spark Streaming
>
> I am facing an issue related to Spark Streaming with Kafka. My use case is
> as follows:
> 1. A Spark Streaming (DirectStream) application reads data/messages from a
> Kafka topic and processes them.
> 2. Based on the processed message, the app writes it to different Kafka
> topics, e.g. if the message is harmonized it is written to the harmonized
> topic, otherwise to the unharmonized topic.
>
> The problem is that during streaming we are somehow losing some messages,
> i.e. not all incoming messages are written to the harmonized or
> unharmonized topics.
> For example, if the app receives 30 messages in one batch, it sometimes
> writes all of them to the output topics (the expected behaviour), but
> sometimes it writes only 27 (3 messages are lost; this number can change).
>
> Versions as follow:
> Spark 1.6.0
> Kafka 0.9
>
> Kafka topic configuration is as follows:
> # of brokers: 3
> # replication factor: 3
> # of partitions: 3
>
> Following are the properties we are using for kafka:
>   val props = new Properties()
>   props.put("metadata.broker.list",
> properties.getProperty("metadataBrokerList"))
>   props.put("auto.offset.reset",
> properties.getProperty("autoOffsetReset"))
>   props.put("group.id", properties.getProperty("group.id"))
>   props.put("serializer.class", "kafka.serializer.StringEncoder")
>   props.put("outTopicHarmonized",
> properties.getProperty("outletKafkaTopicHarmonized"))
>   props.put("outTopicUnharmonized",
> properties.getProperty("outletKafkaTopicUnharmonized"))
>   props.put("acks", "all");
>   props.put("retries", "5");
>   props.put("request.required.acks", "-1")
> Following is the piece of code where we are writing processed messages to
> Kafka:
>   val schemaRdd2 = finalHarmonizedDF.toJSON
>
>   schemaRdd2.foreachPartition { partition =>
>     val producerConfig = new ProducerConfig(props)
>     val producer = new Producer[String, String](producerConfig)
>
>     partition.foreach { row =>
>       if (debug) println(row.mkString)
>       val keyedMessage = new KeyedMessage[String,
>         String](props.getProperty("outTopicHarmonized"),
>         null, row.toString())
>       producer.send(keyedMessage)
>     }
>     // hack, should be done with the flush
>     Thread.sleep(1000)
>     producer.close()
>   }
>
> We explicitly added sleep(1000) for testing purposes.
> But this is not solving the problem either :(
>
> Any suggestion would be appreciated.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Message-getting-lost-in-Kafka-Spark-
> Streaming-tp28719.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


RE: Message getting lost in Kafka + Spark Streaming

2017-05-31 Thread Sidney Feiner
Are you sure that every message gets processed? It could be that some messages
failed to pass the decoder.
And during the processing, are you maybe putting the events into a map? That
way, events with the same key could override each other, and you'll end up
with fewer final events.
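A tiny illustration of the override effect described above (not from the
thread): if events are collected into a Map keyed by some field, all but the
last event per key are silently dropped, so the final count comes up short.

// Three incoming events, two of them sharing the key "k1".
val events = Seq(("k1", "first"), ("k2", "second"), ("k1", "third"))

// Keying them collapses duplicates: 2 entries remain instead of 3.
val byKey = events.toMap  // Map(k1 -> third, k2 -> second)

println(s"incoming = ${events.size}, after keying = ${byKey.size}")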

-Original Message-
From: Vikash Pareek [mailto:vikash.par...@infoobjects.com] 
Sent: Tuesday, May 30, 2017 4:00 PM
To: user@spark.apache.org
Subject: Message getting lost in Kafka + Spark Streaming

I am facing an issue related to Spark Streaming with Kafka. My use case is as
follows:
1. A Spark Streaming (DirectStream) application reads data/messages from a
Kafka topic and processes them.
2. Based on the processed message, the app writes it to different Kafka topics,
e.g. if the message is harmonized it is written to the harmonized topic,
otherwise to the unharmonized topic.

The problem is that during streaming we are somehow losing some messages, i.e.
not all incoming messages are written to the harmonized or unharmonized topics.
For example, if the app receives 30 messages in one batch, it sometimes writes
all of them to the output topics (the expected behaviour), but sometimes it
writes only 27 (3 messages are lost; this number can change).
 
Versions as follow:
Spark 1.6.0
Kafka 0.9
 
Kafka topic configuration is as follows:
# of brokers: 3
# replication factor: 3
# of partitions: 3
 
Following are the properties we are using for kafka:
  val props = new Properties()
  props.put("metadata.broker.list",
properties.getProperty("metadataBrokerList"))
  props.put("auto.offset.reset",
properties.getProperty("autoOffsetReset"))
  props.put("group.id", properties.getProperty("group.id"))
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  props.put("outTopicHarmonized",
properties.getProperty("outletKafkaTopicHarmonized"))
  props.put("outTopicUnharmonized",
properties.getProperty("outletKafkaTopicUnharmonized"))
  props.put("acks", "all");
  props.put("retries", "5");
  props.put("request.required.acks", "-1")
Following is the piece of code where we are writing processed messages to
Kafka:
  val schemaRdd2 = finalHarmonizedDF.toJSON

  schemaRdd2.foreachPartition { partition =>
    val producerConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](producerConfig)

    partition.foreach { row =>
      if (debug) println(row.mkString)
      val keyedMessage = new KeyedMessage[String,
        String](props.getProperty("outTopicHarmonized"),
        null, row.toString())
      producer.send(keyedMessage)
    }
    // hack, should be done with the flush
    Thread.sleep(1000)
    producer.close()
  }

We explicitly added sleep(1000) for testing purposes.
But this is not solving the problem either :(
 
Any suggestion would be appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Message-getting-lost-in-Kafka-Spark-Streaming-tp28719.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: The following Error seems to happen once in every ten minutes (Spark Structured Streaming)?

2017-05-31 Thread Mahesh Sawaiker
Your datanode(s) are going down for some reason; check the datanode logs and
fix the underlying issue that is causing them to go down.
There should be no need to delete any data; just restarting the datanodes
should do the trick for you.

From: kant kodali [mailto:kanth...@gmail.com]
Sent: Thursday, June 01, 2017 4:35 AM
To: user @spark
Subject: The following Error seems to happen once in every ten minutes (Spark 
Structured Streaming)?


Hi All,

When my query is streaming I get the following error once in say 10 minutes. 
Lot of the solutions online seems to suggest just clear data directories under 
datanode and namenode and restart the HDFS cluster but I didn't see anything 
that explains the cause? If it happens so frequent what do I need to do? I use 
spark standalone 2.1.1 (I don't use any resource managers like YARN or Mesos at 
this time)



org.apache.spark.util.TaskCompletionListenerException: File 
/usr/local/hadoop/metrics/state/0/5/temp-6025335567362823423 could only be 
replicated to 0 nodes instead of minReplication (=1).  There are 2 datanode(s) 
running and no node(s) are excluded in this operation.



Thanks!




RE: Spark sql with Zeppelin, Task not serializable error when I try to cache the spark sql table

2017-05-31 Thread Mahesh Sawaiker
It’s because the class in which you have defined the UDF is not serializable.
Declare the UDF in a class and make that class serializable.
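A minimal sketch of that suggestion, assuming a UDF like the fn1 shown in the
question below; putting the function in a small standalone serializable object
(rather than in the Zeppelin paragraph's enclosing class) keeps the task
closure serializable.

// Keep the UDF logic in a serializable object so Spark can ship it to executors.
object MyUdfs extends Serializable {
  def fn1(res: String): Int = 100
}

// Register and use it exactly as in the original code.
spark.udf.register("fn1", MyUdfs.fn1 _)
val df1 = spark.sql("SELECT col1, col2, fn1(col3) FROM t1")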

From: shyla deshpande [mailto:deshpandesh...@gmail.com]
Sent: Thursday, June 01, 2017 10:08 AM
To: user
Subject: Spark sql with Zeppelin, Task not serializable error when I try to 
cache the spark sql table

Hello all,

I am using Zeppelin 0.7.1 with Spark 2.1.0

I am getting org.apache.spark.SparkException: Task not serializable error when 
I try to cache the spark sql table. I am using a UDF on a column of table and 
want to cache the resultant table . I can execute the paragraph successfully 
when there is no caching.

Please help! Thanks
---Following is my code
UDF :
def fn1(res: String): Int = {
  100
}
 spark.udf.register("fn1", fn1(_: String): Int)


   spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "k", "table" -> "t"))
  .load
  .createOrReplaceTempView("t1")


 val df1 = spark.sql("SELECT  col1, col2, fn1(col3)   from t1" )

 df1.createOrReplaceTempView("t2")

   spark.catalog.cacheTable("t2")




Spark sql with Zeppelin, Task not serializable error when I try to cache the spark sql table

2017-05-31 Thread shyla deshpande
Hello all,

I am using Zeppelin 0.7.1 with Spark 2.1.0

I am getting org.apache.spark.SparkException: Task not serializable error
when I try to cache the spark sql table. I am using a UDF on a column of
table and want to cache the resultant table . I can execute the paragraph
successfully when there is no caching.

Please help! Thanks
---Following is my code
UDF :
def fn1(res: String): Int = {
  100
}
 spark.udf.register("fn1", fn1(_: String): Int)


   spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "k", "table" -> "t"))
  .load
  .createOrReplaceTempView("t1")


 val df1 = spark.sql("SELECT  col1, col2, fn1(col3)   from t1" )

 df1.createOrReplaceTempView("t2")

   spark.catalog.cacheTable("t2")


Question about mllib.recommendation.ALS

2017-05-31 Thread Sahib Aulakh [Search] ­
Hello:

I am training the ALS model for recommendations. I have about 200m ratings
from about 10m users and 3m products. I have a small cluster with 48 cores
and 120gb cluster-wide memory.

My code is very similar to the example code

spark/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
code.

I have a couple of questions:


   1. All steps up to model training run reasonably fast. Model training
   takes under 10 minutes for rank 20. However, the
   model.recommendProductsForUsers step is either slow or just does not work,
   as the code seems to hang at this point. I have tried user and product
   block sizes of -1, 20, 40, etc., and played with the executor memory size.
   Can someone shed some light here as to what could be wrong?
   2. Also, is there any example code for the ml.recommendation.ALS
   algorithm? I can figure out how to train the model, but I don't understand
   (from the documentation) how to perform predictions. (See the sketch below.)
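For the second question, a minimal sketch of the DataFrame-based API
(org.apache.spark.ml.recommendation.ALS), assuming a ratings DataFrame with
userId, productId and rating columns (the column names here are placeholders):
predictions come from model.transform on a DataFrame of user/item pairs.

import org.apache.spark.ml.recommendation.ALS

// "ratings" is assumed to be a DataFrame(userId, productId, rating).
val als = new ALS()
  .setRank(20)
  .setMaxIter(10)
  .setRegParam(0.1)
  .setUserCol("userId")
  .setItemCol("productId")
  .setRatingCol("rating")

val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
val model = als.fit(training)

// transform() adds a "prediction" column for each (userId, productId) pair.
val predictions = model.transform(test)
predictions.show(5)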

Thanks for any information you can provide.
Sahib Aulakh.


-- 
Sahib Aulakh
Sr. Principal Engineer


The following Error seems to happen once in every ten minutes (Spark Structured Streaming)?

2017-05-31 Thread kant kodali
Hi All,

When my query is streaming I get the following error once in say 10
minutes. Lot of the solutions online seems to suggest just clear data
directories under datanode and namenode and restart the HDFS cluster but I
didn't see anything that explains the cause? If it happens so frequent what
do I need to do? I use spark standalone 2.1.1 (I don't use any resource
managers like YARN or Mesos at this time)


org.apache.spark.util.TaskCompletionListenerException: File
/usr/local/hadoop/metrics/state/0/5/temp-6025335567362823423 could only be
replicated to 0 nodes instead of minReplication (=1).  There are 2
datanode(s) running and no node(s) are excluded in this operation.


Thanks!


[apache-spark] Re: Problem with master webui with reverse proxy when workers >= 10

2017-05-31 Thread Trevor McKay
So one thing I've noticed, if I turn on as much debug logging as I can
find, is that when reverse proxy is enabled I end up with a lot of
these threads being logged:

17/05/31 21:06:37 DEBUG SelectorManager: Starting Thread[MasterUI-218-s
elector-ClientSelectorManager@36a67cc9/14,5,main] on org.spark_project.
jetty.io.SelectorManager$ManagedSelector@7bfecdf7 keys=0 selected=0

I mean, a lot.  They appear to be periodic.

Those are not present in the log when reverse proxy is off.  So I'm
wondering if some jetty resource or thread pool is
being overrun (but I know nothing about jetty).

Spark any ideas?

Best,

Trevor

On Wed, 2017-05-31 at 12:46 -0700, tmckay wrote:
> Hi folks,
>
>   I'm running a containerized spark 2.1.0 setup.  If I have reverse proxy
> turned on, everything is fine if I have fewer than 10 workers.  If I create
> a cluster with 10 or more, the master web ui is unavailable.
>
> This is even true for the same cluster if I start with a lower number and
> add workers to bring the total above 10.  The web ui is available until I
> cross that boundary, then becomes unavailable.
>
> If I run without reverse proxy, I can create clusters of 10+ (20, 30, 40
> ...) and the console is always available.
>
> Any ideas? Has anyone seen this behavior on bare metal, or VMs, or with
> containers?
> Any logging I can turn on in the master webserver that might shed some
> light?
>
> Thanks,
>
> Trevor McKay
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-master-webui-with-reverse-proxy-when-workers-10-tp28729.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

Re: Using SparkContext in Executors

2017-05-31 Thread lucas.g...@gmail.com
+1 to Ayan's answer, I think this is a common distributed anti pattern that
trips us all up at some point or another.

You definitely want to (in most cases) yield and create a new
RDD/Dataframe/Dataset and then perform your save operation on that.
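A minimal sketch of that pattern, assuming a parse function that returns
case-class records, a SparkSession named spark, and the DataStax
spark-cassandra-connector on the classpath; DumpRecord, the keyspace/table
names, dumpFiles and numOfSlices are placeholders, not code from the thread.

import org.apache.spark.sql.SaveMode

// Placeholder record type and parser for one dump file.
case class DumpRecord(id: String, payload: String)
def parse(path: String): Seq[DumpRecord] = Seq.empty

val dumpFilesRDD = spark.sparkContext.parallelize(dumpFiles, numOfSlices)

// Emit records from the executors, then save once through the connector.
import spark.implicits._
val parsedDF = dumpFilesRDD.flatMap(parse).toDF()

parsedDF.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "k", "table" -> "t"))
  .mode(SaveMode.Append)
  .save()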

On 28 May 2017 at 21:09, ayan guha  wrote:

> Hi
>
> You can modify your parse function to yield/emit the output record,
> instead of inserting. That way, you can essentially call .toDF to convert
> the outcome to a dataframe and then use the driver's cassandra connection to
> save to cassandra (the data will still be in the executors, but now the
> connector itself will create local connections and communicate with cassandra
> from the executors).
>
> On Mon, May 29, 2017 at 8:55 AM, Stephen Boesch  wrote:
>
>> You would need to use *native* Cassandra API's in each Executor -   not
>> org.apache.spark.sql.cassandra.CassandraSQLContext -  including to create
>> a separate Cassandra connection on each Executor.
>>
>> 2017-05-28 15:47 GMT-07:00 Abdulfattah Safa :
>>
>>> So I can't run SQL queries in Executors ?
>>>
>>> On Sun, May 28, 2017 at 11:00 PM Mark Hamstra 
>>> wrote:
>>>
 You can't do that. SparkContext and SparkSession can exist only on the
 Driver.

 On Sun, May 28, 2017 at 6:56 AM, Abdulfattah Safa <
 fattah.s...@gmail.com> wrote:

> How can I use SparkContext (to create Spark Session or Cassandra
> Sessions) in executors?
> If I pass it as parameter to the foreach or foreachpartition, then it
> will have a null value.
> Shall I create a new SparkContext in each executor?
>
> Here is what I'm trying to do:
> Read a dump directory with millions of dump files as follows:
>
> dumpFiles = Directory.listFiles(dumpDirectory)
> dumpFilesRDD = sparkContext.parallize(dumpFiles, numOfSlices)
> dumpFilesRDD.foreachPartition(dumpFilePath->parse(dumpFilePath))
> .
> .
> .
>
> In parse(), each dump file is parsed and inserted into database using
> SparlSQL. In order to do that, SparkContext is needed in the function 
> parse
> to use the sql() method.
>


>>
>
>
> --
> Best Regards,
> Ayan Guha
>


good http sync client to be used with spark

2017-05-31 Thread vimal dinakaran
Hi,
 In our application pipeline we need to push the data from Spark Streaming
to an HTTP server.

I would like an HTTP client with the following requirements:

1. synchronous calls
2. HTTP connection pool support
3. lightweight and easy to use

Spray and Akka HTTP are mostly suited for async calls. Correct me if I am wrong.

Could you please let me know which client suits the above?


mapWithState termination

2017-05-31 Thread Dominik Safaric
Dear all,

I would appreciate it if anyone could explain when mapWithState terminates,
i.e. when subsequent transformations such as writing the state to an external
sink are applied.

Given a KafkaConsumer instance pulling messages from a Kafka topic and a
mapWithState transformation updating the state for a certain key, the
subsequent foreachRDD transformation is not being applied at all. However, when
running the application in debug mode with a sufficiently small input data
size, for example a few thousand messages, the foreachRDD transformation is
applied once all messages have been consumed. Is this the desired behaviour?
Does the timeout interval of mapWithState control this behaviour, and if so,
what is its default value? In addition, is there an alternative for updating
the state for a given key and writing the output within the
foreachRDD/foreachPartition transformation every n seconds?

Thanks in advance,
Dominik 
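Not an answer from the thread, but a minimal sketch of where the timeout sits,
assuming a DStream[(String, Int)] named keyedStream: StateSpec applies no
timeout unless .timeout() is set explicitly, and the mapped stream emits the
mapping function's return values for the keys seen in each batch, which
foreachRDD then receives every batch interval.

import org.apache.spark.streaming.{Seconds, State, StateSpec}

// Running count per key. Note: mapWithState requires ssc.checkpoint(...) to be set.
def trackState(key: String, value: Option[Int], state: State[Int]): (String, Int) = {
  val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
  if (!state.isTimingOut()) state.update(sum)
  (key, sum)
}

// Without .timeout(...) the state for a key is kept indefinitely.
val spec = StateSpec.function(trackState _).timeout(Seconds(60))

val stateStream = keyedStream.mapWithState(spec)

// Fires every batch interval with the values returned for that batch's keys.
stateStream.foreachRDD { rdd =>
  rdd.take(10).foreach(println)
}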

Re: Running into the same problem as JIRA SPARK-20325

2017-05-31 Thread Michael Armbrust
>
> So, my question is the same as stated in the following ticket, which is: do
> we need to create a checkpoint directory for each individual query?
>

Yes.  Checkpoints record what data has been processed.  Thus two different
queries need their own checkpoints.
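A minimal sketch of what that looks like with the foreach sink from the
question below; df1/df2 are assumed streaming DataFrames, KafkaSink is the
custom ForeachWriter mentioned in the thread, and the checkpoint paths are
placeholders.

// Each query gets its own checkpointLocation.
val query1 = df1.writeStream
  .foreach(new KafkaSink())
  .option("checkpointLocation", "/checkpoints/query1")
  .start()

val query2 = df2.writeStream
  .foreach(new KafkaSink())
  .option("checkpointLocation", "/checkpoints/query2")
  .start()

// Wait on all active queries rather than calling awaitTermination() per query.
spark.streams.awaitAnyTermination()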


Problem with master webui with reverse proxy when workers >= 10

2017-05-31 Thread tmckay
Hi folks,

  I'm running a containerized spark 2.1.0 setup.  If I have reverse proxy
turned on, everything is fine if I have fewer than 10 workers.  If I create
a cluster with 10 or more, the master web ui is unavailable.

This is even true for the same cluster if I start with a lower number and
add workers to bring the total above 10.  The web ui is available until I
cross that boundary, then becomes unavailable.

If I run without reverse proxy, I can create clusters of 10+ (20, 30, 40
...) and the console is always available.

Any ideas? Has anyone seen this behavior on bare metal, or VMs, or with
containers?
Any logging I can turn on in the master webserver that might shed some
light?

Thanks,

Trevor McKay



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-master-webui-with-reverse-proxy-when-workers-10-tp28729.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Running into the same problem as JIRA SPARK-20325

2017-05-31 Thread kant kodali
Hi All,

I am using Spark 2.1.1 and forEachSink to write to Kafka. I call .start and
.awaitTermination for each query however I get the following error

"Cannot start query with id d4b3554d-ee1d-469c-bf9d-19976c8a7b47 as another
query with same id is already active"


So, my question is the same as stated in the following ticket, which is: do
we need to create a checkpoint directory for each individual query?

https://issues.apache.org/jira/browse/SPARK-20325

Thanks!


An Architecture question on the use of virtualised clusters

2017-05-31 Thread Mich Talebzadeh
Hi,

I realize this may not have direct relevance to Spark but has anyone tried
to create virtualized HDFS clusters using tools like ISILON or similar?

The prime motive behind this approach is to minimize the propagation or
copying of data, which has regulatory implications. In short, you want your
data to be in one place regardless of the artefacts used against it, such as
Spark.

Thanks,

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Creating Dataframe by querying Impala

2017-05-31 Thread morfious902002
Hi,
I am trying to create a Dataframe by querying an Impala table. It works fine in
my local environment, but when I try to run it in the cluster I either get

Error:java.lang.ClassNotFoundException: com.cloudera.impala.jdbc41.Driver

or

No Suitable Driver found. 

Can someone help me or direct me to how I can accomplish this?

I am using Spark 1.6.1. Here is my command (No Suitable Driver found error)
:-
'/appserver/spark/spark-1.6.1-bin-hadoop2.6/bin/spark-submit' '--master'
'yarn' '--deploy-mode' 'cluster' '--name' 'Livy' '--jars'
"hdfs:///user/lib/ImpalaJDBC41.jar,hdfs:///user/lib/TCLIServiceClient.jar,hdfs:///user/lib/libfb303-0.9.0.jar,hdfs:///user/lib/libthrift-0.9.0.jar,hdfs:///user/lib/hive_metastore.jar,hdfs:///user/lib/hive_service.jar"
'--class' 'Main.class' '--driver-memory' '5G' '--driver-cores' '2'
'--executor-memory' '8G' '--driver-cores' '2' '--executor-cores' '3'
'--num-executors' '2' 'my.jar' 'arg' 'arg' 'arg'
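Not an answer from the thread, but one thing commonly tried for "No Suitable
Driver found" with JDBC sources is naming the driver class explicitly in the
read options (and/or loading it with Class.forName) so it gets registered on
the driver and executors; the URL, host, port and table below are placeholders.

// Placeholders: adjust the Impala host, port, database and table.
Class.forName("com.cloudera.impala.jdbc41.Driver")

val impalaDF = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:impala://impala-host:21050/default")
  .option("driver", "com.cloudera.impala.jdbc41.Driver")
  .option("dbtable", "my_table")
  .load()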



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Creating-Dataframe-by-querying-Impala-tp28723.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to convert Dataset to Dataset in Spark Structured Streaming?

2017-05-31 Thread kant kodali
https://stackoverflow.com/questions/44280360/how-to-convert-datasetrow-to-dataset-of-json-messages-to-write-to-kafka

Thanks!
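One way to get there is wrapping the columns in struct() and applying to_json,
both available in org.apache.spark.sql.functions as of Spark 2.1; a minimal
sketch using the example columns from the thread (df2 and the column names come
from the question, the rest is assumed).

import org.apache.spark.sql.functions.{struct, to_json}
import spark.implicits._  // for the String encoder

// df2 is assumed to have the columns name, ratio and count.
val jsonDS = df2
  .select(to_json(struct(df2("name"), df2("ratio"), df2("count"))).alias("value"))
  .as[String]

// jsonDS contains rows like {"name":"hello","ratio":1.56,"count":34} and can
// then be written with jsonDS.writeStream.foreach(new KafkaSink()).start().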

On Wed, May 31, 2017 at 1:41 AM, kant kodali  wrote:

> small correction.
>
> If I try to convert a Row into a Json String it results in something
> like this: {"key1", "name", "value1": "hello", "key2", "ratio", "value2":
> 1.56, "key3", "count", "value3": 34}, but what I need is something like
> this: { result: {"name": "hello", "ratio": 1.56, "count": 34} }; however, I
> don't have a result column. Even this {"name": "hello", "ratio": 1.56,
> "count": 34} would work.
>
>
> On Wed, May 31, 2017 at 1:05 AM, kant kodali  wrote:
>
>> Hi Jules,
>>
>> I read that blog several times prior to asking this question.
>>
>> Thanks!
>>
>> On Wed, May 31, 2017 at 12:12 AM, Jules Damji 
>> wrote:
>>
>>> Hello Kant,
>>>
>>> See is the examples in this blog explains how to deal with your
>>> particular case: https://databricks.com/blog/2017/02/23/working-complex
>>> -data-formats-structured-streaming-apache-spark-2-1.html
>>>
>>> Cheers
>>> Jules
>>>
>>> Sent from my iPhone
>>> Pardon the dumb thumb typos :)
>>>
>>> On May 30, 2017, at 7:31 PM, kant kodali  wrote:
>>>
>>> Hi All,
>>>
>>> I have a Dataset and I am trying to convert it into Dataset
>>> (json String) using Spark Structured Streaming. I have tried the following.
>>>
>>> df2.toJSON().writeStream().foreach(new KafkaSink())
>>>
>>> This doesn't seem to work for the following reason.
>>>
>>> "Queries with streaming sources must be executed with
>>> writeStream.start()"
>>>
>>> My dataframe has looks like this
>>>
>>> name, ratio, count  // column names
>>>
>>> "hello", 1.56, 34
>>>
>>> If I try to convert a Row into a Json String it results into something
>>> like this {"key1", "name", "value1": "hello",  "key2", "ratio", "value2":
>>> 1.56 , "key3", "count", "value3": 34} but *what I need is something
>>> like this { result: {"name": "hello", "ratio": 1.56, "count": 34} } however
>>> I don't have a result column. *
>>>
>>> It looks like there are couple of functions to_json and json_tuple but
>>> they seem to take only one Column as a first argument so should I call
>>> to_json on every column? Also how would I turn this into DataSet ?
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>>
>>
>


Re: Worker node log not showed

2017-05-31 Thread Paolo Patierno
No, it's running in standalone mode as a Docker image on Kubernetes.


The only way I found was to access the "stderr" file created under the "work"
directory in SPARK_HOME ... but is that the right way?


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience



From: Alonso Isidoro Roman 
Sent: Wednesday, May 31, 2017 8:39 AM
To: Paolo Patierno
Cc: user@spark.apache.org
Subject: Re: Worker node log not showed

Are you running the code with yarn?

if so, figure out the applicationID through the web ui, then run the next 
command:

yarn logs your_application_id


Alonso Isidoro Roman
about.me/alonso.isidoro.roman


2017-05-31 9:42 GMT+02:00 Paolo Patierno 
mailto:ppatie...@live.com>>:

Hi all,


I have a simple cluster with one master and one worker. On another machine I
launch the driver, where at some point I have the following lines of code:


max.foreachRDD(rdd -> {

LOG.info("*** max.foreachRDD");

rdd.foreach(value -> {

LOG.info("*** rdd.foreach");

});
});

The message "*** max.foreachRDD" is visible in the console of the driver
machine, and that's fine.
I can't see the "*** rdd.foreach" message, which should be executed on the
worker node, right? I can't see it on the worker node console either. Why?

My need is to log what happens in the code executed on the worker node (because
it works if I execute it on the master with local[*], but not when submitted to
a worker node).

Following the log4j.properties file I put in the /conf dir :


# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: 
%m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO


Thanks
Paolo.


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience



Re: How to convert Dataset to Dataset in Spark Structured Streaming?

2017-05-31 Thread kant kodali
small correction.

If I try to convert a Row into a Json String it results in something like
this: {"key1", "name", "value1": "hello", "key2", "ratio", "value2": 1.56,
"key3", "count", "value3": 34}, but what I need is something like this:
{ result: {"name": "hello", "ratio": 1.56, "count": 34} }; however, I don't
have a result column. Even this {"name": "hello", "ratio": 1.56,
"count": 34} would work.


On Wed, May 31, 2017 at 1:05 AM, kant kodali  wrote:

> Hi Jules,
>
> I read that blog several times prior to asking this question.
>
> Thanks!
>
> On Wed, May 31, 2017 at 12:12 AM, Jules Damji  wrote:
>
>> Hello Kant,
>>
>> See is the examples in this blog explains how to deal with your
>> particular case: https://databricks.com/blog/2017/02/23/working-complex
>> -data-formats-structured-streaming-apache-spark-2-1.html
>>
>> Cheers
>> Jules
>>
>> Sent from my iPhone
>> Pardon the dumb thumb typos :)
>>
>> On May 30, 2017, at 7:31 PM, kant kodali  wrote:
>>
>> Hi All,
>>
>> I have a Dataset and I am trying to convert it into Dataset
>> (json String) using Spark Structured Streaming. I have tried the following.
>>
>> df2.toJSON().writeStream().foreach(new KafkaSink())
>>
>> This doesn't seem to work for the following reason.
>>
>> "Queries with streaming sources must be executed with writeStream.start()"
>>
>> My dataframe has looks like this
>>
>> name, ratio, count  // column names
>>
>> "hello", 1.56, 34
>>
>> If I try to convert a Row into a Json String it results into something
>> like this {"key1", "name", "value1": "hello",  "key2", "ratio", "value2":
>> 1.56 , "key3", "count", "value3": 34} but *what I need is something like
>> this { result: {"name": "hello", "ratio": 1.56, "count": 34} } however I
>> don't have a result column. *
>>
>> It looks like there are couple of functions to_json and json_tuple but
>> they seem to take only one Column as a first argument so should I call
>> to_json on every column? Also how would I turn this into DataSet ?
>>
>> Thanks!
>>
>>
>>
>>
>>
>


Re: Worker node log not showed

2017-05-31 Thread Alonso Isidoro Roman
Are you running the code with yarn?

if so, figure out the applicationID through the web ui, then run the next
command:

yarn logs your_application_id

Alonso Isidoro Roman
[image: https://]about.me/alonso.isidoro.roman


2017-05-31 9:42 GMT+02:00 Paolo Patierno :

> Hi all,
>
>
> I have a simple cluster with one master and one worker. On another machine
> I launch the driver where at some point I have following line of codes :
>
>
> max.foreachRDD(rdd -> {
>
> LOG.info("*** max.foreachRDD");
>
> rdd.foreach(value -> {
>
> LOG.info("*** rdd.foreach");
>
> });
> });
>
>
> The message "*** max.foreachRDD" is visible in the console of the driver
> machine ... and it's ok.
> I can't see the "*** rdd.foreach" message that should be executed on the
> worker node right ? Btw on the worker node console I can't see it. Why ?
>
> My need is to log what happens in the code executed on worker node
> (because it works if I execute it on master local[*] but not when submitted
> to a worker node).
>
> Following the log4j.properties file I put in the /conf dir :
>
> # Set everything to be logged to the console
> log4j.rootCategory=INFO, console
> log4j.appender.console=org.apache.log4j.ConsoleAppender
> log4j.appender.console.target=System.err
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
> log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
> %c{1}: %m%n
>
> # Settings to quiet third party logs that are too verbose
> log4j.logger.org.spark-project.jetty=WARN
> log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
> log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
> log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
>
>
>
> Thanks
> Paolo.
>
>
> *Paolo Patierno*
>
> *Senior Software Engineer (IoT) @ Red Hat **Microsoft MVP on **Windows
> Embedded & IoT*
> *Microsoft Azure Advisor*
>
> Twitter : @ppatierno 
> Linkedin : paolopatierno 
> Blog : DevExperience 
>


Re: How to convert Dataset to Dataset in Spark Structured Streaming?

2017-05-31 Thread kant kodali
Hi Jules,

I read that blog several times prior to asking this question.

Thanks!

On Wed, May 31, 2017 at 12:12 AM, Jules Damji  wrote:

> Hello Kant,
>
> See is the examples in this blog explains how to deal with your particular
> case: https://databricks.com/blog/2017/02/23/working-complex-data-formats-
> structured-streaming-apache-spark-2-1.html
>
> Cheers
> Jules
>
> Sent from my iPhone
> Pardon the dumb thumb typos :)
>
> On May 30, 2017, at 7:31 PM, kant kodali  wrote:
>
> Hi All,
>
> I have a Dataset and I am trying to convert it into Dataset
> (json String) using Spark Structured Streaming. I have tried the following.
>
> df2.toJSON().writeStream().foreach(new KafkaSink())
>
> This doesn't seem to work for the following reason.
>
> "Queries with streaming sources must be executed with writeStream.start()"
>
> My dataframe has looks like this
>
> name, ratio, count  // column names
>
> "hello", 1.56, 34
>
> If I try to convert a Row into a Json String it results into something
> like this {"key1", "name", "value1": "hello",  "key2", "ratio", "value2":
> 1.56 , "key3", "count", "value3": 34} but *what I need is something like
> this { result: {"name": "hello", "ratio": 1.56, "count": 34} } however I
> don't have a result column. *
>
> It looks like there are couple of functions to_json and json_tuple but
> they seem to take only one Column as a first argument so should I call
> to_json on every column? Also how would I turn this into DataSet ?
>
> Thanks!
>
>
>
>
>


Help in Parsing 'Categorical' type of data

2017-05-31 Thread Amlan Jyoti
Hi,

I am trying to run a Naive Bayes model using the Spark ML libraries, in Java.
A sample snippet of the dataset is given below:

Raw Data -


But, as the input data needs to be numeric, I am using a one-hot encoder
on the Gender field [m -> 0,1] [f -> 1,0]; finally, the 'features' vector
is fed to the model, and I get the output.

Transformed Data - 


But the model results are not correct, as the 'Gender' field (originally
categorical) is now treated as a continuous field after the one-hot
encoding transformation.

The expectation is that for 'continuous' data, the mean and variance are
calculated, and for 'categorical' data, the number of occurrences of the
different categories. [In my case, mean and variance are calculated even for
the Gender field.]

So, is there any way to indicate to the model that a particular data field is
'categorical' by nature?

Thanks

Best Regards
Amlan Jyoti






Worker node log not showed

2017-05-31 Thread Paolo Patierno
Hi all,


I have a simple cluster with one master and one worker. On another machine I
launch the driver, where at some point I have the following lines of code:


max.foreachRDD(rdd -> {

LOG.info("*** max.foreachRDD");

rdd.foreach(value -> {

LOG.info("*** rdd.foreach");

});
});

The message "*** max.foreachRDD" is visible in the console of the driver
machine, and that's fine.
I can't see the "*** rdd.foreach" message, which should be executed on the
worker node, right? I can't see it on the worker node console either. Why?

My need is to log what happens in the code executed on the worker node (because
it works if I execute it on the master with local[*], but not when submitted to
a worker node).

Following the log4j.properties file I put in the /conf dir :


# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: 
%m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO


Thanks
Paolo.


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience


Re: How to convert Dataset to Dataset in Spark Structured Streaming?

2017-05-31 Thread Jules Damji
Hello Kant, 

See is the examples in this blog explains how to deal with your particular 
case: 
https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html

Cheers
Jules 

Sent from my iPhone
Pardon the dumb thumb typos :)

> On May 30, 2017, at 7:31 PM, kant kodali  wrote:
> 
> Hi All, 
> 
> I have a Dataset and I am trying to convert it into Dataset 
> (json String) using Spark Structured Streaming. I have tried the following.
> 
> df2.toJSON().writeStream().foreach(new KafkaSink())
> This doesn't seem to work for the following reason. 
> 
> "Queries with streaming sources must be executed with writeStream.start()"
> 
> My dataframe has looks like this
> 
> name, ratio, count  // column names
> 
> "hello", 1.56, 34 
> 
> If I try to convert a Row into a Json String it results into something like 
> this {"key1", "name", "value1": "hello",  "key2", "ratio", "value2": 1.56 , 
> "key3", "count", "value3": 34} but what I need is something like this { 
> result: {"name": "hello", "ratio": 1.56, "count": 34} } however I don't have 
> a result column. 
> 
> It looks like there are couple of functions to_json and json_tuple but they 
> seem to take only one Column as a first argument so should I call to_json on 
> every column? Also how would I turn this into DataSet ?
> 
> Thanks!
> 
> 
>