spark metadata metastore bug ?

2022-01-06 Thread Nicolas Paris
Spark cannot see Hive schema updates, partly because it stores its own
copy of the schema as table properties in the Hive metastore.


1. FROM SPARK: create a table

>>> spark.sql("select 1 col1, 2 col2").write.format("parquet").saveAsTable("my_table")
>>> spark.table("my_table").printSchema()
root
 |-- col1: integer (nullable = true)
 |-- col2: integer (nullable = true)


2. FROM HIVE: alter the schema
==
0: jdbc:hive2://localhost:1> ALTER TABLE my_table REPLACE COLUMNS(`col1` int, `col2` int, `col3` string);
0: jdbc:hive2://localhost:1> describe my_table;
+-----------+------------+----------+
| col_name  | data_type  | comment  |
+-----------+------------+----------+
| col1      | int        |          |
| col2      | int        |          |
| col3      | string     |          |
+-----------+------------+----------+


3. FROM SPARK: problem, column does not appear
==
>>> spark.table("my_table").printSchema()
root
 |-- col1: integer (nullable = true)
 |-- col2: integer (nullable = true)


4. FROM METASTORE DB: two ways of storing the columns
==
metastore=# select * from "COLUMNS_V2";
 CD_ID | COMMENT | COLUMN_NAME | TYPE_NAME | INTEGER_IDX
-------+---------+-------------+-----------+-------------
     2 |         | col1        | int       |           0
     2 |         | col2        | int       |           1
     2 |         | col3        | string    |           2


metastore=# select * from "TABLE_PARAMS";
 TBL_ID | PARAM_KEY                         | PARAM_VALUE
--------+-----------------------------------+-------------
      1 | spark.sql.sources.provider        | parquet
      1 | spark.sql.sources.schema.part.0   | {"type":"struct","fields":[{"name":"col1","type":"integer","nullable":true,"metadata":{}},{"name":"col2","type":"integer","nullable":true,"metadata":{}}]}
      1 | spark.sql.create.version          | 2.4.8
      1 | spark.sql.sources.schema.numParts | 1
      1 | last_modified_time                | 1641483180
      1 | transient_lastDdlTime             | 1641483180
      1 | last_modified_by                  | anonymous

metastore=# truncate "TABLE_PARAMS";
TRUNCATE TABLE


5. FROM SPARK: now the column magically appears
==
>>> spark.table("my_table").printSchema()
root
 |-- col1: integer (nullable = true)
 |-- col2: integer (nullable = true)
 |-- col3: string (nullable = true)


So, is it really necessary to store the schema a second time in TABLE_PARAMS?
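
For what it is worth, a minimal workaround sketch (assuming the table from
the example above and an existing `spark` session): doing the DDL from Spark
keeps the JSON schema in TABLE_PARAMS in sync with COLUMNS_V2, so both Hive
and Spark see the new column.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# DDL issued through Spark updates both COLUMNS_V2 and the copy of the
# schema Spark keeps in TABLE_PARAMS (spark.sql.sources.schema.*).
spark.sql("ALTER TABLE my_table ADD COLUMNS (col3 string)")
spark.table("my_table").printSchema()  # col3 now appears
```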





Re: Choice of IDE for Spark

2021-10-01 Thread Nicolas Paris
> With IntelliJ you are OK with Spark & Scala.

Also, IntelliJ has a nice Python plugin that essentially turns it into PyCharm.


On Thu Sep 30, 2021 at 1:57 PM CEST, Jeff Zhang wrote:
> IIRC, you want an IDE for pyspark on yarn ?
>
> Mich Talebzadeh  wrote on Thursday, September 30, 2021 at 7:00 PM:
>
> > Hi,
> >
> > This may look like a redundant question but it comes about because of the
> > advent of Cloud workstation usage like Amazon workspaces and others.
> >
> > With IntelliJ you are OK with Spark & Scala. With PyCharm you are fine
> > with PySpark and the virtual environment. Mind you as far as I know PyCharm
> > only executes spark-submit in local mode. For yarn, one needs to open a
> > terminal and submit from there.
> >
> > However, in Amazon workstation, you get Visual Studio Code
> >  (VSC, an MS product) and openoffice
> > installed. With VSC, you get stuff for working with json files but I am not
> > sure with a plugin for Python etc, will it be as good as PyCharm? Has
> > anyone used VSC in anger for Spark and if so what is the experience?
> >
> > Thanks
> >
> >
> >
> >view my Linkedin profile
> > 
> >
> >
> >
> > *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> > loss, damage or destruction of data or any other property which may arise
> > from relying on this email's technical content is explicitly disclaimed.
> > The author will in no case be liable for any monetary damages arising from
> > such loss, damage or destruction.
> >
> >
> >
>
>
> --
> Best Regards
>
> Jeff Zhang





Re: AWS EMR SPARK 3.1.1 date issues

2021-08-29 Thread Nicolas Paris
As a workaround, turn off metastore partition pruning and parquet conversion:

spark.sql.hive.metastorePartitionPruning false
spark.sql.hive.convertMetastoreParquet false

see 
https://github.com/awslabs/aws-glue-data-catalog-client-for-apache-hive-metastore/issues/45
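
A hedged sketch of both options (table and column names are the ones from
the thread below): disable the pruning/conversion as above, or compare
against a typed DATE literal as Sean suggested.

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.hive.metastorePartitionPruning", "false")
         .config("spark.sql.hive.convertMetastoreParquet", "false")
         .enableHiveSupport()
         .getOrCreate())

# Alternative: keep pruning on, but compare against a DATE literal so the
# predicate pushed to the catalog is a date, not a string expression.
df = spark.sql(
    "SELECT * FROM table_name "
    "WHERE data_partition > DATE '2021-03-01'"
)
```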

On Tue Aug 24, 2021 at 9:18 AM CEST, Gourav Sengupta wrote:
> Hi,
>
> I received a response from AWS, this is an issue with EMR, and they are
> working on resolving the issue I believe.
>
> Thanks and Regards,
> Gourav Sengupta
>
> On Mon, Aug 23, 2021 at 1:35 PM Gourav Sengupta <
> gourav.sengupta.develo...@gmail.com> wrote:
>
> > Hi,
> >
> > the query still gives the same error if we write "SELECT * FROM table_name
> > WHERE data_partition > CURRENT_DATE() - INTERVAL 10 DAYS".
> >
> > Also the queries work fine in SPARK 3.0.x, or in EMR 6.2.0.
> >
> >
> > Thanks and Regards,
> > Gourav Sengupta
> >
> > On Mon, Aug 23, 2021 at 1:16 PM Sean Owen  wrote:
> >
> >> Date handling was tightened up in Spark 3. I think you need to compare to
> >> a date literal, not a string literal.
> >>
> >> On Mon, Aug 23, 2021 at 5:12 AM Gourav Sengupta <
> >> gourav.sengupta.develo...@gmail.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> while I am running in EMR 6.3.0 (SPARK 3.1.1) a simple query as "SELECT
> >>> * FROM  WHERE  > '2021-03-01'" the query
> >>> is failing with error:
> >>>
> >>> ---
> >>> pyspark.sql.utils.AnalysisException:
> >>> org.apache.hadoop.hive.metastore.api.InvalidObjectException: Unsupported
> >>> expression '2021 - 03 - 01' (Service: AWSGlue; Status Code: 400; Error
> >>> Code: InvalidInputException; Request ID:
> >>> dd3549c2-2eeb-4616-8dc5-5887ba43dd22; Proxy: null)
> >>>
> >>> ---
> >>>
> >>> The above query works fine in all previous versions of SPARK.
> >>>
> >>> Is this the expected behaviour in SPARK 3.1.1? If so can someone please
> >>> let me know how to write this query.
> >>>
> >>> Also if this is the expected behaviour I think that a lot of users will
> >>> have to make these changes in their existing code making transition to
> >>> SPARK 3.1.1 expensive I think.
> >>>
> >>> Regards,
> >>> Gourav Sengupta
> >>>
> >>





Re: Why spark-submit works with package not with jar

2020-10-20 Thread Nicolas Paris
you can proceed step by step.

> java.lang.NoClassDefFoundError:
> com/google/api/client/http/HttpRequestInitializer

I would run `grep -lRi HttpRequestInitializer` in the ~/.ivy2 folder to
spot the jar containing that class. After resolving a few more
ClassNotFound errors the same way, you should succeed.
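
If grep is not handy, a small sketch of the same idea in python (the class
name and the default ivy folder are the ones from this thread):

```python
import glob
import os
import zipfile

# The class spark-submit reported as missing, as a jar entry path.
missing = "com/google/api/client/http/HttpRequestInitializer.class"

for jar in glob.glob(os.path.expanduser("~/.ivy2/jars/*.jar")):
    with zipfile.ZipFile(jar) as zf:
        if missing in zf.namelist():
            print(jar)  # candidate jar(s) to add to --jars
```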

Mich Talebzadeh  writes:

> Hi Nicolas,
>
> I removed ~/.iv2 and reran the spark job with the package included (the one
> working)
>
> Under ~/.ivy/jars I Have 37 jar files, including the one that I had before.
>
> /home/hduser/.ivy2/jars> ls
> com.databricks_spark-avro_2.11-4.0.0.jar
>  com.google.cloud.bigdataoss_gcs-connector-1.9.4-hadoop2.jar
> com.google.oauth-client_google-oauth-client-1.24.1.jar
> org.checkerframework_checker-qual-2.5.2.jar
> com.fasterxml.jackson.core_jackson-core-2.9.2.jar
> com.google.cloud.bigdataoss_gcsio-1.9.4.jar
> com.google.oauth-client_google-oauth-client-java6-1.24.1.jar
> org.codehaus.jackson_jackson-core-asl-1.9.13.jar
> com.github.samelamin_spark-bigquery_2.11-0.2.6.jar
>  com.google.cloud.bigdataoss_util-1.9.4.jar
>  commons-codec_commons-codec-1.6.jar
>  org.codehaus.jackson_jackson-mapper-asl-1.9.13.jar
> com.google.api-client_google-api-client-1.24.1.jar
>  com.google.cloud.bigdataoss_util-hadoop-1.9.4-hadoop2.jar
> commons-logging_commons-logging-1.1.1.jar
>  org.codehaus.mojo_animal-sniffer-annotations-1.14.jar
> com.google.api-client_google-api-client-jackson2-1.24.1.jar
> com.google.code.findbugs_jsr305-3.0.2.jar
> com.thoughtworks.paranamer_paranamer-2.3.jar
> org.slf4j_slf4j-api-1.7.5.jar
> com.google.api-client_google-api-client-java6-1.24.1.jar
>  com.google.errorprone_error_prone_annotations-2.1.3.jar
> joda-time_joda-time-2.9.3.jar
>  org.tukaani_xz-1.0.jar
> com.google.apis_google-api-services-bigquery-v2-rev398-1.24.1.jar
> com.google.guava_guava-26.0-jre.jar
> org.apache.avro_avro-1.7.6.jar
> org.xerial.snappy_snappy-java-1.0.5.jar
> com.google.apis_google-api-services-storage-v1-rev135-1.24.1.jar
>  com.google.http-client_google-http-client-1.24.1.jar
>  org.apache.commons_commons-compress-1.4.1.jar
> com.google.auto.value_auto-value-annotations-1.6.2.jar
>  com.google.http-client_google-http-client-jackson2-1.24.1.jar
> org.apache.httpcomponents_httpclient-4.0.1.jar
> com.google.cloud.bigdataoss_bigquery-connector-0.13.4-hadoop2.jar
> com.google.j2objc_j2objc-annotations-1.1.jar
>  org.apache.httpcomponents_httpcore-4.0.1.jar
>
> I don't think I need to add all of these to spark-submit --jars list. Is
> there a way I can find out which dependency is missing
>
> This is the error I am getting when I use the jar file
> * com.github.samelamin_spark-bigquery_2.11-0.2.6.jar* instead of the
> package *com.github.samelamin:spark-bigquery_2.11:0.2.6*
>
> java.lang.NoClassDefFoundError:
> com/google/api/client/http/HttpRequestInitializer
>   at
> com.samelamin.spark.bigquery.BigQuerySQLContext.bq$lzycompute(BigQuerySQLContext.scala:19)
>   at
> com.samelamin.spark.bigquery.BigQuerySQLContext.bq(BigQuerySQLContext.scala:19)
>   at
> com.samelamin.spark.bigquery.BigQuerySQLContext.runDMLQuery(BigQuerySQLContext.scala:105)
>   ... 76 elided
> Caused by: java.lang.ClassNotFoundException:
> com.google.api.client.http.HttpRequestInitializer
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
>
> Thanks
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 20 Oct 2020 at 20:09, Nicolas Paris 
> wrote:
>
>> once you got the jars from --package in the ~/.ivy2 folder you can then
>> add the list to --jars . in this way there is no missing dependency.
>>
>>
>> ayan guha  writes:
>>
>> > Hi
>> >
>> > One way to think of this is --packages is better when you have third
>> party
>> > dependency and --jars is better when you have custom in-house built jars.
>> >
>> > On Wed, 21 Oct 2020 at 3:44 am, Mich Talebzadeh <
>> mich.talebza...@gmail.com>
>> > wrote:
>> >
>> >> Thanks Sean and Russell. Much appreciated.
>> >>
>> >> Just to clarify recently I had issues with different versions of Google
>> >> Guava jar files in building Uber jar file (to evict the unwanted ones).
>> >> These used to work a year and half ago us

Re: Why spark-submit works with package not with jar

2020-10-20 Thread Nicolas Paris
Once you have got the jars from --packages in the ~/.ivy2 folder, you can
then add the list to --jars. This way there is no missing dependency.
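
A tiny sketch of that, assuming the jars landed in the default ~/.ivy2/jars
folder:

```python
import glob
import os

# Build the comma-separated list expected by spark-submit --jars.
jars = ",".join(sorted(glob.glob(os.path.expanduser("~/.ivy2/jars/*.jar"))))
print("spark-submit --jars " + jars + " ...")
```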


ayan guha  writes:

> Hi
>
> One way to think of this is --packages is better when you have third party
> dependency and --jars is better when you have custom in-house built jars.
>
> On Wed, 21 Oct 2020 at 3:44 am, Mich Talebzadeh 
> wrote:
>
>> Thanks Sean and Russell. Much appreciated.
>>
>> Just to clarify recently I had issues with different versions of Google
>> Guava jar files in building Uber jar file (to evict the unwanted ones).
>> These used to work a year and half ago using Google Dataproc compute
>> engines (comes with Spark preloaded) and I could create an Uber jar file.
>>
>> Unfortunately this has become problematic now so tried to use spark-submit
>> instead as follows:
>>
>> ${SPARK_HOME}/bin/spark-submit \
>> --master yarn \
>> --deploy-mode client \
>> --conf spark.executor.memoryOverhead=3000 \
>> --class org.apache.spark.repl.Main \
>> --name "Spark shell on Yarn" "$@"
>> --driver-class-path /home/hduser/jars/ddhybrid.jar \
>> --jars /home/hduser/jars/spark-bigquery-latest.jar, \
>>/home/hduser/jars/ddhybrid.jar \
>> --packages com.github.samelamin:spark-bigquery_2.11:0.2.6
>>
>> Effectively tailored spark-shell. However, I do not think there is a
>> mechanism to resolve jar conflicts without  building an Uber jar file
>> through SBT?
>>
>> Cheers
>>
>>
>>
>> On Tue, 20 Oct 2020 at 16:54, Russell Spitzer 
>> wrote:
>>
>>> --jar Adds only that jar
>>> --package adds the Jar and a it's dependencies listed in maven
>>>
>>> On Tue, Oct 20, 2020 at 10:50 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a scenario that I use in Spark submit as follows:
>>>>
>>>> spark-submit --driver-class-path /home/hduser/jars/ddhybrid.jar --jars
>>>> /home/hduser/jars/spark-bigquery-latest.jar,/home/hduser/jars/ddhybrid.jar,
>>>> */home/hduser/jars/spark-bigquery_2.11-0.2.6.jar*
>>>>
>>>> As you can see the jar files needed are added.
>>>>
>>>>
>>>> This comes back with error message as below
>>>>
>>>>
>>>> Creating model test.weights_MODEL
>>>>
>>>> java.lang.NoClassDefFoundError:
>>>> com/google/api/client/http/HttpRequestInitializer
>>>>
>>>>   at
>>>> com.samelamin.spark.bigquery.BigQuerySQLContext.bq$lzycompute(BigQuerySQLContext.scala:19)
>>>>
>>>>   at
>>>> com.samelamin.spark.bigquery.BigQuerySQLContext.bq(BigQuerySQLContext.scala:19)
>>>>
>>>>   at
>>>> com.samelamin.spark.bigquery.BigQuerySQLContext.runDMLQuery(BigQuerySQLContext.scala:105)
>>>>
>>>>   ... 76 elided
>>>>
>>>> Caused by: java.lang.ClassNotFoundException:
>>>> com.google.api.client.http.HttpRequestInitializer
>>>>
>>>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>
>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>
>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>
>>>>
>>>>
>>>> So there is an issue with finding the class, although the jar file used
>>>>
>>>>
>>>> /home/hduser/jars/spark-bigquery_2.11-0.2.6.jar
>>>>
>>>> has it.
>>>>
>>>>
>>>> Now if *I remove the above jar file and replace it with the same
>>>> version but package* it works!
>>>>
>>>>
>>>> spark-submit --driver-class-path /home/hduser/jars/ddhybrid.jar --jars
>>>> /home/hduser/jars/spark-bigquery-latest.jar,/home/hduser/jars/ddhybrid.jar
>>>> *-**-packages com.github.samelamin:spark-bigquery_2.11:0.2.6*
>>>>
>>>>
>>>> I have read the write-ups about packages searching the maven
>>>> libraries etc. Not convinced why using the package should make so much
>>>> difference between a failure and success. In other words, when to use a
>>>> package rather than a jar.
>>>>
>>>>
>>>> Any ideas will be appreciated.
>>>>
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>> --
> Best Regards,
> Ayan Guha


-- 
nicolas paris




Re: Count distinct and driver memory

2020-10-19 Thread Nicolas Paris
> I was caching it because I didn't want to re-execute the DAG when I
> ran the count query. If you have a spark application with multiple
> actions, Spark reexecutes the entire DAG for each action unless there
> is a cache in between. I was trying to avoid reloading 1/2 a terabyte
> of data.  Also, cache should use up executor memory, not driver
> memory.
Why not count from the parquet file instead? Writing and then reading a
parquet file is more efficient than caching, in my experience.
If you really need caching, you could choose a better storage strategy
such as DISK_ONLY.
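
A minimal sketch of that suggestion (the output path is hypothetical; df and
spark are the dataframe and session from the message below):

```python
from pyspark.sql import functions as F

out = "/path/to/output.parquet"  # hypothetical path
df.write.mode("overwrite").parquet(out)

# Count on the re-read, materialized parquet instead of caching the whole DAG.
written = spark.read.parquet(out)
n = written.select(F.countDistinct("a", "b", "c")).first()[0]

# If caching is really needed, prefer a disk-backed storage level:
#   from pyspark import StorageLevel
#   df.persist(StorageLevel.DISK_ONLY)
```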

Lalwani, Jayesh  writes:

> I was caching it because I didn't want to re-execute the DAG when I ran the 
> count query. If you have a spark application with multiple actions, Spark 
> reexecutes the entire DAG for each action unless there is a cache in between. 
> I was trying to avoid reloading 1/2 a terabyte of data.  Also, cache should 
> use up executor memory, not driver memory.
>
> As it turns out cache was the problem. I didn't expect cache to take Executor 
> memory and spill over to disk. I don't know why it's taking driver memory. 
> The input data has millions of partitions which results in millions of tasks. 
> Perhaps the high memory usage is a side effect of caching the results of lots 
> of tasks. 
>
> On 10/19/20, 1:27 PM, "Nicolas Paris"  wrote:
>
> CAUTION: This email originated from outside of the organization. Do not 
> click links or open attachments unless you can confirm the sender and know 
> the content is safe.
>
>
>
> > Before I write the data frame to parquet, I do df.cache. After writing
> > the file out, I do df.countDistinct(“a”, “b”, “c”).collect()
> if you write the df to parquet, why would you also cache it ? caching by
> default loads the memory. this might affect  later use, such
> collect. the resulting GC can be explained by both caching and collect
>
>
> Lalwani, Jayesh  writes:
>
> > I have a Dataframe with around 6 billion rows, and about 20 columns. 
> First of all, I want to write this dataframe out to parquet. The, Out of the 
> 20 columns, I have 3 columns of interest, and I want to find how many 
> distinct values of the columns are there in the file. I don’t need the actual 
> distinct values. I just need the count. I knoe that there are around 
> 10-16million distinct values
> >
> > Before I write the data frame to parquet, I do df.cache. After writing 
> the file out, I do df.countDistinct(“a”, “b”, “c”).collect()
> >
> > When I run this, I see that the memory usage on my driver steadily 
> increases until it starts getting future time outs. I guess it’s spending 
> time in GC. Does countDistinct cause this behavior? Does Spark try to get all 
> 10 million distinct values into the driver? Is countDistinct not recommended 
> for data frames with large number of distinct values?
> >
> > What’s the solution? Should I use approx._count_distinct?
>
>
> --
> nicolas paris
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-- 
nicolas paris




Re: Count distinct and driver memory

2020-10-19 Thread Nicolas Paris
> Before I write the data frame to parquet, I do df.cache. After writing
> the file out, I do df.countDistinct(“a”, “b”, “c”).collect()
If you write the df to parquet, why would you also cache it? Caching by
default loads the data into memory. This might affect later use, such as
the collect. The resulting GC can be explained by both the caching and the
collect.
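
Regarding the approx_count_distinct question in the quoted message, a hedged
sketch (df is the dataframe from that message; concatenating the columns is
just one way to approximate a multi-column distinct count):

```python
from pyspark.sql import functions as F

# Approximate distinct count of the (a, b, c) triple with ~1% relative error.
# Note: concat_ws skips nulls, which can merge tuples that differ only by null.
approx = (df
          .select(F.approx_count_distinct(
              F.concat_ws("\u0001", "a", "b", "c"), rsd=0.01)
              .alias("approx_distinct"))
          .first()["approx_distinct"])
print(approx)
```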


Lalwani, Jayesh  writes:

> I have a Dataframe with around 6 billion rows, and about 20 columns. First of 
> all, I want to write this dataframe out to parquet. The, Out of the 20 
> columns, I have 3 columns of interest, and I want to find how many distinct 
> values of the columns are there in the file. I don’t need the actual distinct 
> values. I just need the count. I knoe that there are around 10-16million 
> distinct values
>
> Before I write the data frame to parquet, I do df.cache. After writing the 
> file out, I do df.countDistinct(“a”, “b”, “c”).collect()
>
> When I run this, I see that the memory usage on my driver steadily increases 
> until it starts getting future time outs. I guess it’s spending time in GC. 
> Does countDistinct cause this behavior? Does Spark try to get all 10 million 
> distinct values into the driver? Is countDistinct not recommended for data 
> frames with large number of distinct values?
>
> What’s the solution? Should I use approx._count_distinct?


-- 
nicolas paris




Re: Time-based frequency table at scale

2020-03-11 Thread Nicolas Paris
Hi,

Did you try exploding the arrays, then doing the aggregation/count, and
only at the end applying a UDF to add the 0 values? A rough sketch of this
is below. In my experience, working directly on arrays is usually a bad
idea.
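
The sketch, assuming the (timestamp, ids) schema described in the message
below:

```python
from pyspark.sql import functions as F

# Explode the arrays so each (timestamp, id) pair becomes a row.
exploded = df.select("timestamp", F.explode("ids").alias("id"))

# Plain aggregation: count occurrences of each id per timestamp.
counts = exploded.groupBy("timestamp", "id").count()

# Collect the sparse counts per timestamp as a map; the zero entries up to
# the max id can be added in a final step (e.g. a small UDF) instead of
# carrying a 750K-wide dense structure through the aggregation.
sparse_maps = (counts
               .groupBy("timestamp")
               .agg(F.map_from_entries(
                   F.collect_list(F.struct("id", "count")))
                   .alias("id_count_map")))
```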

sakag  writes:

> Hi all, 
>  
> We have a rather interesting use case, and are struggling to come up with an
> approach that scales. Reaching out to seek your expert opinion/feedback and
> tips. 
>  
> What we are trying to do is to find the count of numerical ids over a
> sliding time window where each of our data records has a timestamp and a set
> of numerical ids in the below format. 
>  
> timestamp | ids
> 1  [1,2,3,8]
> 1  [1,2]
> 2  [1,2,3,4]
> 2  [1, 10]
>  
> What we are looking to get as output is:
>  
> timestamp | id_count_map
> 1 | {1: 2, 2: 2, 3: 1, 4:0, 5:0, 6:0, 8:1}
> 2 | {1: 2, 2:1, 3: 1, 4: 1, 5:0, 6:0, 7:0, 8:0, 9:0, 10:1}
>  
> This gives us the frequency of occurrence of these ids over time periods.
> Please note that the output expected is in a dense format.
>  
> However, we are running into scale issues with the data that has these
> characteristics.
>  
> - 500 million records - Total ~100 GB
> - Each record can have 500 elements in the ids column 
> - Max id value (length of id_count_map) is 750K
>  
> We have tried the below approaches to achieve this 
> 1) Expanding ids to a dense, frequency-based vector and then doing a
> row-wise sum over a Window partitioned by timestamp
> 2) Converting ids into a SparseVector and computing the L1 norm (using
> Summarizer) over a Window partitioned by timestamp
> 3) GroupBy/aggregating ids by timestamp, converting to a sparse,
> frequency-based vector using collections.Counter, and expanding to a dense
> format
> 4) GroupBy/aggregating ids by timestamp, converting to a sparse,
> frequency-based vector using CountVectorizer, and then expanding to a dense
> format
>  
> Any other approaches we could try?
>  
> Thanks!
> Sakshi


-- 
nicolas paris




Re: SPARK Suitable IDE

2020-03-05 Thread Nicolas Paris


Holden Karau  writes:
> I work in emacs with ensime.

The ensime project was stopped and the repository archived. Its successor
"metals" works well for scala >= 2.12.

Any good resource to set up ensime with emacs? I can't wait for the
overall spark community to move to scala 2.12.

--
nicolas paris




Re: Does dataframe spark API write/create a single file instead of directory as a result of write operation.

2020-02-22 Thread Nicolas PARIS


> Is there any way to save it as raw_csv file as we do in pandas? I have a

I wrote such a function in scala. Please take a look at
https://github.com/EDS-APHP/spark-etl/blob/master/spark-csv/src/main/scala/CSVTool.scala
(see writeCsvToLocal).

It first writes the csv to hdfs, and then fetches every csv part into one
local csv with headers.
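
For pyspark users, a rough sketch of the same idea on a local filesystem
(paths are hypothetical, and the data must fit in a single partition):

```python
import glob
import shutil

tmp_dir = "/tmp/my_table_csv"   # hypothetical intermediate directory
target = "/tmp/my_table.csv"    # hypothetical single output file

# Spark always writes a directory of part files; with coalesce(1) there is
# exactly one part, which can then be copied to the wanted file name.
df.coalesce(1).write.mode("overwrite").option("header", True).csv(tmp_dir)

part = glob.glob(tmp_dir + "/part-*.csv")[0]
shutil.copyfile(part, target)
```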


Kshitij  writes:

> Is there any way to save it as raw_csv file as we do in pandas? I have a
> script that uses the CSV file for further processing.
>
> On Sat, 22 Feb 2020 at 14:31, rahul c  wrote:
>
>> Hi Kshitij,
>>
>> There are option to suppress the metadata files from get created.
>> Set the below properties and try.
>>
>> 1) To disable the transaction logs of spark
>> "spark.sql.sources.commitProtocolClass =
>> org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol".
>> This will help to disable the "committed" and "started" files but
>> still _SUCCESS, _common_metadata and _metadata files will generate.
>>
>> 2) We can disable the _common_metadata and _metadata files using
>> "parquet.enable.summary-metadata=false".
>>
>> 3) We can also disable the _SUCCESS file using
>> "mapreduce.fileoutputcommitter.marksuccessfuljobs=false".
>>
>> On Sat, 22 Feb, 2020, 10:51 AM Kshitij,  wrote:
>>
>>> Hi,
>>>
>>> There is no dataframe spark API which writes/creates a single file
>>> instead of directory as a result of write operation.
>>>
>>> Below both options will create directory with a random file name.
>>>
>>> df.coalesce(1).write.csv()
>>>
>>>
>>>
>>> df.write.csv()
>>>
>>>
>>> Instead of creating directory with standard files (_SUCCESS , _committed
>>> , _started). I want a single file with file_name specified.
>>>
>>>
>>> Thanks
>>>
>>


--
nicolas paris




Re: Questions about count() performance with dataframes and parquet files

2020-02-18 Thread Nicolas PARIS
> either materialize the Dataframe on HDFS (e.g. parquet or checkpoint)

I wonder if avro is a better candidate for this: because it is row
oriented, it should be faster to write/read for such a task. I had never
heard about checkpoint.
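
For reference, a minimal sketch of the checkpoint option mentioned above
(the directory is hypothetical; df stands for the dataframe to materialize):

```python
# Checkpointing materializes the dataframe under the checkpoint directory
# and truncates its lineage, much like writing and re-reading parquet.
spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

# eager=True (the default) writes the data immediately, so later actions
# (counts, joins) reuse the checkpointed files instead of recomputing.
df_materialized = df.checkpoint(eager=True)
```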

Enrico Minack  writes:

> It is not about very large or small, it is about how large your
> cluster is w.r.t. your data. Caching is only useful if you have the
> respective memory available across your executors. Otherwise you could
> either materialize the Dataframe on HDFS (e.g. parquet or checkpoint)
> or indeed have to do the join twice. It's a memory-over-CPU trade-off.
>
> Enrico
>
>
> Am 17.02.20 um 22:06 schrieb Nicolas PARIS:
>>> .dropDuplicates() \ .cache() |
>>> Since df_actions is cached, you can count inserts and updates quickly
>>> with only that one join in df_actions:
>> Hi Enrico. I am wondering if this is ok for very large tables ? Is
>> caching faster than recomputing both insert/update ?
>>
>> Thanks
>>
>> Enrico Minack  writes:
>>
>>> Ashley,
>>>
>>> I want to suggest a few optimizations. The problem might go away but
>>> at least performance should improve.
>>> The freeze problems could have many reasons, the Spark UI SQL pages
>>> and stages detail pages would be useful. You can send them privately,
>>> if you wish.
>>>
>>> 1. the repartition(1) should be replaced by coalesce(1). The former
>>> will shuffle all data, while the latter will read in the existing
>>> partitions and not shuffle them again.
>>> 2. Repartitioning to a single partition is discouraged, unless you can
>>> guarantee the data fit into one worker's memory.
>>> 3. You can compute Insert and Update in one go, so that you don't have
>>> to join with df_reference twice.
>>>
>>> |df_actions =
>>> df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list,
>>> how="left") \ .withColumn('|||_action|',
>>> when(col('b.hashkey')||.isNull,
>>> 'Insert').otherwise(col(|||'a.hashkey') != col('b.hashkey'),
>>> 'Update')) \| .select(col('_action'), *df_source_hashed) \
>>> .dropDuplicates() \ .cache() |
>>>
>>> Since df_actions is cached, you can count inserts and updates quickly
>>> with only that one join in df_actions:
>>>
>>> |inserts_count = df_actions|||.where(col('_action') === 
>>> 'Insert')|.count()||updates_count = df_actions|||.where(col('_action') === 
>>> 'Update')|.count()|
>>>
>>> And you can get rid of the union:
>>>
>>> |df_output = df_actions.where(col('_action').isNotNull) |
>>>
>>> If you have to write that output to parquet anyway, then you can get
>>> the count quickly from the parquet file if it is partitioned by the
>>> _action column (Spark then only looks into parquet's metadata to get
>>> the count, it does not read any row):
>>>
>>> |df_output.repartition(1).write.partitionBy('_action').format('parquet').mode('overwrite').save('/path/to/output.parquet')
>>> df_output =
>>> |||sql_context.read.parquet('|/path/to/output.parquet|')
>>> |inserts_count = |||df_output|.where(col('_action') ===
>>> 'Insert').count() updates_count = |||df_output|.where(col('_action')
>>> === 'Update').count() |
>>>
>>> These are all just sketches, but I am sure you get the idea.
>>>
>>> Enrico
>>>
>>>
>>> Am 13.02.20 um 05:08 schrieb Ashley Hoff:
>>>> Hi,
>>>>
>>>> I am currently working on an app using PySpark to produce an insert
>>>> and update daily delta capture, being outputted as Parquet.  This is
>>>> running on a 8 core 32 GB Linux server in standalone mode (set to 6
>>>> worker cores of 2GB memory each) running Spark 2.4.3.
>>>>
>>>> This is being achieved by reading in data from a TSQL database, into
>>>> a dataframe, which has a hash of all records appended to it and
>>>> comparing it to a dataframe from yesterdays data (which has been
>>>> saved also as parquet).
>>>>
>>>> As part of the monitoring and logging, I am trying to count the
>>>> number of records for the respective actions.  Example code:
>>>> |df_source = spark_session.read.format('jdbc'). df_reference =
>>>> sql_context.read.parquet('/path/to/reference.parquet')
>>>> df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('',
>>>> *df_source.columns))) \ .cache() df_inserts =
>>>> df_source

Re: Questions about count() performance with dataframes and parquet files

2020-02-17 Thread Nicolas PARIS
sorImpl.java:0
>>  => WholeStageCodegen/MapPartitionsRDD [75]count at
>> NativeMethodAccessorImpl.java:0  =>
>> InMemoryTableScan/MapPartitionsRDD [78]count at
>> NativeMethodAccessorImpl.java:0 => MapPartitionsRDD [79]count at
>> NativeMethodAccessorImpl.java:0 =>
>> WholeStageCodegen/MapPartitionsRDD [80]count at
>> NativeMethodAccessorImpl.java:0 => Exchange/MapPartitionsRDD
>> [81]count at NativeMethodAccessorImpl.java:0
>>
>> The other observation I have found that if I remove the counts from
>> the data frame operations and instead open the outputted parquet
>> field and count using a
>> `sql_context.read.load('/path/to/output.parquet').filter(col("_action")
>> == "Insert").count()` command, I am reducing my run-times by around
>> 20 to 30%.  In my feeble mind, opening up the outputs and re-reading
>> them seems counter-intuitive.
>>
>> Is anyone able to give me some guidance on why or how to ensure that
>> I am doing the above as efficiently as possible?
>>
>> Best Regards
>> Ashley


--
nicolas paris




Ceph / Lustre VS hdfs comparison

2020-02-12 Thread Nicolas PARIS
Hi

Does anyone have experience with ceph / lustre as a replacement for hdfs
as spark storage (parquet, orc, ...)?

Is hdfs still far superior to them?

Thanks

-- 
nicolas paris




detect idle sparkcontext to release resources

2020-01-23 Thread Nicolas Paris
hi

We have many users on the spark-on-yarn cluster. Most of them forget to
release their sparkcontext after their analysis (spark-thrift or pyspark
jupyter kernels).

I wonder how to detect that there is no activity on a sparkcontext in
order to kill it.

Thanks
-- 
nicolas




Best approach to write UDF

2020-01-21 Thread Nicolas Paris
Hi

I have written spark UDFs and I am able to use them in spark scala /
pyspark by using the org.apache.spark.sql.api.java.UDFx API.

I'd like to use them in spark-sql through thrift. I tried to create the
functions with "create function as 'org.my.MyUdf'". However I get the
below error when using them:

> org.apache.spark.sql.AnalysisException: No handler for UDF/UDAF/UDTF 
> 'org.my.MyUdf'; 

I have read there (https://stackoverflow.com/a/56970800/3865083) that
only the org.apache.hadoop.hive.ql.exec.UDF API works for thrift.

How can one write UDFs the right way?

Thanks

-- 
nicolas




Re: Identify bottleneck

2019-12-20 Thread Nicolas Paris
Apparently the "withColumn" issue only applies to hundreds or thousands of
calls. That was not the case here (twenty calls).
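
For reference, a small sketch of the two styles discussed below (column
names are made up); both produce the same result, the single select just
avoids one projection per call:

```python
from pyspark.sql import functions as F

# Style 1: chained withColumn calls, one extra projection per call.
df1 = (df.withColumn("a2", F.col("a") * 2)
         .withColumn("b2", F.upper("b"))
         .drop("a"))

# Style 2: a single select expressing the same transformation at once.
df2 = df.select("b",
                (F.col("a") * 2).alias("a2"),
                F.upper("b").alias("b2"))
```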

On Fri, Dec 20, 2019 at 08:53:16AM +0100, Enrico Minack wrote:
> The issue is explained in depth here: https://medium.com/@manuzhang/
> the-hidden-cost-of-spark-withcolumn-8ffea517c015
> 
> Am 19.12.19 um 23:33 schrieb Chris Teoh:
> 
> As far as I'm aware it isn't any better. The logic all gets processed by
> the same engine so to confirm, compare the DAGs generated from both
> approaches and see if they're identical.
> 
> On Fri, 20 Dec 2019, 8:56 am ayan guha,  wrote:
> 
> Quick question: Why is it better to use one sql vs multiple 
> withColumn?
> isnt everything eventually rewritten by catalyst?
> 
> On Wed, 18 Dec 2019 at 9:14 pm, Enrico Minack 
> wrote:
> 
> How many withColumn statements do you have? Note that it is better
> to use a single select, rather than lots of withColumn. This also
> makes drops redundant.
> 
> Reading 25m CSV lines and writing to Parquet in 5 minutes on 32
> cores is really slow. Can you try this on a single machine, i.e.
> run wit "local[*]".
> 
> Can you rule out the writing part by counting the rows? I presume
> this all happens in a single stage.
> 
> Enrico
> 
> 
> Am 18.12.19 um 10:56 schrieb Antoine DUBOIS:
> 
> Hello
> 
> I'm working on an ETL based on csv describing file systems to
> transform it into parquet so I can work on them easily to
> extract informations.
> I'm using Mr. Powers framework Daria to do so. I've quiet
> different input and a lot of transformation and the framework
> helps organize the code.
> I have a stand-alone cluster v2.3.2 composed of 4 node with 8
> cores and 32GB of memory each.
> The storage is handle by a CephFS volume mounted on all nodes.
> First a small description of my algorithm (it's quiet simple):
> 
> 
> Use SparkContext to load the csv.bz2 file,
> Chain a lot of withColumn() statement,
> Drop all unnecessary columns,
> Write parquet file to CephFS
> 
> 
> This treatment can take several hours depending on how much
> lines the CSV is and I wanted to identify if bz2 or network
> could be an issue
> so I run the following test (several time with consistent
> result) :
> I tried the following scenario with 20 cores and 2 core per
> task:
>   ■ Read the csv.bz2 from CephFS with connection with 1Gb/s 
> for
> each node: ~5 minutes.
>   ■ Read the csv.bz2 from TMPFS(setup to look like a shared
> storage space): ~5 minutes.
>   ■ From the 2 previous tests I concluded that uncompressing
> the file was part of the bottleneck so I decided to
> uncompress the file and store it in TMPFS as well, result:
> ~5.9 minutes.
> The test file has 25'833'369 lines and is 370MB compressed and
> 3700MB uncompressed. Those results have been reproduced 
> several
> time each.
> My question here is by what am I bottleneck in this case ?
> 
> I though that the uncompressed file in RAM would be the
> fastest. Is it possible that my program is suboptimal reading
> the CSV ?
> In the execution logs on the cluster I have 5 to 10 seconds GC
> time max, and timeline shows mainly CPU time (no shuffling, no
> randomization overload either).
> I also noticed that memory storage is never used during the
> execution. I know from several hours of research that bz2 is
> the only real compression algorithm usable as an input in 
> spark
> for parallelization reasons.
> 
> Do you have any idea of why such a behaviour ?
> and do you have any idea on how to improve such treatment ?
> 
> Cheers
> 
> Antoine
> 
> 
> 
> --
> Best Regards,
> Ayan Guha
> 
> 

-- 
nicolas




Re: SparkR integration with Hive 3 spark-r

2019-11-18 Thread Nicolas Paris
Hi Alfredo

My 2 cents:
To my knowledge, and from reading the spark3 pre-release notes, it will
handle hive metastore 2.3.5 - there is no mention of the hive 3 metastore.
I made several tests on this in the past[1] and spark seems to handle any
hive metastore version.

However spark cannot read hive managed tables, AKA transactional tables.
So I would say you should be able to read any hive 3 regular table with
any of spark, pyspark or sparkR.


[1] https://parisni.frama.io/posts/playing-with-hive-spark-metastore-versions/
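
For reference, the kind of configuration explored in [1] (the version
number is only an example, not a statement about hive 3 support):

```python
from pyspark.sql import SparkSession

# Point Spark at a given Hive metastore client version, fetching the
# matching client jars from maven. Adjust the version to the metastore in use.
spark = (SparkSession.builder
         .config("spark.sql.hive.metastore.version", "2.3.5")
         .config("spark.sql.hive.metastore.jars", "maven")
         .enableHiveSupport()
         .getOrCreate())

spark.sql("SHOW TABLES").show()
```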

On Mon, Nov 18, 2019 at 11:23:50AM -0600, Alfredo Marquez wrote:
> Hello,
> 
> Our company is moving to Hive 3, and they are saying that there is no SparkR
> implementation in Spark 2.3.x + that will connect to Hive 3.  Is this true?
> 
> If it is true, will this be addressed in the Spark 3 release?
> 
> I don't use python, so losing SparkR to get work done on Hadoop is a huge 
> loss.
> 
> P.S. This is my first email to this community; if there is something I should
> do differently, please let me know.
> 
> Thank you
> 
> Alfredo

-- 
nicolas




announce: spark-postgres 3 released

2019-11-10 Thread Nicolas Paris
Hello spark users,

Spark-postgres is designed for reliable and performant ETL in big-data
workloads and offers read/write/SCD capabilities. Version 3 introduces
a datasource API and simplifies the usage. It outperforms sqoop by a
factor of 8 and the apache spark core jdbc by an even wider margin.

Features:
- use of pg COPY statements
- parallel reads/writes
- use of hdfs to store intermediary csv
- reindex after bulk-loading
- SCD1 computations done on the spark side
- use unlogged tables when needed
- handle arrays and multiline string columns
- useful jdbc functions (ddl, updates...)

The official repository:
https://framagit.org/parisni/spark-etl/tree/master/spark-postgres

And its mirror on microsoft github:
https://github.com/EDS-APHP/spark-etl/tree/master/spark-postgres

-- 
nicolas




Re: pyspark - memory leak leading to OOM after submitting 100 jobs?

2019-10-31 Thread Nicolas Paris
Have you deactivated the spark UI? I have read several threads explaining
that the UI can lead to OOM because it stores 1000 DAGs by default.
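
A sketch of the knobs involved (values are only examples): either disable
the UI entirely, or lower how much history the driver retains.

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         # Option 1: no UI at all.
         .config("spark.ui.enabled", "false")
         # Option 2 (alternative): keep the UI but retain less in the driver.
         .config("spark.ui.retainedJobs", "50")
         .config("spark.ui.retainedStages", "100")
         .config("spark.sql.ui.retainedExecutions", "50")
         .getOrCreate())
```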


On Sun, Oct 20, 2019 at 03:18:20AM -0700, Paul Wais wrote:
> Dear List,
> 
> I've observed some sort of memory leak when using pyspark to run ~100
> jobs in local mode.  Each job is essentially a create RDD -> create DF
> -> write DF sort of flow.  The RDD and DFs go out of scope after each
> job completes, hence I call this issue a "memory leak."  Here's
> pseudocode:
> 
> ```
> row_rdds = []
> for i in range(100):
>   row_rdd = spark.sparkContext.parallelize([{'a': i} for i in range(1000)])
>   row_rdds.append(row_rdd)
> 
> for row_rdd in row_rdds:
>   df = spark.createDataFrame(row_rdd)
>   df.persist()
>   print(df.count())
>   df.write.save(...) # Save parquet
>   df.unpersist()
> 
>   # Does not help:
>   # del df
>   # del row_rdd
> ```
> 
> In my real application:
>  * rows are much larger, perhaps 1MB each
>  * row_rdds are sized to fit available RAM
> 
> I observe that after 100 or so iterations of the second loop (each of
> which creates a "job" in the Spark WebUI), the following happens:
>  * pyspark workers have fairly stable resident and virtual RAM usage
>  * java process eventually approaches resident RAM cap (8GB standard)
> but virtual RAM usage keeps ballooning.
> 
> Eventually the machine runs out of RAM and the linux OOM killer kills
> the java process, resulting in an "IndexError: pop from an empty
> deque" error from py4j/java_gateway.py .
> 
> 
> Does anybody have any ideas about what's going on?  Note that this is
> local mode.  I have personally run standalone masters and submitted a
> ton of jobs and never seen something like this over time.  Those were
> very different jobs, but perhaps this issue is bespoke to local mode?
> 
> Emphasis: I did try to del the pyspark objects and run python GC.
> That didn't help at all.
> 
> pyspark 2.4.4 on java 1.8 on ubuntu bionic (tensorflow docker image)
> 
> 12-core i7 with 16GB of ram and 22GB swap file (swap is *on*).
> 
> Cheers,
> -Paul
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-- 
nicolas




Re: graphx vs graphframes

2019-10-17 Thread Nicolas Paris
Hi Alastair

Cypher support looks promising and the dev list thread discussion is
interesting.
Thanks for your feedback.

On Thu, Oct 17, 2019 at 09:19:28AM +0100, Alastair Green wrote:
> Hi Nicolas, 
> 
> I was following the current thread on the dev channel about Spark
> Graph, including Cypher support, 
> 
> http://apache-spark-developers-list.1001551.n3.nabble.com/
> Add-spark-dependency-on-on-org-opencypher-okapi-shade-okapi-td28118.html
> 
> and I remembered your post.
> 
> Actually, GraphX and GraphFrames are both not being developed actively, so far
> as I can tell. 
> 
> The only activity on GraphX in the last two years was a fix for Scala 2.13
> functionality: to quote the PR 
> 
> 
> ### Does this PR introduce any user-facing change?
> 
> No behavior change at all.
> 
> The only activity on GraphFrames since the addition of Pregel support in Scala
> back in December 2018, has been build/test improvements and recent builds
> against 2.4 and 3.0 snapshots. I’m not sure there was a lot of functional
> change before that either. 
> 
> The efforts to provide graph processing in Spark with the more full-featured
> Cypher query language that you can see in the proposed 3.0 changes discussed 
> in
> the dev list, and the related openCypher/morpheus project (which among many
> other things allows you to cast a Morpheus graph into a GraphX graph) and
> extends the proposed 3.0 changes in a compatible way, are active. 
> 
> Yrs, 
> 
> Alastair
> 
> 
> Alastair Green
> 
> Query Languages Standards and Research
> 
> 
> Neo4j UK Ltd
> 
> Union House
> 182-194 Union Street
> London, SE1 0LH
> 
> 
> +44 795 841 2107
> 
> 
> On Sun, Sep 22, 2019 at 21:17, Nicolas Paris  wrote:
> 
> hi all
> 
> graphframes was intended to replace graphx.
> 
> however the former looks not maintained anymore while the latter is
> still active.
> 
> any thought ?
> --
> nicolas
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 

-- 
nicolas




graphx vs graphframes

2019-09-22 Thread Nicolas Paris
hi all

GraphFrames was intended to replace GraphX.

However, the former no longer looks maintained, while the latter is
still active.

Any thoughts?
-- 
nicolas




Re: Call Oracle Sequence using Spark

2019-08-16 Thread Nicolas Paris


> I have to call Oracle sequence using spark.

You might use jdbc and write your own helper lib in scala.

I did such a thing for postgres
(https://framagit.org/parisni/spark-etl/tree/master/spark-postgres);
see sqlExecWithResultSet.
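
From pyspark, a hedged sketch of the same idea through a raw JDBC call on
the driver via py4j (connection details, credentials and sequence name are
placeholders; the Oracle JDBC driver must be on the driver classpath):

```python
# Raw JDBC from the driver JVM, mirroring the "write your own helper around
# jdbc" approach. All identifiers below are placeholders.
jvm = spark._jvm
jvm.java.lang.Class.forName("oracle.jdbc.OracleDriver")
conn = jvm.java.sql.DriverManager.getConnection(
    "jdbc:oracle:thin:@//dbhost:1521/service", "user", "password")
stmt = conn.createStatement()
rs = stmt.executeQuery("SELECT my_seq.NEXTVAL FROM dual")
rs.next()
next_id = rs.getLong(1)
conn.close()
```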


On Thu, Aug 15, 2019 at 10:58:11PM +0530, rajat kumar wrote:
> Hi All,
> 
> I have to call Oracle sequence using spark. Can you pls tell what is the way 
> to
> do that?
> 
> Thanks
> Rajat

-- 
nicolas




Re: Announcing Delta Lake 0.3.0

2019-08-06 Thread Nicolas Paris
>   • Scala/Java APIs for DML commands - You can now modify data in Delta Lake
> tables using programmatic APIs for Delete, Update and Merge. These APIs
> mirror the syntax and semantics of their corresponding SQL commands and 
> are
> great for many workloads, e.g., Slowly Changing Dimension (SCD) 
> operations,
> merging change data for replication, and upserts from streaming queries.
> See the documentation for more details.

just tested the merge feature on a large table: awesome
- fast to build
- fast to query afterward
- robust (version history is an incredible feature)


thanks


On Thu, Aug 01, 2019 at 06:44:30PM -0700, Tathagata Das wrote:
> Hello everyone, 
> 
> We are excited to announce the availability of Delta Lake 0.3.0 which
> introduces new programmatic APIs for manipulating and managing data in Delta
> Lake tables.
> 
> 
> Here are the main features: 
> 
> 
>   • Scala/Java APIs for DML commands - You can now modify data in Delta Lake
> tables using programmatic APIs for Delete, Update and Merge. These APIs
> mirror the syntax and semantics of their corresponding SQL commands and 
> are
> great for many workloads, e.g., Slowly Changing Dimension (SCD) 
> operations,
> merging change data for replication, and upserts from streaming queries.
> See the documentation for more details.
> 
> 
>   • Scala/Java APIs for query commit history - You can now query a table’s
> commit history to see what operations modified the table. This enables you
> to audit data changes, time travel queries on specific versions, debug and
> recover data from accidental deletions, etc. See the documentation for 
> more
> details.
> 
> 
>   • Scala/Java APIs for vacuuming old files - Delta Lake uses MVCC to enable
> snapshot isolation and time travel. However, keeping all versions of a
> table forever can be prohibitively expensive. Stale snapshots (as well as
> other uncommitted files from aborted transactions) can be garbage 
> collected
> by vacuuming the table. See the documentation for more details.
> 
> 
> To try out Delta Lake 0.3.0, please follow the Delta Lake Quickstart: https://
> docs.delta.io/0.3.0/quick-start.html
> 
> To view the release notes:
> https://github.com/delta-io/delta/releases/tag/v0.3.0
> 
> We would like to thank all the community members for contributing to this
> release.
> 
> TD

-- 
nicolas




Re: New Spark Datasource for Hive ACID tables

2019-07-27 Thread Nicolas Paris
Congrats

The read/write feature with hive3 is highly interesting

On Fri, Jul 26, 2019 at 06:07:55PM +0530, Abhishek Somani wrote:
> Hi All,
> 
> We at Qubole have open sourced a datasource that will enable users to work on
> their Hive ACID Transactional Tables using Spark. 
> 
> Github: https://github.com/qubole/spark-acid
> 
> Hive ACID tables allow users to work on their data transactionally, and also
> gives them the ability to Delete, Update and Merge data efficiently without
> having to rewrite all of their data in a table, partition or file. We believe
> that being able to work on these tables from Spark is a much desired value 
> add,
> as is also apparent in https://issues.apache.org/jira/browse/SPARK-15348 and 
> https://issues.apache.org/jira/browse/SPARK-16996 with multiple people looking
> for it. Currently the datasource supports reading from these ACID tables only,
> and we are working on adding the ability to write into these tables via Spark
> as well.
> 
> The datasource is also available as a spark package, and instructions on how 
> to
> use it are available on the Github page.
> 
> We welcome your feedback and suggestions.
> 
> Thanks,
> Abhishek Somani 

-- 
nicolas




Re: Avro large binary read memory problem

2019-07-23 Thread Nicolas Paris


On Tue, Jul 23, 2019 at 05:10:19PM +, Mario Amatucci wrote:
> https://spark.apache.org/docs/2.2.0/configuration.html#memory-management

Thanks for the pointer. However, I tried almost every configuration and
the behavior tends to show that spark keeps things in memory instead of
releasing them.


On Tue, Jul 23, 2019 at 05:10:19PM +, Mario Amatucci wrote:
> https://spark.apache.org/docs/2.2.0/configuration.html#memory-management
> 
> MARIO AMATUCCI 
> Senior Software Engineer 
>  
> Office: +48 12 881 10 05 x 31463   Email: mario_amatu...@epam.com 
> Gdansk, Poland   epam.com 
>  
> ~do more with less~ 
>  
> CONFIDENTIALITY CAUTION AND DISCLAIMER
> This message is intended only for the use of the individual(s) or entity(ies) 
> to which it is addressed and contains information that is legally privileged 
> and confidential. If you are not the intended recipient, or the person 
> responsible for delivering the message to the intended recipient, you are 
> hereby notified that any dissemination, distribution or copying of this 
> communication is strictly prohibited. All unintended recipients are obliged 
> to delete this message and destroy any printed copies. 
>  
> 
> -Original Message-
> From: Nicolas Paris  
> Sent: Tuesday, July 23, 2019 6:56 PM
> To: user@spark.apache.org
> Subject: Avro large binary read memory problem
> 
> Hi
> 
> I have those avro file with the schema id:Long, content:Binary
> 
> the binary are large image with a maximum of 2GB of size.
> 
> I d like to get a subset of row "where id in (...)"
> 
> Sadly I get memory errors even if the subset is 0 of size. It looks like the 
> reader stores the binary information until the heap size or the container is 
> killed by yarn.
> 
> Any idea how to tune the memory management to avoid to get memory problem?
> 
> Thanks
> 
> -- spark 2.4.3
> 
> --
> nicolas
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-- 
nicolas




Avro large binary read memory problem

2019-07-23 Thread Nicolas Paris
Hi

I have avro files with the schema id:Long, content:Binary.

The binaries are large images with a maximum size of 2GB.

I'd like to get a subset of rows "where id in (...)".

Sadly I get memory errors even if the subset is empty. It looks like the
reader keeps the binary content in memory until the heap is exhausted or
the container is killed by yarn.

Any idea how to tune the memory management to avoid to get memory
problem?

Thanks

-- spark 2.4.3

-- 
nicolas




write csv does not handle \r correctly

2019-07-13 Thread Nicolas Paris
hi

spark 2.4.1

The csv writer does not quote string columns when they contain the \r
carriage return character. It works as expected for both \n and \r\n 

\r is considered as a newline by many parsers, and spark should consider
it as a newline marker.

thanks 
-- 
nicolas




timestamp column orc problem with hive

2019-07-13 Thread Nicolas Paris
Hi

spark 2.4.1
hive 1.2.2

The orc files saved as tables from spark are not working correctly with
hive. A "timestampCol is not null" does not work as expected.

The parquet format works as expected for the same input.

Is this a known issue ?

thanks
-- 
nicolas




Re: run new spark version on old spark cluster ?

2019-05-21 Thread Nicolas Paris
Hi

I finally got it all working. Here are the steps (for information, I am on
HDP 2.6.5):

- copy the old hive-site.xml into the new spark conf folder
- (optional?) download the jersey-bundle-1.8.jar and put it into the jars folder
- build a tar.gz from all the jars and copy that archive to hdfs with chown hdfs:hadoop
- create a spark-default.conf file in the conf folder and add the below lines:

> spark.driver.extraLibraryPath /usr/hdp/current/hadoop-client/lib/native
> spark.executor.extraLibraryPath
> /usr/hdp/current/hadoop-client/lib/native
> spark.driver.extraJavaOptions -Dhdp.version=2.6.5.0-292
> spark.yarn.am.extraJavaOptions -Dhdp.version=2.6.5.0-292
> spark.eventLog.dir hdfs:///spark-history
> spark.eventLog.enabled false
> spark.hadoop.yarn.timeline-service.enabled false
> org.apache.spark.deploy.history.FsHistoryProvider
> spark.yarn.containerLauncherMaxThreads 25
> spark.driver.memoryOverhead 200
> spark.executor.memoryOverhead 200
> spark.yarn.max.executor.failures 3
> spark.yarn.preserve.staging.files false
> spark.yarn.queue default
> spark.yarn.scheduler.heartbeat.interval-ms 5000
> spark.yarn.submit.file.replication 3
> spark.yarn.archive hdfs:///hdp/apps/2.6.5.0-292/spark2/spark2.4.tar.gz
> spark.ui.port 4041

then the below command works (included hive, hdfs and yarn):

> bin/spark-shell --master yarn


Thanks for your support,



On Mon, May 20, 2019 at 03:42:46PM -0400, Koert Kuipers wrote:
> most likely have to set something in spark-defaults.conf like
> 
> spark.master yarn
> spark.submit.deployMode client
> 
> On Mon, May 20, 2019 at 3:14 PM Nicolas Paris  
> wrote:
> 
> Finally that was easy to connect to both hive/hdfs. I just had to copy
> the hive-site.xml from the old spark version and that worked instantly
> after unzipping.
> 
> Right now I am stuck on connecting to yarn.
> 
> 
> On Mon, May 20, 2019 at 02:50:44PM -0400, Koert Kuipers wrote:
> > we had very little issues with hdfs or hive, but then we use hive only
> for
> > basic reading and writing of tables.
> >
> > depending on your vendor you might have to add a few settings to your
> > spark-defaults.conf. i remember on hdp you had to set the hdp.version
> somehow.
> > we prefer to build spark with hadoop being provided, and then add hadoop
> > classpath to spark classpath. this works well on cdh, hdp, and also for
> cloud
> > providers.
> >
> > for example this is a typical build with hive for cdh 5 (which is based
> on
> > hadoop 2.6, you change hadoop version based on vendor):
> > dev/make-distribution.sh --name  --tgz -Phadoop-2.6
> -Dhadoop.version=
> > 2.6.0 -Pyarn -Phadoop-provided -Phive
> > add hadoop classpath to the spark classpath in spark-env.sh:
> > export SPARK_DIST_CLASSPATH=$(hadoop classpath)
> >
> > i think certain vendors support multiple "vendor supported" installs, so
> you
> > could also look into that if you are not comfortable with running your
> own
> > spark build.
> >
> > On Mon, May 20, 2019 at 2:24 PM Nicolas Paris 
> wrote:
> >
> >     > correct. note that you only need to install spark on the node you
> launch
> >     it
> >     > from. spark doesnt need to be installed on cluster itself.
> >
> >     That sound reasonably doable for me. My guess is I will have some
> >     troubles to make that spark version work with both hive & hdfs
> installed
> >     on the cluster - or maybe that's finally plug-&-play i don't know.
> >
> >     thanks
> >
> >     On Mon, May 20, 2019 at 02:16:43PM -0400, Koert Kuipers wrote:
> >     > correct. note that you only need to install spark on the node you
> launch
> >     it
> >     > from. spark doesnt need to be installed on cluster itself.
> >     >
> >     > the shared components between spark jobs on yarn are only really
> >     > spark-shuffle-service in yarn and spark-history-server. i have
> found
> >     > compatibility for these to be good. its best if these run latest
> version.
> >     >
> >     > On Mon, May 20, 2019 at 2:02 PM Nicolas Paris <
> nicolas.pa...@riseup.net>
> >     wrote:
> >     >
> >     >     > you will need the spark version you intend to launch with on
> the
> >     machine
> >     >     you
> >     >     > launch from and point to the correct spark-su

Re: run new spark version on old spark cluster ?

2019-05-20 Thread Nicolas Paris
Finally, it was easy to connect to both hive/hdfs. I just had to copy
the hive-site.xml from the old spark version and that worked instantly
after unzipping.

Right now I am stuck on connecting to yarn. 


On Mon, May 20, 2019 at 02:50:44PM -0400, Koert Kuipers wrote:
> we had very little issues with hdfs or hive, but then we use hive only for
> basic reading and writing of tables.
> 
> depending on your vendor you might have to add a few settings to your
> spark-defaults.conf. i remember on hdp you had to set the hdp.version somehow.
> we prefer to build spark with hadoop being provided, and then add hadoop
> classpath to spark classpath. this works well on cdh, hdp, and also for cloud
> providers.
> 
> for example this is a typical build with hive for cdh 5 (which is based on
> hadoop 2.6, you change hadoop version based on vendor):
> dev/make-distribution.sh --name  --tgz -Phadoop-2.6 
> -Dhadoop.version=
> 2.6.0 -Pyarn -Phadoop-provided -Phive
> add hadoop classpath to the spark classpath in spark-env.sh:
> export SPARK_DIST_CLASSPATH=$(hadoop classpath)
> 
> i think certain vendors support multiple "vendor supported" installs, so you
> could also look into that if you are not comfortable with running your own
> spark build.
> 
> On Mon, May 20, 2019 at 2:24 PM Nicolas Paris  
> wrote:
> 
> > correct. note that you only need to install spark on the node you launch
> it
> > from. spark doesnt need to be installed on cluster itself.
> 
> That sound reasonably doable for me. My guess is I will have some
> troubles to make that spark version work with both hive & hdfs installed
> on the cluster - or maybe that's finally plug-&-play i don't know.
> 
> thanks
> 
> On Mon, May 20, 2019 at 02:16:43PM -0400, Koert Kuipers wrote:
> > correct. note that you only need to install spark on the node you launch
> it
> > from. spark doesnt need to be installed on cluster itself.
> >
> > the shared components between spark jobs on yarn are only really
> > spark-shuffle-service in yarn and spark-history-server. i have found
> > compatibility for these to be good. its best if these run latest 
> version.
> >
> > On Mon, May 20, 2019 at 2:02 PM Nicolas Paris 
> wrote:
> >
> >     > you will need the spark version you intend to launch with on the
> machine
> >     you
> >     > launch from and point to the correct spark-submit
> >
> >     does this mean to install a second spark version (2.4) on the 
> cluster
> ?
> >
> >     thanks
> >
> >     On Mon, May 20, 2019 at 01:58:11PM -0400, Koert Kuipers wrote:
> >     > yarn can happily run multiple spark versions side-by-side
> >     > you will need the spark version you intend to launch with on the
> machine
> >     you
> >     > launch from and point to the correct spark-submit
> >     >
> >     > On Mon, May 20, 2019 at 1:50 PM Nicolas Paris <
> nicolas.pa...@riseup.net>
> >     wrote:
> >     >
> >     >     Hi
> >     >
> >     >     I am wondering whether that's feasible to:
> >     >     - build a spark application (with sbt/maven) based on spark2.4
> >     >     - deploy that jar on yarn on a spark2.3 based installation
> >     >
> >     >     thanks by advance,
> >     >
> >     >
> >     >     --
> >     >     nicolas
> >     >
> >     >   
>  -
> >     >     To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >     >
> >     >
> >
> >     --
> >     nicolas
> >
> >     
> -
> >     To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
> 
> --
> nicolas
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 

-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: run new spark version on old spark cluster ?

2019-05-20 Thread Nicolas Paris
> correct. note that you only need to install spark on the node you launch it
> from. spark doesnt need to be installed on cluster itself.

That sounds reasonably doable to me. My guess is I will have some
trouble making that spark version work with both hive & hdfs installed
on the cluster - or maybe it's finally plug-&-play, I don't know.

thanks

On Mon, May 20, 2019 at 02:16:43PM -0400, Koert Kuipers wrote:
> correct. note that you only need to install spark on the node you launch it
> from. spark doesnt need to be installed on cluster itself.
> 
> the shared components between spark jobs on yarn are only really
> spark-shuffle-service in yarn and spark-history-server. i have found
> compatibility for these to be good. its best if these run latest version.
> 
> On Mon, May 20, 2019 at 2:02 PM Nicolas Paris  
> wrote:
> 
> > you will need the spark version you intend to launch with on the machine
> you
> > launch from and point to the correct spark-submit
> 
> does this mean to install a second spark version (2.4) on the cluster ?
> 
> thanks
> 
> On Mon, May 20, 2019 at 01:58:11PM -0400, Koert Kuipers wrote:
> > yarn can happily run multiple spark versions side-by-side
> > you will need the spark version you intend to launch with on the machine
> you
> > launch from and point to the correct spark-submit
> >
> > On Mon, May 20, 2019 at 1:50 PM Nicolas Paris 
> wrote:
> >
> >     Hi
> >
> >     I am wondering whether that's feasible to:
> >     - build a spark application (with sbt/maven) based on spark2.4
> >     - deploy that jar on yarn on a spark2.3 based installation
> >
> >     thanks by advance,
> >
> >
> >     --
> >     nicolas
> >
> >     
> -
> >     To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
> 
> --
> nicolas
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 

-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: run new spark version on old spark cluster ?

2019-05-20 Thread Nicolas Paris
> you will need the spark version you intend to launch with on the machine you
> launch from and point to the correct spark-submit

does this mean installing a second spark version (2.4) on the cluster ?

thanks

On Mon, May 20, 2019 at 01:58:11PM -0400, Koert Kuipers wrote:
> yarn can happily run multiple spark versions side-by-side
> you will need the spark version you intend to launch with on the machine you
> launch from and point to the correct spark-submit
> 
> On Mon, May 20, 2019 at 1:50 PM Nicolas Paris  
> wrote:
> 
> Hi
> 
> I am wondering whether that's feasible to:
> - build a spark application (with sbt/maven) based on spark2.4
> - deploy that jar on yarn on a spark2.3 based installation
> 
> thanks by advance,
> 
> 
> --
> nicolas
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 

-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



run new spark version on old spark cluster ?

2019-05-20 Thread Nicolas Paris
Hi

I am wondering whether it is feasible to:
- build a spark application (with sbt/maven) based on spark2.4
- deploy that jar on yarn on a spark2.3 based installation

thanks in advance,


-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: log level in spark

2019-05-11 Thread Nicolas Paris
That's all right, I managed to reduce the log level by removing the
logback dependency in the pom.xml


On Sat, May 11, 2019 at 02:54:49PM +0200, Nicolas Paris wrote:
> Hi 
> 
> I have a spark code source with tests that create sparkSessions.
> 
> I am running spark testing framework.
> 
> My concern is I am not able to configure the log level to INFO.
> 
> I have large debug traces such:
> > DEBUG org.spark_project.jetty.util.Jetty - 
> > java.lang.NumberFormatException: For input string: "unknown
> 
> 
> I have configured my log4j.properties, and also tested to set the
> sparkSession level programatically.
> 
> Any help on configuring log level ? 
> 
> Thanks
> -- 
> nicolas
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



log level in spark

2019-05-11 Thread Nicolas Paris
Hi 

I have a spark source code base with tests that create sparkSessions.

I am running the spark testing framework.

My concern is that I am not able to configure the log level to INFO.

I have large debug traces such as:
> DEBUG org.spark_project.jetty.util.Jetty - 
> java.lang.NumberFormatException: For input string: "unknown


I have configured my log4j.properties, and also tried to set the
sparkSession log level programmatically.

Any help on configuring the log level?

Thanks
-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: pySpark - pandas UDF and binaryType

2019-05-04 Thread Nicolas Paris
hi Gourav,

> And also be aware that pandas UDF does not always lead to better performance
> and sometimes even massively slow performance.

this information is not widely known. this is good to know. in which
circumstances is it worse than a regular udf ?

> With Grouped Map dont you run into the risk of random memory errors as well?

indeed, that might happen if the batched binaries have a surprisingly high
size.

On Sat, May 04, 2019 at 02:25:34AM +0100, Gourav Sengupta wrote:
> And also be aware that pandas UDF does not always lead to better performance
> and sometimes even massively slow performance.
> 
> With Grouped Map dont you run into the risk of random memory errors as well?
> 
> On Thu, May 2, 2019 at 9:32 PM Bryan Cutler  wrote:
> 
> Hi,
> 
> BinaryType support was not added until Spark 2.4.0, see https://
> issues.apache.org/jira/browse/SPARK-23555. Also, pyarrow 0.10.0 or greater
> is require as you saw in the docs.
> 
> Bryan
> 
> On Thu, May 2, 2019 at 4:26 AM Nicolas Paris 
> wrote:
> 
> Hi all
> 
> I am using pySpark 2.3.0 and pyArrow 0.10.0
> 
> I want to apply a pandas-udf on a dataframe with 
> I have the bellow error:
> 
> > Invalid returnType with grouped map Pandas UDFs:
> > StructType(List(StructField(filename,StringType,true),StructField
> (contents,BinaryType,true)))
> > is not supported
> 
> 
> I am missing something ?
> the doc https://spark.apache.org/docs/latest/
> sql-pyspark-pandas-with-arrow.html#supported-sql-types
> says pyArrow 0.10 is minimum to handle binaryType
> 
> here is the code:
> 
> > from pyspark.sql.functions import pandas_udf, PandasUDFType
> >
> > df = sql("select filename, contents from test_binary")
> >
> > @pandas_udf("filename String, contents binary", 
> PandasUDFType.GROUPED_MAP)
> > def transform_binary(pdf):
> >     contents = pdf.contents
> >     return pdf.assign(contents=contents)
> >
> > df.groupby("filename").apply(transform_binary).count()
> 
> Thanks
> --
> nicolas
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 

-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



pySpark - pandas UDF and binaryType

2019-05-02 Thread Nicolas Paris
Hi all

I am using pySpark 2.3.0 and pyArrow 0.10.0

I want to apply a pandas-udf on a dataframe with a binary column.
I have the below error:

> Invalid returnType with grouped map Pandas UDFs:
> StructType(List(StructField(filename,StringType,true),StructField(contents,BinaryType,true)))
> is not supported


Am I missing something?
the doc 
https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#supported-sql-types
says pyArrow 0.10 is minimum to handle binaryType

here is the code:

> from pyspark.sql.functions import pandas_udf, PandasUDFType
> 
> df = sql("select filename, contents from test_binary")
> 
> @pandas_udf("filename String, contents binary",  PandasUDFType.GROUPED_MAP)
> def transform_binary(pdf):
> contents = pdf.contents
> return pdf.assign(contents=contents)
> 
> df.groupby("filename").apply(transform_binary).count()

Thanks
-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [SQL] 64-bit hash function, and seeding

2019-03-05 Thread Nicolas Paris
Hi Huon

Good catch. A 64 bit hash is definitely a useful function.

> the birthday paradox implies  >50% chance of at least one for tables larger 
> than 77000 rows

Do you know how many rows it takes to reach a 50% collision chance with a 64-bit hash ?


About the seed column, to me there is no need for such an argument: you
can just add an integer as a regular column.
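
For illustration, this is the kind of thing I mean with the existing 32-bit
hash in pyspark (column names and the seed are made up; the same pattern
would apply to a hash64 once it is exposed). As a side note, the same
birthday bound for a 64-bit hash is roughly 1.18 * 2^32, i.e. about 5
billion rows before a >50% chance of one collision.

import random
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["col1", "col2"])

seed = random.getrandbits(31)   # fixed once, so every row keeps the same value
df = df.withColumn("rnd", F.hash(F.lit(seed), F.col("col1"), F.col("col2")))
df.show()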

About the process for pull requests, I cannot help much


On Tue, Mar 05, 2019 at 04:30:31AM +, huon.wil...@data61.csiro.au wrote:
> Hi,
> 
> I’m working on something that requires deterministic randomness, i.e. a row 
> gets the same “random” value no matter the order of the DataFrame. A seeded 
> hash seems to be the perfect way to do this, but the existing hashes have 
> various limitations:
> 
> - hash: 32-bit output (only 4 billion possibilities will result in a lot of 
> collisions for many tables: the birthday paradox implies  >50% chance of at 
> least one for tables larger than 77000 rows)
> - sha1/sha2/md5: single binary column input, string output
> 
> It seems there’s already support for a 64-bit hash function that can work 
> with an arbitrary number of arbitrary-typed columns: XxHash64, and exposing 
> this for DataFrames seems like it’s essentially one line in 
> sql/functions.scala to match `hash` (plus docs, tests, function registry 
> etc.):
> 
> def hash64(cols: Column*): Column = withExpr { new 
> XxHash64(cols.map(_.expr)) }
> 
> For my use case, this can then be used to get a 64-bit “random” column like 
> 
> val seed = rng.nextLong()
> hash64(lit(seed), col1, col2)
> 
> I’ve created a (hopefully) complete patch by mimicking ‘hash’ at 
> https://github.com/apache/spark/compare/master...huonw:hash64; should I open 
> a JIRA and submit it as a pull request?
> 
> Additionally, both hash and the new hash64 already have support for being 
> seeded, but this isn’t exposed directly and instead requires something like 
> the `lit` above. Would it make sense to add overloads like the following?
> 
> def hash(seed: Int, cols: Columns*) = …
> def hash64(seed: Long, cols: Columns*) = …
> 
> Though, it does seem a bit unfortunate to be forced to pass the seed first.
> 
> - Huon
> 
>  
> 


> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Connect to hive 3 from spark

2019-03-04 Thread Nicolas Paris
Hi all

Does anybody know if spark is able to connect to the hive metastore for
hive 3 (metastore v3)?

I know spark cannot deal with transactional tables, however I wonder if
at least it can read/write non-transactional tables from hive 3.
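
For context, this is the kind of configuration I would try (a sketch only:
the metastore uri and jar path are placeholders, and whether a v3 value is
accepted by spark.sql.hive.metastore.version depends on the spark build --
that part is an assumption to verify):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("hive.metastore.uris", "thrift://metastore-host:9083")
         .config("spark.sql.hive.metastore.version", "3.0")
         .config("spark.sql.hive.metastore.jars", "/path/to/hive3/lib/*")
         .enableHiveSupport()
         .getOrCreate())

spark.sql("show tables in default").show()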

Thanks

-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Postgres Read JDBC with COPY TO STDOUT

2018-12-31 Thread Nicolas Paris
The resulting library is on github: https://github.com/EDS-APHP/spark-postgres
While there is room for improvement, it is also able to read/write postgres
data with the COPY statement, allowing reading/writing **very large** tables
without problems.


On Sat, Dec 29, 2018 at 01:06:00PM +0100, Nicolas Paris wrote:
> Hi
> 
> The spark postgres JDBC reader is limited because it relies on basic
> SELECT statements with fetchsize and crashes on large tables even if
> multiple partitions are setup with lower/upper bounds.
> 
> I am about writing a new postgres JDBC reader based on "COPY TO STDOUT".
> It would stream the data and produce CSV on the fileSystem (hdfs or
> local).  The CSV would be then parsed with the spark CSV reader to
> produce a dataframe. It would send multiple "COPY TO STDOUT" for each
> executor.
> 
> Right now, I am able to loop over an output stream and write the string
> somewhere.
> I am wondering what would be the best way to process the resulting
> string stream. In particular the best way to direct it to a hdfs folder
> or maybe parse it on the fly into a dataframe.
> 
> Thanks,
> 
> -- 
> nicolas
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Postgres Read JDBC with COPY TO STDOUT

2018-12-29 Thread Nicolas Paris
Hi

The spark postgres JDBC reader is limited because it relies on basic
SELECT statements with fetchsize and crashes on large tables even if
multiple partitions are set up with lower/upper bounds.

I am about to write a new postgres JDBC reader based on "COPY TO STDOUT".
It would stream the data and produce CSV on the fileSystem (hdfs or
local).  The CSV would be then parsed with the spark CSV reader to
produce a dataframe. It would send multiple "COPY TO STDOUT" for each
executor.

Right now, I am able to loop over an output stream and write the string
somewhere.
I am wondering what would be the best way to process the resulting
string stream. In particular the best way to direct it to a hdfs folder
or maybe parse it on the fly into a dataframe.
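
To make the idea concrete, here is a rough sketch of the direction I have
in mind: one "COPY TO STDOUT" per split, run from the executors with
psycopg2, dumped as CSV, then parsed back with the spark CSV reader. Hosts,
paths, table and bounds are placeholders, and it assumes psycopg2 is
installed on the executors and the export directory is a filesystem they
all share.

import psycopg2
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def copy_split(bounds):
    lo, hi = bounds
    path = "/shared/export/part_%s_%s.csv" % (lo, hi)
    conn = psycopg2.connect("host=pg-host dbname=db user=u password=secret")
    with conn.cursor() as cur, open(path, "w") as f:
        cur.copy_expert(
            "COPY (SELECT * FROM big_table WHERE id >= %s AND id < %s) "
            "TO STDOUT WITH CSV" % (lo, hi), f)
    conn.close()
    return path

splits = [(0, 1000000), (1000000, 2000000)]   # illustrative bounds
paths = spark.sparkContext.parallelize(splits, len(splits)).map(copy_split).collect()
df = spark.read.csv(["file://" + p for p in paths])   # parse the dumps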

Thanks,

-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: jdbc spark streaming

2018-12-28 Thread Nicolas Paris
Hi

The RDBMS context is quite broad: it has both large fact tables with
billions of rows as well as hundreds of small normalized tables. Depending
on the spark transformation, the source data can be one or multiple
tables, as well as a few rows, millions or even billions of them. When new
data is inserted in some tables, it should trigger a spark job that
might fetch data from old and even static related tables. In most cases,
the joins are made in Spark, not in the RDBMS, to keep it quiet. In all
cases, the sooner/faster the spark jobs get the data the better.

I have explored four ways right now : CDC, Batch, spark Streaming and
also apache livy.

CDC (such as debezium) looks interesting. It can be combined with triggers
to populate some table to be then fetched with spark streaming, kafka
and so on. However this approach is quite complex and adds some
processing/storage on the RDBMS side.

Batch is simple. However as said, it's quite slow and resource
consuming for both RDBMS and spark cluster.

Spark Streaming is faster than batch, but more difficult to maintain
(to me). It frequently impacts the RDBMS.

Apache Livy looks the best. The REST API allows triggering ready and
sized spark contexts. Even better, it allows triggering the job from the
client application that loads the RDBMS, just after the RDBMS was
populated. Finally, this is also flexible since it can handle any
workload and also py/R/scala spark.
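
For illustration, triggering such a pre-sized job from the loading
application could look roughly like this (a sketch: the livy host, jar
path, class name and sizing are placeholders):

import json
import requests

livy = "http://livy-host:8998"
payload = {
    "file": "hdfs:///jobs/my-spark-job.jar",   # or a .py file
    "className": "com.example.MyJob",          # omit for a pyspark script
    "args": ["2018-12-28"],
    "numExecutors": 4,
    "executorCores": 2,
}
r = requests.post(livy + "/batches", data=json.dumps(payload),
                  headers={"Content-Type": "application/json"})
batch_id = r.json()["id"]

# poll the batch state until the job finishes
print(requests.get("%s/batches/%s/state" % (livy, batch_id)).json())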


On Fri, Dec 28, 2018 at 05:41:51PM +, Thakrar, Jayesh wrote:
> Yes, you can certainly use spark streaming, but reading from the original 
> source table may still be time consuming and resource intensive.
> 
> Having some context on the RDBMS platform, data size/volumes involved and the 
> tolerable lag (between changes being created and it being processed by Spark) 
> will help people give you better recommendations/best practices.
> 
> All the same, one approach is to create triggers on the source table and 
> insert data into a different table and then read from there.
> Another approach is to push the delta data into something like Kafka and then 
> use Spark streaming against that.
> Taking that Kafka approach further, you can capture the delta upstream so 
> that the processing that pushes it into the RDBMS can also push it to Kafka 
> directly.
> 
> On 12/27/18, 4:52 PM, "Nicolas Paris"  wrote:
> 
> Hi
> 
> I have this living RDBMS and I d'like to apply a spark job on several
> tables once new data get in.
> 
> I could run batch spark jobs thought cron jobs every minutes. But the
> job takes time and resources to begin (sparkcontext, yarn)
> 
> I wonder if I could run one instance of a spark streaming job to save
> those resources. However I haven't seen about structured streaming from
> jdbc source in the documentation.
> 
> Any recommendation ?
> 
> 
> -- 
> nicolas
> 
> 
> 

-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



jdbc spark streaming

2018-12-27 Thread Nicolas Paris
Hi

I have this living RDBMS and I'd like to apply a spark job on several
tables once new data gets in.

I could run batch spark jobs through cron jobs every minute. But the
job takes time and resources to start up (sparkcontext, yarn).

I wonder if I could run one instance of a spark streaming job to save
those resources. However I haven't seen anything about a structured
streaming jdbc source in the documentation.

Any recommendation ?


-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: streaming pdf

2018-11-19 Thread Nicolas Paris
On Mon, Nov 19, 2018 at 07:23:10AM +0100, Jörn Franke wrote:
> Why does it have to be a stream?
> 

Right now I manage the pipelines as spark batch processing. Moving to
streaming would add some improvements such as:
- simplification of the pipeline
- more frequent data ingestion
- better resource management (?)


On Mon, Nov 19, 2018 at 07:23:10AM +0100, Jörn Franke wrote:
> Why does it have to be a stream?
> 
> > Am 18.11.2018 um 23:29 schrieb Nicolas Paris :
> > 
> > Hi
> > 
> > I have pdf to load into spark with at least 
> > format. I have considered some options:
> > 
> > - spark streaming does not provide a native file stream for binary with
> >  variable size (binaryRecordStream specifies a constant size) and I
> >  would have to write my own receiver.
> > 
> > - Structured streaming allows to process avro/parquet/orc files
> >  containing pdfs, but this makes things more complicated than
> >  monitoring a simple folder  containing pdfs
> > 
> > - Kafka is not designed to handle messages > 100KB, and for this reason
> >  it is not a good option to use in the stream pipeline.
> > 
> > Somebody has a suggestion ?
> > 
> > Thanks,
> > 
> > -- 
> > nicolas
> > 
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > 
> 

-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



streaming pdf

2018-11-18 Thread Nicolas Paris
Hi

I have pdfs to load into spark, with at least a (filename, binary content)
format. I have considered some options:

- spark streaming does not provide a native file stream for binaries with
  variable size (binaryRecordsStream specifies a constant size) and I
  would have to write my own receiver.

- Structured streaming allows processing avro/parquet/orc files
  containing pdfs (see the sketch after this list), but this makes things
  more complicated than monitoring a simple folder containing pdfs

- Kafka is not designed to handle messages > 100KB, and for this reason
  it is not a good option to use in the stream pipeline.
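
For the second option, the sketch I have in mind is the following (paths
and the two-column schema are assumptions on my side):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, BinaryType

spark = SparkSession.builder.getOrCreate()
schema = StructType([StructField("filename", StringType()),
                     StructField("content", BinaryType())])

pdfs = (spark.readStream
        .schema(schema)                    # file sources need an explicit schema
        .format("parquet")
        .load("hdfs:///landing/pdf_batches/"))

query = (pdfs.writeStream
         .format("parquet")
         .option("checkpointLocation", "hdfs:///checkpoints/pdf")
         .option("path", "hdfs:///processed/pdf")
         .start())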

Somebody has a suggestion ?

Thanks,

-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to avoid long-running jobs blocking short-running jobs

2018-11-03 Thread Nicolas Paris
On Sat, Nov 03, 2018 at 02:04:01AM -0700, conner wrote:
> My solution is to find a good way to divide the spark cluster resource
> into two. 

What about yarn and its queue management system ?
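
For instance, something along these lines, assuming two queues are defined
in the YARN scheduler (the queue names are placeholders):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("yarn")
         .appName("short-job")
         .config("spark.yarn.queue", "interactive")   # e.g. "batch" for the long jobs
         .getOrCreate())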

-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Process Million Binary Files

2018-10-11 Thread Nicolas PARIS
Hi Joel

I built such a pipeline to transform pdf -> text:
https://github.com/EDS-APHP/SparkPdfExtractor
You can take a look.

It transforms 20M pdfs in 2 hours on a 5-node spark cluster.
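
Regarding the parallelism issue in the snippet quoted below, a rough
pyspark sketch (the partition count is something to tune, and
my_pdf_to_text stands in for whatever extraction library you use):

# spark: an existing SparkSession
rdd = spark.sparkContext.binaryFiles("hdfs:///pdfs/*.pdf", minPartitions=2000)

def extract(record):
    path, content = record                     # in pyspark the value is raw bytes
    return (path, my_pdf_to_text(content))     # my_pdf_to_text is a placeholder

texts = rdd.repartition(2000).map(extract).toDF(["filename", "text"])
texts.write.mode("overwrite").parquet("hdfs:///pdf_text")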

Le 2018-10-10 23:56, Joel D a écrit :
> Hi,
> 
> I need to process millions of PDFs in hdfs using spark. First I’m
> trying with some 40k files. I’m using binaryFiles api with which
> I’m facing couple of issues:
> 
> 1. It creates only 4 tasks and I can’t seem to increase the
> parallelism there. 
> 2. It took 2276 seconds and that means for millions of files it will
> take ages to complete. I’m also expecting it to fail for million
> records with some timeout or gc overhead exception.
> 
> Val files = sparkSession.sparkContext.binaryFiles(filePath, 200).cache
> 
> Val fileContentRdd = files.map(file => myFunc(file)
> 
> Do you have any guidance on how I can process millions of files using
> binaryFiles api?
> 
> How can I increase the number of tasks/parallelism during the creation
> of files rdd?
> 
> Thanks

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: csv reader performance with multiline option

2018-08-18 Thread Nicolas Paris
Hi

yes, multiline would only use one thread in that case.

The csv parser used by spark is uniVocity
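
A quick way to see the difference (the path is a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# splittable read: a large csv is split into many tasks
df_parallel = spark.read.option("header", True).csv("hdfs:///data/big.csv")

# multiLine read: quoted fields may contain newlines, so the file cannot be
# split on line boundaries and each file is handled by a single task
df_single = (spark.read.option("header", True)
             .option("multiLine", True)
             .csv("hdfs:///data/big.csv"))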


Le 18 août 2018 à 18:07, Nirav Patel écrivait :
> does enabling 'multiLine' option impact performance? how? would it run read
> entire file with just one thread?
> 
> Thanks
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Best way to process this dataset

2018-06-19 Thread Nicolas Paris
Hi Raymond

Spark works well on a single machine too, since it benefits from multiple
cores.
The csv parser is based on univocity and you might use the
"spark.read.csv" syntax instead of using the rdd api.

From my experience, this will be better than any other csv parser.
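
Roughly like this for the file mentioned below (the column names are my
guess from the description, adjust as needed):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = (spark.read
      .option("inferSchema", True)
      .csv("C:/RXIE/Learning/Data/Alibaba/UserBehavior/UserBehavior.csv")
      .toDF("user_id", "item_id", "category_id", "behavior", "ts"))

df.groupBy("user_id", "behavior").count().show(5)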

2018-06-19 16:43 GMT+02:00 Raymond Xie :

> Thank you Matteo, Askash and Georg:
>
> I am attempting to get some stats first, the data is like:
>
> 1,4152983,2355072,pv,1511871096
>
> I like to find out the count of Key of (UserID, Behavior Type)
>
> val bh_count = 
> sc.textFile("C:\\RXIE\\Learning\\Data\\Alibaba\\UserBehavior\\UserBehavior.csv").map(_.split(",")).map(x
>  => ((x(0).toInt,x(3)),1)).groupByKey()
>
> This shows me:
> scala> val first = bh_count.first
> [Stage 1:>  (0 +
> 1) / 1]2018-06-19 10:41:19 WARN  Executor:66 - Managed memory leak
> detected; size = 15848112 bytes, TID = 110
> first: ((Int, String), Iterable[Int]) = ((878310,pv),CompactBuffer(1, 1,
> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
> 1, 1))
>
>
> *Note this environment is: Windows 7 with 32GB RAM. (I am firstly running
> it in Windows where I have more RAM instead of Ubuntu so the env differs to
> what I said in the original email)*
> *Dataset is 3.6GB*
>
> *Thank you very much.*
> **
> *Sincerely yours,*
>
>
> *Raymond*
>
> On Tue, Jun 19, 2018 at 4:04 AM, Matteo Cossu  wrote:
>
>> Single machine? Any other framework will perform better than Spark
>>
>> On Tue, 19 Jun 2018 at 09:40, Aakash Basu 
>> wrote:
>>
>>> Georg, just asking, can Pandas handle such a big dataset? If that data
>>> is further passed into using any of the sklearn modules?
>>>
>>> On Tue, Jun 19, 2018 at 10:35 AM, Georg Heiler <
>>> georg.kf.hei...@gmail.com> wrote:
>>>
 use pandas or dask

 If you do want to use spark store the dataset as parquet / orc. And
 then continue to perform analytical queries on that dataset.

 Raymond Xie  schrieb am Di., 19. Juni 2018 um
 04:29 Uhr:

> I have a 3.6GB csv dataset (4 columns, 100,150,807 rows), my
> environment is 20GB ssd harddisk and 2GB RAM.
>
> The dataset comes with
> User ID: 987,994
> Item ID: 4,162,024
> Category ID: 9,439
> Behavior type ('pv', 'buy', 'cart', 'fav')
> Unix Timestamp: span between November 25 to December 03, 2017
>
> I would like to hear any suggestion from you on how should I process
> the dataset with my current environment.
>
> Thank you.
>
> **
> *Sincerely yours,*
>
>
> *Raymond*
>

>>>
>


Re: Dataframe from 1.5G json (non JSONL)

2018-06-05 Thread Nicolas Paris
IMO your json cannot be read in parallel at all, so spark only offers you
the option to play with memory.

I'd say at some step it has to fit in both one executor and the driver.
I'd try something like 20GB for both driver and executors, and use a
dynamic number of executors in order to then repartition that fat json.
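
Something along these lines (the 20GB figures are the guess above, and the
repartition count is arbitrary):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.driver.memory", "20g")
         .config("spark.executor.memory", "20g")
         .config("spark.dynamicAllocation.enabled", "true")
         .getOrCreate())

df = (spark.read
      .option("multiLine", True)    # the whole array is one json document
      .json("hdfs:///data/fat.json")
      .repartition(200))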




2018-06-05 22:40 GMT+02:00 raksja :

> Yes I would say thats the first thing that i tried. thing is even though i
> provide more num executor and more memory to each, this process gets OOM in
> only one task which is stuck and unfinished.
>
> I dont think its splitting the load to other tasks.
>
> I had 11 blocks on that file i stored in hdfs and i got 11 partitions in my
> dataframe, when i did show(1), it spinned up 11 tasks, 10 passed quickly 1
> stuck and oom.
>
> Also i repartitioned to 1000 and that didnt help either.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Dataframe from 1.5G json (non JSONL)

2018-06-05 Thread Nicolas Paris
have you played with driver/executor memory configuration ?

Increasing them should avoid OOM

2018-06-05 22:30 GMT+02:00 raksja :

> Agreed, gzip or non splittable, the question that i have and examples i
> have
> posted above all are referring to non compressed file. A single json file
> with Array of objects in a continuous line.
>
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: PySpark API on top of Apache Arrow

2018-05-26 Thread Nicolas Paris
hi corey

I'm not familiar with arrow or plasma. However, I recently read an article
about spark on a standalone machine (your case). Sounds like you could
benefit from pyspark "as-is":

https://databricks.com/blog/2018/05/03/benchmarking-apache-spark-on-a-single-node-machine.html

regards,

2018-05-23 22:30 GMT+02:00 Corey Nolet :

> Please forgive me if this question has been asked already.
>
> I'm working in Python with Arrow+Plasma+Pandas Dataframes. I'm curious if
> anyone knows of any efforts to implement the PySpark API on top of Apache
> Arrow directly. In my case, I'm doing data science on a machine with 288
> cores and 1TB of ram.
>
> It would make life much easier if I was able to use the flexibility of the
> PySpark API (rather than having to be tied to the operations in Pandas). It
> seems like an implementation would be fairly straightforward using the
> Plasma server and object_ids.
>
> If you have not heard of an effort underway to accomplish this, any
> reasons why it would be a bad idea?
>
>
> Thanks!
>


Re:

2018-05-16 Thread Nicolas Paris
Hi

I would go for a regular mysql bulk load, i.e. write an output
that mysql is able to load in one process. I'd say spark jdbc is ok for
small fetches/loads. When it comes to large RDBMS calls, it turns out the
regular optimized API is better than jdbc.
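
A minimal sketch of what I mean, df being the dataframe from the thread
below (paths, table and connector settings are placeholders, the exact csv
part file name written by spark will differ, and it assumes local-infile is
allowed on both client and server):

# 1. dump the dataframe as csv somewhere the mysql client can read
df.coalesce(1).write.mode("overwrite").csv("/shared/export/my_table_csv")

# 2. let mysql ingest it in one bulk statement
import mysql.connector
conn = mysql.connector.connect(host="db-host", user="u", password="secret",
                               database="mydb", allow_local_infile=True)
cur = conn.cursor()
cur.execute("LOAD DATA LOCAL INFILE '/shared/export/my_table_csv/part-00000.csv' "
            "INTO TABLE my_table FIELDS TERMINATED BY ','")
conn.commit()
conn.close()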

2018-05-16 16:18 GMT+02:00 Vadim Semenov :

> Upon downsizing to 20 partitions some of your partitions become too big,
> and I see that you're doing caching, and executors try to write big
> partitions to disk, but fail because they exceed 2GiB
>
> > Caused by: java.lang.IllegalArgumentException: Size exceeds
> Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply
> (DiskStore.scala:125)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply
> (DiskStore.scala:124)
>
> You can try to coalesce to 100 and reduce the number of executors to keep
> the load on MySQL reasonable
>
> On Wed, May 16, 2018 at 5:36 AM, Davide Brambilla <
> davide.brambi...@contentwise.tv> wrote:
>
>> Hi all,
>>we have a dataframe with 1000 partitions and we need to write the
>> dataframe into a MySQL using this command:
>>
>> df.coalesce(20)
>> df.write.jdbc(url=url,
>>   table=table,
>>   mode=mode,
>>   properties=properties)
>>
>> and we get this errors randomly
>>
>> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
>> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply
>> (DiskStore.scala:125)
>> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply
>> (DiskStore.scala:124)
>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:126)
>> at org.apache.spark.storage.BlockManager.getLocalValues(BlockMa
>> nager.scala:520)
>> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:693)
>> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockM
>> anager.scala:753)
>> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap
>> Task.scala:96)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap
>> Task.scala:53)
>> at org.apache.spark.scheduler.Task.run(Task.scala:108)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1149)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$sch
>> eduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$
>> 1.apply(DAGScheduler.scala:1678)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$
>> 1.apply(DAGScheduler.scala:1677)
>> at scala.collection.mutable.ResizableArray$class.foreach(Resiza
>> bleArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGSchedu
>> ler.scala:1677)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskS
>> etFailed$1.apply(DAGScheduler.scala:855)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskS
>> etFailed$1.apply(DAGScheduler.scala:855)
>> at scala.Option.foreach(Option.scala:257)
>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
>> DAGScheduler.scala:855)
>> at 

Re: Best practices for dealing with large no of PDF files

2018-04-23 Thread Nicolas Paris
guys
here is the illustration:

https://github.com/parisni/SparkPdfExtractor

Please open issues if you have any questions or improvement ideas

Enjoy

Cheers

2018-04-23 20:42 GMT+02:00 unk1102 :

> Thanks much Nicolas really appreciate it.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Best practices for dealing with large no of PDF files

2018-04-23 Thread Nicolas Paris
sure, then let me recap the steps (a rough sketch follows below):
1. load pdfs from a local folder into hdfs avro
2. load the avro in spark as an RDD
3. apply pdfbox to each pdf and return the content as a string
4. write the result as a huge csv file
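
In pyspark, steps 2-4 look roughly like this (the avro path is a
placeholder, the spark-avro package is assumed to be on the classpath, and
extract_text stands in for the pdfbox call, which in my code is on the
Scala side):

# spark: an existing SparkSession
df = spark.read.format("avro").load("hdfs:///pdf_store/*.avro")  # filename, content

def extract_text(row):
    # placeholder: run a pdf-to-text library over the binary content here
    return (row.filename, my_pdf_to_text(bytes(row.content)))

texts = df.rdd.map(extract_text).toDF(["filename", "text"])
texts.write.mode("overwrite").csv("hdfs:///pdf_text_csv")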

That's some work for me to push all that, guys. I should find some time
within 7 days though.

@unk1102: this won't cover the colors and formatting aspects; you could
play with pdfbox until I release the other parts

Cheers

2018-04-23 19:34 GMT+02:00 Deepak Sharma :

> Yes Nicolas.
> It would be great hell if you can push code to github and share URL.
>
> Thanks
> Deepak
>
>
> On Mon, Apr 23, 2018, 23:00 unk1102  wrote:
>
>> Hi Nicolas thanks much for guidance it was very useful information if you
>> can
>> push that code to github and share url it would be a great help. Looking
>> forward. If you can find time to push early it would be even greater help
>> as
>> I have to finish POC on this use case ASAP.
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Best practices for dealing with large no of PDF files

2018-04-23 Thread Nicolas Paris
2018-04-23 18:59 GMT+02:00 unk1102 :

> Hi Nicolas thanks much for the reply. Do you have any sample code
> somewhere?
>

I have some open-source code. I could find time to push it on github if
needed.



> Do your just keep pdf in avro binary all the time?


Yes, I store them. Actually, I did that once for 50M pdfs, plus the
daily 100K, and each run is archived on hdfs so that I can query them
with hive in a table with multiple avro files.



> How often you parse into
> text using pdfbox?


Each time I improve my pdfbox extractor program. Say... once a year,
maybe.



> Is it on demand basis or you always parse as text and
> keep pdf as binary in avro as just interim state?
>


Can be both. Also, I store them into an orc file for another use case,
with a webservice on top of that to share the pdfs. That table is 4 TB and
contains 50M pdfs. It gets MERGED every day with the new 100K pdfs, thanks
to HIVE merge and ORC acid capabilities.


Re: Best practices for dealing with large no of PDF files

2018-04-23 Thread Nicolas Paris
Hi

The problem is the number of files on hadoop.


I deal with 50M pdf files. What I did was to put them in an avro table on
hdfs, as a binary column.
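
The loading step, roughly, in pyspark (folder, partition count and output
path are placeholders; the spark-avro package is assumed to be available
and the landing folder visible from the executors):

import glob
import os
from pyspark.sql.types import StructType, StructField, StringType, BinaryType

def read_file(path):
    with open(path, "rb") as f:
        return (os.path.basename(path), bytearray(f.read()))

schema = StructType([StructField("filename", StringType()),
                     StructField("content", BinaryType())])

paths = glob.glob("/data/pdf_landing/*.pdf")
pdfs = spark.createDataFrame(
    spark.sparkContext.parallelize(paths, 100).map(read_file), schema)
pdfs.write.format("avro").save("hdfs:///pdf_store/batch_001")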

Then I read it with spark and push that into pdfbox.

Transforming 50M pdfs into text took 2 hours on a 5-node spark cluster.

About colors and formatting, I guess pdfbox is able to get that information
and then maybe you could add html tags in your txt output.
That's some extra work indeed.




2018-04-23 18:25 GMT+02:00 unk1102 :

> Hi I need guidance on dealing with large no of pdf files when using Hadoop
> and Spark. Can I store as binaryFiles using sc.binaryFiles and then convert
> it to text using pdf parsers like Apache Tika or PDFBox etc or I convert it
> into text using these parsers and store it as text files but in doing so I
> am loosing colors, formatting etc Please guide.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Accessing Hive Database (On Hadoop) using Spark

2018-04-15 Thread Nicolas Paris
Hi

Sounds like your configuration files are not filled in correctly.
What does:

spark.sql("SHOW DATABASES").show();

output?
If you only see the default database, the investigation here should help:
https://stackoverflow.com/questions/47257680/unable-to-get-existing-hive-tables-from-hivecontext-using-spark
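
For comparison, this is the minimal pyspark check I use to confirm a
session really talks to the remote metastore (the thrift uri is a
placeholder; it usually comes from hive-site.xml in spark's conf
directory):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("hive.metastore.uris", "thrift://metastore-host:9083")
         .enableHiveSupport()
         .getOrCreate())

spark.sql("SHOW DATABASES").show()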


2018-04-15 18:14 GMT+02:00 Rishikesh Gawade :

> Hello there. I am a newbie in the world of Spark. I have been working on a
> Spark Project using Java.
> I have configured Hive and Spark to run on Hadoop.
> As of now i have created a Hive (derby) database on Hadoop HDFS at the
> given location(warehouse location): */user/hive/warehouse *and database
> name as : *spam *(saved as *spam.db* at the aforementioned location).
> I have been trying to read tables in this database in spark to create
> RDDs/DataFrames.
> Could anybody please guide me in how I can achieve this?
> I used the following statements in my Java Code:
>
> SparkSession spark = SparkSession
> .builder()
> .appName("Java Spark Hive Example").master("yarn")
> .config("spark.sql.warehouse.dir","/user/hive/warehouse")
> .enableHiveSupport()
> .getOrCreate();
> spark.sql("USE spam");
> spark.sql("SELECT * FROM spamdataset").show();
>
> After this i built the project using Maven as follows: mvn clean package
> -DskipTests and a JAR was generated.
>
> After this, I tried running the project via spark-submit CLI using :
>
> spark-submit --class com.adbms.SpamFilter --master yarn
> ~/IdeaProjects/mlproject/target/mlproject-1.0-SNAPSHOT.jar
>
> and got the following error:
>
> Exception in thread "main" 
> org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException:
> Database 'spam' not found;
> at org.apache.spark.sql.catalyst.catalog.SessionCatalog.org$
> apache$spark$sql$catalyst$catalog$SessionCatalog$$requireDbExists(
> SessionCatalog.scala:174)
> at org.apache.spark.sql.catalyst.catalog.SessionCatalog.
> setCurrentDatabase(SessionCatalog.scala:256)
> at org.apache.spark.sql.execution.command.SetDatabaseCommand.run(
> databases.scala:59)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.
> sideEffectResult$lzycompute(commands.scala:70)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.
> sideEffectResult(commands.scala:68)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.
> executeCollect(commands.scala:79)
> at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
> at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
> at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(
> SQLExecution.scala:77)
> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
> at org.apache.spark.sql.Dataset.(Dataset.scala:190)
> at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
> at com.adbms.SpamFilter.main(SpamFilter.java:54)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.spark.deploy.JavaMainApplication.start(
> SparkApplication.scala:52)
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$
> deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> I request you to please check this and if anything is wrong then please
> suggest an ideal way to read Hive tables on Hadoop in Spark using Java. A
> link to a webpage having relevant info would also be appreciated.
> Thank you in anticipation.
> Regards,
> Rishikesh Gawade
>
>


Re: Does Pyspark Support Graphx?

2018-02-18 Thread Nicolas Paris
> Most likely not as most of the effort is currently on GraphFrames  - a great
> blog post on the what GraphFrames offers can be found at: https://

Is the graphframes package still active? The github repository
indicates it is not extremely active. Right now, there is no available
package for spark-2.2, so one needs to compile it from source.
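
For reference, the usage itself is small once a jar matching the
spark/scala version is on the classpath (a sketch with made-up vertices and
edges):

from graphframes import GraphFrame

# spark: an existing SparkSession
v = spark.createDataFrame([("a", "Alice"), ("b", "Bob")], ["id", "name"])
e = spark.createDataFrame([("a", "b", "follows")], ["src", "dst", "relationship"])

g = GraphFrame(v, e)
g.inDegrees.show()
g.find("(x)-[]->(y)").show()   # motif finding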

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: ML:One vs Rest with crossValidator for multinomial in logistic regression

2018-02-09 Thread Nicolas Paris
Bryan

This is exactly the problem.

Good to hear it will be fixed in the 2.3 release


Le 09 févr. 2018 à 02:17, Bryan Cutler écrivait :
> Nicolas, are you referring to printing the model params in that example with
> "print(model1.extractParamMap())"?  There was a problem with pyspark models 
> not
> having params after being fit, which causes this example to show nothing for
> model paramMaps.  This was fixed in https://issues.apache.org/jira/browse/
> SPARK-10931 and the example now shows all model params.  The fix will be in 
> the
> Spark 2.3 release.
> 
> Bryan
> 
> On Wed, Jan 31, 2018 at 10:20 PM, Nicolas Paris <nipari...@gmail.com> wrote:
> 
> Hey
> 
> I am also interested in how to get those parameters.
> For example, the demo code spark-2.2.1-bin-hadoop2.7/examples/src/main/
> python/ml/estimator_transformer_param_example.py
> return empty parameters when  printing "lr.extractParamMap()"
> 
> That's weird
> 
> Thanks
> 
> Le 30 janv. 2018 à 23:10, Bryan Cutler écrivait :
> > Hi Michelle,
> >
> > Your original usage of ParamGridBuilder was not quite right, `addGrid`
> expects
> > (some parameter, array of values for that parameter).  If you want to do
> a grid
> > search with different regularization values, you would do the following:
> >
> > val paramMaps = new ParamGridBuilder().addGrid(logist.regParam, Array
> (0.1,
> > 0.3)).build()
> >
> > * don't forget to build the grid after adding values
> >
> > On Tue, Jan 30, 2018 at 6:55 AM, michelleyang <
> michelle1026sh...@gmail.com>
> > wrote:
> >
> >     I tried to use One vs Rest in spark ml with pipeline and
> crossValidator for
> >     multimultinomial in logistic regression.
> >
> >     It came out with empty coefficients. I figured out it was the 
> setting
> of
> >     ParamGridBuilder. Can anyone help me understand how does the
> parameter
> >     setting affect the crossValidator process?
> >
> >     the orginal code: //output empty coefficients.
> >
> >     val logist=new LogisticRegression
> >
> >     val ova = new OneVsRest().setClassifier(logist)
> >
> >     val paramMaps = new ParamGridBuilder().addGrid(ova.classifier,
> >     Array(logist.getRegParam))
> >
> >     New code://output multi classes coefficients
> >
> >     val logist=new LogisticRegression
> >
> >     val ova = new OneVsRest().setClassifier(logist)
> >
> >     val classifier1 = new LogisticRegression().setRegParam(2.0)
> >
> >     val classifier2 = new LogisticRegression().setRegParam(3.0)
> >
> >     val paramMaps = new ParamGridBuilder() .addGrid(ova.classifier,
> >     Array(classifier1, classifier2))
> >
> >     Please help Thanks.
> >
> >
> >
> >     --
> >     Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> >
> >     
> -
> >     To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
> >
> 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: ML:One vs Rest with crossValidator for multinomial in logistic regression

2018-01-31 Thread Nicolas Paris
Hey 

I am also interested in how to get those parameters.
For example, the demo code
spark-2.2.1-bin-hadoop2.7/examples/src/main/python/ml/estimator_transformer_param_example.py
returns empty parameters when printing "lr.extractParamMap()"

That's weird

Thanks

Le 30 janv. 2018 à 23:10, Bryan Cutler écrivait :
> Hi Michelle,
> 
> Your original usage of ParamGridBuilder was not quite right, `addGrid` expects
> (some parameter, array of values for that parameter).  If you want to do a 
> grid
> search with different regularization values, you would do the following:
> 
> val paramMaps = new ParamGridBuilder().addGrid(logist.regParam, Array(0.1,
> 0.3)).build()
> 
> * don't forget to build the grid after adding values
> 
> On Tue, Jan 30, 2018 at 6:55 AM, michelleyang 
> wrote:
> 
> I tried to use One vs Rest in spark ml with pipeline and crossValidator 
> for
> multimultinomial in logistic regression.
> 
> It came out with empty coefficients. I figured out it was the setting of
> ParamGridBuilder. Can anyone help me understand how does the parameter
> setting affect the crossValidator process?
> 
> the orginal code: //output empty coefficients.
> 
> val logist=new LogisticRegression
> 
> val ova = new OneVsRest().setClassifier(logist)
> 
> val paramMaps = new ParamGridBuilder().addGrid(ova.classifier,
> Array(logist.getRegParam))
> 
> New code://output multi classes coefficients
> 
> val logist=new LogisticRegression
> 
> val ova = new OneVsRest().setClassifier(logist)
> 
> val classifier1 = new LogisticRegression().setRegParam(2.0)
> 
> val classifier2 = new LogisticRegression().setRegParam(3.0)
> 
> val paramMaps = new ParamGridBuilder() .addGrid(ova.classifier,
> Array(classifier1, classifier2))
> 
> Please help Thanks.
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Vectorized ORC Reader in Apache Spark 2.3 with Apache ORC 1.4.1.

2018-01-28 Thread Nicolas Paris
Hi

Thanks for this work.

Will this affect both:
1) spark.read.format("orc").load("...")
2) spark.sql("select ... from my_orc_table_in_hive")

?
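
For context, these are the 2.3 settings I plan to test with (my current
understanding only, to be confirmed; case 2 presumably goes through the new
reader only when the metastore table gets converted):

# spark: an existing SparkSession
spark.conf.set("spark.sql.orc.impl", "native")            # new ORC 1.4 reader
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
# assumption: metastore ORC tables only take the native path when converted
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")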


Le 10 janv. 2018 à 20:14, Dongjoon Hyun écrivait :
> Hi, All.
> 
> Vectorized ORC Reader is now supported in Apache Spark 2.3.
> 
>     https://issues.apache.org/jira/browse/SPARK-16060
> 
> It has been a long journey. From now, Spark can read ORC files faster without
> feature penalty.
> 
> Thank you for all your support, especially Wenchen Fan.
> 
> It's done by two commits.
> 
>     [SPARK-16060][SQL] Support Vectorized ORC Reader
>     https://github.com/apache/spark/commit/f44ba910f58083458e1133502e193a
> 9d6f2bf766
> 
>     [SPARK-16060][SQL][FOLLOW-UP] add a wrapper solution for vectorized orc
> reader
>     https://github.com/apache/spark/commit/eaac60a1e20e29084b7151ffca964c
> faa5ba99d1
> 
> Please check OrcReadBenchmark for the final speed-up from `Hive built-in ORC`
> to `Native ORC Vectorized`.
> 
>     https://github.com/apache/spark/blob/master/sql/hive/src/test/scala/org/
> apache/spark/sql/hive/orc/OrcReadBenchmark.scala
> 
> Thank you.
> 
> Bests,
> Dongjoon.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Hive From Spark: Jdbc VS sparkContext

2017-11-22 Thread Nicolas Paris
Hey

Finally I improved the spark-hive sql performance a lot.

I had a problem with a topology_script.py that produced huge error log
traces and reduced spark performance in python mode. I just corrected
the python2 scripts to be python3 ready.
I had some problems with broadcast variables while joining tables. I just
deactivated this functionality.

As a result our users are now able to use spark-hive with very limited
resources (2 executors with 4 cores) and get decent performance for
analytics.

Compared to JDBC presto, this has several advantages:
- integrated solution
- single security layer (hive/kerberos)
- direct partitioned lazy datasets versus complicated jdbc dataset management
- more robust for analytics with less memory (apparently)

However presto still makes sense for sub-second analytics, oltp-like
queries and data discovery.

Le 05 nov. 2017 à 13:57, Nicolas Paris écrivait :
> Hi
> 
> After some testing, I have been quite disapointed with hiveContext way of
> accessing hive tables.
> 
> The main problem is resource allocation: I have tons of users and they
> get a limited subset of workers. Then this does not allow to query huge
> datasetsn because to few memory allocated (or maybe I am missing
> something).
> 
> If using Hive jdbc, Hive resources are shared by all my users and then
> queries are able to finish.
> 
> Then I have been testing other jdbc based approach and for now, "presto"
> looks like the most appropriate solution to access hive tables.
> 
> In order to load huge datasets into spark, the proposed approach is to
> use presto distributed CTAS to build an ORC dataset, and access to that
> dataset from spark dataframe loader ability (instead of direct jdbc
> access tha would break the driver memory).
> 
> 
> 
> Le 15 oct. 2017 à 19:24, Gourav Sengupta écrivait :
> > Hi Nicolas,
> > 
> > without the hive thrift server, if you try to run a select * on a table 
> > which
> > has around 10,000 partitions, SPARK will give you some surprises. PRESTO 
> > works
> > fine in these scenarios, and I am sure SPARK community will soon learn from
> > their algorithms.
> > 
> > 
> > Regards,
> > Gourav
> > 
> > On Sun, Oct 15, 2017 at 3:43 PM, Nicolas Paris <nipari...@gmail.com> wrote:
> > 
> > > I do not think that SPARK will automatically determine the partitions.
> > Actually
> > > it does not automatically determine the partitions. In case a table 
> > has a
> > few
> > > million records, it all goes through the driver.
> > 
> > Hi Gourav
> > 
> > Actualy spark jdbc driver is able to deal direclty with partitions.
> > Sparks creates a jdbc connection for each partition.
> > 
> > All details explained in this post :
> > http://www.gatorsmile.io/numpartitionsinjdbc/
> > 
> > Also an example with greenplum database:
> > http://engineering.pivotal.io/post/getting-started-with-greenplum-spark/
> > 
> > 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: pySpark driver memory limit

2017-11-08 Thread Nicolas Paris
Le 06 nov. 2017 à 19:56, Nicolas Paris écrivait :
> Can anyone clarify the driver memory aspects of pySpark?
> According to [1], spark.driver.memory limits JVM + python memory.
> 
> In case:
> spark.driver.memory=2G
> Then does it mean the user won't be able to use more than 2G, whatever
> the python code + the RDD stuff he is using ?
> 
> Thanks,
> 
> [1]: 
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-is-running-extremely-slow-with-larger-data-set-like-2G-td17152.html
> 


After some testing: the python driver memory is not limited by
spark.driver.memory; instead, there is no limit at all for those processes.
This may be managed with cgroups, however.
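
For anyone wanting a hard cap on the python side without cgroups, one
option I am considering is a plain rlimit at the top of the driver script
(linux only; the 2G value mirrors the example below and is arbitrary):

import resource

limit = 2 * 1024 ** 3   # 2G, same figure as in the example
resource.setrlimit(resource.RLIMIT_AS, (limit, limit))
# python allocations beyond this now raise MemoryError instead of silently
# eating the node's memory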

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



pySpark driver memory limit

2017-11-06 Thread Nicolas Paris
hi there


Can anyone clarify the driver memory aspects of pySpark?
According to [1], spark.driver.memory limits JVM + python memory.

In case:
spark.driver.memory=2G
Then does it mean the user won't be able to use more than 2G, whatever
python code + RDD stuff they are using?

Thanks,

[1]: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-is-running-extremely-slow-with-larger-data-set-like-2G-td17152.html



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Hive From Spark: Jdbc VS sparkContext

2017-11-05 Thread Nicolas Paris
Le 05 nov. 2017 à 22:46, ayan guha écrivait :
> Thank you for the clarification. That was my understanding too. However how to
> provide the upper bound as it changes for every call in real life. For example
> it is not required for sqoop. 

True. AFAIK sqoop begins by doing a
"select min(column_split), max(column_split)
from () as query;"
and then splits the result.

I was thinking of doing the same with a wrapper around spark jdbc that would
infer the number of partitions and the upper/lower bounds itself.
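
Such a wrapper could look roughly like this (url, table, column, partition
count and credentials are placeholders):

def read_partitioned(spark, url, table, column, num_partitions, props):
    # one small jdbc query to get the bounds, like sqoop does
    bounds = spark.read.jdbc(
        url=url, properties=props,
        table="(select min(%s) lo, max(%s) hi from %s) b" % (column, column, table)
    ).first()
    # then the real partitioned read
    return spark.read.jdbc(url=url, table=table, column=column,
                           lowerBound=bounds["lo"], upperBound=bounds["hi"],
                           numPartitions=num_partitions, properties=props)

df = read_partitioned(spark, "jdbc:postgresql://host/db", "my_table",
                      "id", 20, {"user": "u", "password": "secret"})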


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Hive From Spark: Jdbc VS sparkContext

2017-11-05 Thread Nicolas Paris
Le 05 nov. 2017 à 22:02, ayan guha écrivait :
> Can you confirm if JDBC DF Reader actually loads all data from source to 
> driver
> memory and then distributes to the executors?

apparently yes, when not using a partition column


> And this is true even when a
> partition column is provided?

No, in this case each worker sends a jdbc call, according to the
documentation:
https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Hive From Spark: Jdbc VS sparkContext

2017-11-05 Thread Nicolas Paris
Le 05 nov. 2017 à 14:11, Gourav Sengupta écrivait :
> thanks a ton for your kind response. Have you used SPARK Session ? I think 
> that
> hiveContext is a very old way of solving things in SPARK, and since then new
> algorithms have been introduced in SPARK. 

I will give sparkSession a try.

> It will be a lot of help, given how kind you have been by sharing your
> experience, if you could kindly share your code as well and provide details
> like SPARK , HADOOP, HIVE, and other environment version and details.

I am testing a HDP 2.6 distrib and also:
SPARK: 2.1.1
HADOOP: 2.7.3
HIVE: 1.2.1000
PRESTO: 1.87

> After all, no one wants to use SPARK 1.x version to solve problems anymore,
> though I have seen couple of companies who are stuck with these versions as
> they are using in house deployments which they cannot upgrade because of
> incompatibility issues.

I didn't know hiveContext was the legacy spark way. I will give
sparkSession a try and conclude. After all, I would prefer to provide our
users a unique and uniform framework such as spark, instead of multiple
complicated layers such as spark + whatever jdbc access.

> 
> 
> Regards,
> Gourav Sengupta
> 
> 
> On Sun, Nov 5, 2017 at 12:57 PM, Nicolas Paris <nipari...@gmail.com> wrote:
> 
> Hi
> 
> After some testing, I have been quite disapointed with hiveContext way of
> accessing hive tables.
> 
> The main problem is resource allocation: I have tons of users and they
> get a limited subset of workers. Then this does not allow to query huge
> datasetsn because to few memory allocated (or maybe I am missing
> something).
> 
> If using Hive jdbc, Hive resources are shared by all my users and then
> queries are able to finish.
> 
> Then I have been testing other jdbc based approach and for now, "presto"
> looks like the most appropriate solution to access hive tables.
> 
> In order to load huge datasets into spark, the proposed approach is to
> use presto distributed CTAS to build an ORC dataset, and access to that
> dataset from spark dataframe loader ability (instead of direct jdbc
> access tha would break the driver memory).
> 
> 
> 
> Le 15 oct. 2017 à 19:24, Gourav Sengupta écrivait :
> > Hi Nicolas,
> >
> > without the hive thrift server, if you try to run a select * on a table
> which
> > has around 10,000 partitions, SPARK will give you some surprises. PRESTO
> works
> > fine in these scenarios, and I am sure SPARK community will soon learn
> from
> > their algorithms.
> >
> >
> > Regards,
> > Gourav
> >
> > On Sun, Oct 15, 2017 at 3:43 PM, Nicolas Paris <nipari...@gmail.com>
> wrote:
> >
> >     > I do not think that SPARK will automatically determine the
> partitions.
> >     Actually
> >     > it does not automatically determine the partitions. In case a 
> table
> has a
> >     few
> >     > million records, it all goes through the driver.
> >
> >     Hi Gourav
> >
> >     Actualy spark jdbc driver is able to deal direclty with partitions.
> >     Sparks creates a jdbc connection for each partition.
> >
> >     All details explained in this post :
> >     http://www.gatorsmile.io/numpartitionsinjdbc/
> >
> >     Also an example with greenplum database:
> >     http://engineering.pivotal.io/post/getting-started-with-
> greenplum-spark/
> >
> >
> 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Hive From Spark: Jdbc VS sparkContext

2017-11-05 Thread Nicolas Paris
Hi

After some testing, I have been quite disappointed with the hiveContext way
of accessing hive tables.

The main problem is resource allocation: I have tons of users and they
get a limited subset of workers. This does not allow querying huge
datasets, because too little memory is allocated (or maybe I am missing
something).

If using Hive jdbc, Hive resources are shared by all my users and then
queries are able to finish.

Then I have been testing other jdbc based approaches and for now, "presto"
looks like the most appropriate solution to access hive tables.

In order to load huge datasets into spark, the proposed approach is to
use presto distributed CTAS to build an ORC dataset, and access that
dataset through the spark dataframe loader (instead of direct jdbc
access that would break the driver memory).



Le 15 oct. 2017 à 19:24, Gourav Sengupta écrivait :
> Hi Nicolas,
> 
> without the hive thrift server, if you try to run a select * on a table which
> has around 10,000 partitions, SPARK will give you some surprises. PRESTO works
> fine in these scenarios, and I am sure SPARK community will soon learn from
> their algorithms.
> 
> 
> Regards,
> Gourav
> 
> On Sun, Oct 15, 2017 at 3:43 PM, Nicolas Paris <nipari...@gmail.com> wrote:
> 
> > I do not think that SPARK will automatically determine the partitions.
> Actually
> > it does not automatically determine the partitions. In case a table has 
> a
> few
> > million records, it all goes through the driver.
> 
> Hi Gourav
> 
> Actualy spark jdbc driver is able to deal direclty with partitions.
> Sparks creates a jdbc connection for each partition.
> 
> All details explained in this post :
> http://www.gatorsmile.io/numpartitionsinjdbc/
> 
> Also an example with greenplum database:
> http://engineering.pivotal.io/post/getting-started-with-greenplum-spark/
> 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Hive From Spark: Jdbc VS sparkContext

2017-10-15 Thread Nicolas Paris
> I do not think that SPARK will automatically determine the partitions.
> Actually it does not automatically determine the partitions. In case a table
> has a few million records, it all goes through the driver.

Hi Gourav

Actually the spark jdbc driver is able to deal directly with partitions.
Spark creates a jdbc connection for each partition.

All details explained in this post : 
http://www.gatorsmile.io/numpartitionsinjdbc/

Also an example with greenplum database:
http://engineering.pivotal.io/post/getting-started-with-greenplum-spark/
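
For reference, a minimal sketch of what such a partitioned jdbc read looks
like (the url, table name and bounds below are made up):

val df = sqlContext.read.format("jdbc")
  .option("url", "jdbc:postgresql://host:5432/db")  // made-up connection string
  .option("dbtable", "people")
  .option("partitionColumn", "id")                  // numeric column to split on
  .option("lowerBound", "1")
  .option("upperBound", "1000000")
  .option("numPartitions", "10")                    // 10 parallel jdbc connections
  .load()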

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Hive From Spark: Jdbc VS sparkContext

2017-10-15 Thread Nicolas Paris
Hi Gourav

> what if the table has partitions and sub-partitions? 

well this also works with multiple orc files having the same schema:
val people = sqlContext.read.format("orc").load("hdfs://cluster/people*")
Am I missing something?

> And you do not want to access the entire data?

This works for static datasets; when new data comes in from batch
processes, the spark application has to be reloaded to pick up the new
files in the folder.
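
As a hedged workaround (not something I have battle-tested): since the file
listing happens when the dataframe is loaded, simply re-running the load
should pick up whatever files match the pattern at that moment, without
restarting the whole application:

// re-create the dataframe periodically instead of restarting the application
val refreshed = sqlContext.read.format("orc").load("hdfs://cluster/people*")
refreshed.createOrReplaceTempView("people")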


>> On Sun, Oct 15, 2017 at 12:55 PM, Nicolas Paris <nipari...@gmail.com> wrote:
> 
> On Oct 3, 2017 at 20:08, Nicolas Paris wrote:
> > I wonder about the differences between accessing HIVE tables in two different ways:
> > - with jdbc access
> > - with sparkContext
> 
> Well there is also a third way to access the hive data from spark:
> - with direct file access (here ORC format)
> 
> 
> For example:
> 
> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
> sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
> val people = sqlContext.read.format("orc").load("hdfs://cluster//orc_people")
> people.createOrReplaceTempView("people")
> sqlContext.sql("SELECT count(1) FROM people WHERE ...").show()
> 
> 
> This method looks much faster than both:
> - with jdbc access
> - with sparkContext
> 
> Any experience on that?
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Hive From Spark: Jdbc VS sparkContext

2017-10-15 Thread Nicolas Paris
On Oct 3, 2017 at 20:08, Nicolas Paris wrote:
> I wonder about the differences between accessing HIVE tables in two different ways:
> - with jdbc access
> - with sparkContext

Well there is also a third way to access the hive data from spark:
- with direct file access (here ORC format)


For example:

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
val people = sqlContext.read.format("orc").load("hdfs://cluster//orc_people")
people.createOrReplaceTempView("people")
sqlContext.sql("SELECT count(1) FROM people WHERE ...").show()


This method looks much faster than both:
- with jdbc access
- with sparkContext

Any experience on that?


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Hive From Spark: Jdbc VS sparkContext

2017-10-13 Thread Nicolas Paris
> In case a table has a few
> million records, it all goes through the driver.

This is clear in JDBC mode: the driver gets all the rows and then
spreads the RDD over the executors.

I'd say that most use cases use SQL to aggregate huge datasets and
retrieve a small number of rows to be then transformed for ML tasks.
Using JDBC then offers the robustness of HIVE to produce the small
aggregated dataset for spark, while using SPARK SQL relies on RDDs to
produce the small dataset from the huge one.
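
For instance, a sketch of that pattern, where the aggregation runs in HIVE
and only the small aggregated result crosses the driver (the url and table
names are made up, and the hive jdbc driver has its own quirks, so treat
this as illustrative):

val agg = sqlContext.read.format("jdbc")
  .option("url", "jdbc:hive2://hiveserver:10000/default")  // made-up HiveServer2 url
  .option("dbtable", "(SELECT key, count(*) AS n FROM huge_table GROUP BY key) t")
  .load()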

It is not very clear how SPARK SQL deals with a huge HIVE table. Does it load
everything into memory and crash, or does this never happen?


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Hive From Spark: Jdbc VS sparkContext

2017-10-03 Thread Nicolas Paris
Hi

I wonder about the differences between accessing HIVE tables in two different ways:
- with jdbc access
- with sparkContext

I would say that jdbc is better since it uses HIVE, which is based on
map-reduce / TEZ and therefore works on disk.
Using spark rdds can lead to memory errors on very huge datasets.
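
To be explicit, by those two ways I mean something like the following
sketch (the url and table names are made up):

// 1) jdbc: HIVE executes the query (map-reduce / TEZ, on disk) and spark only receives rows
val viaJdbc = sqlContext.read.format("jdbc")
  .option("url", "jdbc:hive2://hiveserver:10000/default")
  .option("dbtable", "my_table")
  .load()

// 2) sparkContext / HiveContext: spark reads the table files itself and does the work in memory
val viaSpark = new org.apache.spark.sql.hive.HiveContext(sc).sql("SELECT * FROM my_table")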


Does anybody know, or can someone point me to relevant documentation?

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Working with hadoop har file in spark

2017-08-17 Thread Nicolas Paris
Hi

I put a million files into a har archive on hdfs. I'd like to iterate over
their file paths and read them. (Basically they are pdf files, and I want to
transform them into text with apache pdfbox.)

My first attempt was to list them with the hadoop command
`hdfs dfs -ls har:///user//har/pdf.har`, and this works fine.
However, when I try to replicate this in spark, I get an error:

```  
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.hadoop.fs.{FileSystem, Path}

val hconf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hconf)
val test = hdfs.listFiles(new Path("har:///user//har/pdf.har"), false)
java.lang.IllegalArgumentException: Wrong FS:
har:/user//har/pdf.har, expected: hdfs://:
  at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:661)
```

However, I have been able to use `sc.textFile` without problem:

```
val test = sc.textFile("har:///user//har/pdf.har").count
8000
```  
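
For reference, a hedged guess at a workaround (untested): ask the Path for its
own filesystem instead of using the default one, so that the har:// scheme
resolves to the har filesystem (this reuses the hconf from above):

```
import org.apache.hadoop.fs.Path

// untested sketch: derive the filesystem from the har path itself
val harPath = new Path("har:///user//har/pdf.har")
val harFs = harPath.getFileSystem(hconf)
val it = harFs.listFiles(harPath, true)
while (it.hasNext) println(it.next().getPath)

// for feeding pdfbox, sc.binaryFiles("har:///user//har/pdf.har") might give
// (path, PortableDataStream) pairs, though I have not verified it with har paths
```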

--
1) Is it easily solvable?
2) Do I need to implement my own pdfFile reader, inspired by textFile?
3) If not, is har the best way? I have been looking at AVRO too.

Thanks for any advice,

-- 
Nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org