Re: Spark Structured Streaming org.apache.spark.sql.functions.input_file_name Intermittently Missing FileName
Looks like this is somehow related to the API being unable to send data from the executor to the driver. If I set spark.master to local I get these 6 files; if I set it to local[*], most of the file names come back empty.

When spark.master is local:

    &&&&&&&&&&&&& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/d
    &&&&&&&&&&&&& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/a
    &&&&&&&&&&&&& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/b
    &&&&&&&&&&&&& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/g
    &&&&&&&&&&&&& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/e
    &&&&&&&&&&&&& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/f
    &&&&&&&&&&&&& InputReportAndFileName fileName

When spark.master is local[*]:

    &&&&&&&&&&&&& InputReportAndFileName fileName
    &&&&&&&&&&&&& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/b
    &&&&&&&&&&&&& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/e
    &&&&&&&&&&&&& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/d
    &&&&&&&&&&&&& InputReportAndFileName fileName
    &&&&&&&&&&&&& InputReportAndFileName fileName
    &&&&&&&&&&&&& InputReportAndFileName fileName
Re: Spark Structured Streaming org.apache.spark.sql.functions.input_file_name Intermittently Missing FileName
Here is Spark's API definition; I am unable to understand what it means for a file to be "unknown". We are processing files, so we should always have a file name, yet with 7 files it can print 3 and miss the other 4.

    /**
     * Returns the holding file name or empty string if it is unknown.
     */
    def getInputFilePath: UTF8String = inputBlock.get().get().filePath

Can anyone help me understand what "file name unknown" means here, and why the API above returns a blank file name, as shown in the original message below?
Spark Structured Streaming org.apache.spark.sql.functions.input_file_name Intermittently Missing FileName
Hello all, I am trying to extract the file name as follows, but intermittently we are getting an empty file name.

Step 1: Get schema

    StructType jsonSchema = sparkSession.read()
        .option("multiLine", true)
        .json("src/main/resources/sample.json")
        .schema();

Step 2: Get input Dataset

    Dataset<Row> inputDS = sparkSession
        .readStream()
        .format("text")
        .option("multiLine", true)
        .schema(jsonSchema)
        .json(inputPath + "/*");

Step 3: Add FileName column

    Dataset<Row> inputDf = inputDS.select(functions.col("Report")).toJSON()
        .withColumn("FileName", org.apache.spark.sql.functions.input_file_name());

Step 4: Print fileName

    Dataset<InputReportAndFileName> inputDF = inputDf
        .as(ExpressionEncoder.javaBean(InputReportAndFileName.class))
        .map((MapFunction<InputReportAndFileName, InputReportAndFileName>) inputReportAndFileName -> {
            System.out.println("& InputReportAndFileName fileName " + inputReportAndFileName.getFileName());
            return inputReportAndFileName;
        }, ExpressionEncoder.javaBean(InputReportAndFileName.class));

Output: here we see the missing file names.

    & InputReportAndFileName fileName
    & InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/2021-Aug-14-042000_001E46_1420254%202
    & InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/2021-Aug-14-042000_001E46_14202040
    & InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/2021-Aug-14-042000_001E46_142720%202
    & InputReportAndFileName fileName
    & InputReportAndFileName fileName
    & InputReportAndFileName fileName
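For reference, input_file_name() can also be attached directly in the first select on the streaming source, before toJSON() re-encodes the rows. A minimal sketch of that variant, reusing inputDS from Step 2 (whether it changes the intermittent blanks is untested; it only moves the column closer to the file scan):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.functions;

    // Tag each row with its source file while still selecting straight off the
    // streaming source, before toJSON() re-encodes the Dataset.
    Dataset<Row> tagged = inputDS.select(
            functions.col("Report"),
            functions.input_file_name().alias("FileName"));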
How to process S3 data in Scalable Manner Using Spark API (wholeTextFile VERY SLOW and NOT scalable)
Issue: We are using the wholeTextFile() API to read files from S3, but this API is extremely SLOW for the reasons below. The question is how to fix this. Here is our analysis so far.

The wholeTextFile API works in two steps: first, the driver/master lists all the S3 files; second, it splits the list of files and distributes them to the worker nodes/executors to process.

STEP 1. List all the S3 files at the given paths (we pass this path when we run every single gw/device/app step). The issue is that every single batch of every single report first lists the files. The main problem is that with S3, listing files in a bucket is single-threaded: the S3 API for listing the keys in a bucket only returns keys in chunks of 1000 per call. A single-threaded client can therefore list only 1000 files at a time, so for a million files we are looking at 1000 sequential S3 API calls.

STEP 2. Determine the number of splits based on the number of input partitions and distribute the load to the worker nodes for processing.
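For what it's worth, the 1000-keys-per-call limit is per request, not per client: nothing stops several ListObjectsV2 calls from running in parallel when the keys fall under known prefixes. A rough sketch with the AWS SDK for Java v1; the bucket name and the prefix layout are placeholders/assumptions about the data:

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.model.ListObjectsV2Request;
    import com.amazonaws.services.s3.model.ListObjectsV2Result;
    import com.amazonaws.services.s3.model.S3ObjectSummary;
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ParallelS3Listing {
        // List every key under one prefix, paging through 1000-key chunks.
        static Stream<String> listPrefix(AmazonS3 s3, String bucket, String prefix) {
            ListObjectsV2Request req = new ListObjectsV2Request()
                    .withBucketName(bucket).withPrefix(prefix);
            Stream.Builder<String> keys = Stream.builder();
            ListObjectsV2Result res;
            do {
                res = s3.listObjectsV2(req);
                for (S3ObjectSummary s : res.getObjectSummaries()) {
                    keys.add(s.getKey());
                }
                req.setContinuationToken(res.getNextContinuationToken());
            } while (res.isTruncated());
            return keys.build();
        }

        public static void main(String[] args) {
            AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient(); // client is thread-safe
            List<String> prefixes = Arrays.asList("input/00/", "input/01/"); // placeholder prefixes
            // parallelStream() fans the per-prefix listings out across threads.
            List<String> allKeys = prefixes.parallelStream()
                    .flatMap(p -> listPrefix(s3, "my-bucket", p))
                    .collect(Collectors.toList());
            System.out.println("listed " + allKeys.size() + " keys");
        }
    }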
How to make bucket listing faster while using S3 with wholeTextFile
How to optimize listing S3 files when using wholeTextFile(): We are using wholeTextFile to read data from S3. As per my understanding, wholeTextFile first lists the files at the given path. Since we are using S3 as the input source, listing files in a bucket is single-threaded: the S3 API for listing the keys in a bucket only returns keys in chunks of 1000 per call. Since we have millions of files, we are making thousands of API calls, and this listing makes our processing very slow. How can we make the S3 listing faster?

Thanks,
Rachana
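One alternative worth testing is to drop wholeTextFile() in favor of the DataFrame text source with its wholetext option, recovering the path with input_file_name(). Unlike wholeTextFile, Spark's file index can be told to run the listing itself as a distributed job via spark.sql.sources.parallelPartitionDiscovery.threshold (this only kicks in when the number of input paths exceeds the threshold). A sketch, with a placeholder bucket path:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import static org.apache.spark.sql.functions.input_file_name;

    public class WholeTextViaDataFrame {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("wholetext-listing")
                    // below this many input paths Spark lists on the driver;
                    // lowering it pushes the listing into a distributed job
                    .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "1")
                    .getOrCreate();

            // One row per file: 'value' holds the whole file content,
            // 'path' the file it came from.
            Dataset<Row> files = spark.read()
                    .option("wholetext", true)
                    .text("s3a://my-bucket/input/*")   // placeholder path
                    .withColumn("path", input_file_name());

            files.show();
        }
    }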
Spark stuck at removing broadcast variable
I am running a simple Spark Structured Streaming application that pulls data from a Kafka topic. The topic has nearly 1000 partitions, and I am running the app on a 6-node EMR cluster with 4 cores and 16 GB RAM. I observed that Spark tries to pull data from all 1024 Kafka partitions, and after running successfully for a few iterations it gets stuck after the following log output:

    20/04/18 00:51:41 INFO ContextCleaner: Cleaned accumulator 101
    20/04/18 00:51:41 INFO ContextCleaner: Cleaned accumulator 66
    20/04/18 00:51:41 INFO ContextCleaner: Cleaned accumulator 77
    20/04/18 00:51:41 INFO ContextCleaner: Cleaned accumulator 78
    20/04/18 00:51:41 INFO BlockManagerInfo: Removed broadcast_2_piece0 on in memory (size: 4.5 KB, free: 2.7 GB)
    20/04/18 00:51:41 INFO BlockManagerInfo: Removed broadcast_2_piece0 on ip- in memory (size: 4.5 KB, free: 2.7 GB)
    20/04/18 00:51:41 INFO BlockManagerInfo: Removed broadcast_2_piece0 on ip- in memory (size: 4.5 KB, free: 2.7 GB)

Then Spark shows RUNNING but it is NOT processing any data.
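Not a diagnosis of the hang itself, but with 1024 partitions on 6 nodes it may be worth capping how much a single trigger can pull, so one micro-batch cannot balloon while the job catches up. A sketch of the relevant Kafka source option (broker, topic and the cap value are placeholders):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class KafkaThrottleSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().appName("kafka-throttle").getOrCreate();

            Dataset<Row> stream = spark.readStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "broker:9092")  // placeholder
                    .option("subscribe", "my-topic")                   // placeholder
                    // cap the total records fetched per micro-batch
                    .option("maxOffsetsPerTrigger", 100000L)
                    .load();

            // stream.writeStream()... as in the original application
        }
    }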
How to repartition Spark DStream Kafka ConsumerRecord RDD.
How to repartition a Spark DStream of Kafka ConsumerRecord RDDs? I am getting unevenly sized Kafka topics and want to repartition the input RDD based on some logic, but when I try to apply the repartition I get an "object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord)" error. I found the following workaround:

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html

Call rdd.forEachPartition and create the NotSerializable object in there, like this:

    rdd.forEachPartition(iter -> {
        NotSerializable notSerializable = new NotSerializable();
        // ...now process iter
    });

Applied here:

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParam)
    ).map(_.value())

    stream.foreachRDD { rdd =>
      val repartitionRDD = flow.repartitionRDD(rdd, 1)
      println("&& repartitionRDD " + repartitionRDD.count())

      val modifiedRDD = rdd.mapPartitions { iter =>
        val customerRecords: List[ConsumerRecord[String, String]] = List[ConsumerRecord[String, String]]()
        while (iter.hasNext) {
          val consumerRecord: ConsumerRecord[String, String] = iter.next()
          customerRecords :+ consumerRecord
        }
        customerRecords.iterator
      }

      val r = modifiedRDD.repartition(1)
      println("* after repartition " + r.count())
    }

But I am still getting the same object-not-serializable error. Any help is greatly appreciated.
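Two observations on the snippet above: customerRecords :+ consumerRecord builds a new list and discards it, so the buffer stays empty; and the serialization error is about ConsumerRecord itself, which repartition must ship through the shuffle. A sketch of the usual way around the latter (Java here): map each record to its plain String value before the shuffle, so no ConsumerRecord ever needs to be serialized. Topic, batch interval and partition count are placeholders:

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;

    public class ValueBeforeShuffle {
        public static void main(String[] args) throws InterruptedException {
            JavaStreamingContext jssc = new JavaStreamingContext(
                    new SparkConf().setAppName("value-before-shuffle"), Durations.seconds(10));
            Collection<String> topics = Arrays.asList("my-topic");   // placeholder
            Map<String, Object> kafkaParams = new HashMap<>();       // fill in as in the post

            JavaInputDStream<ConsumerRecord<String, String>> stream =
                    KafkaUtils.createDirectStream(
                            jssc,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

            // Extract the String value while records sit in their original Kafka
            // partitions; only serializable Strings cross the shuffle below.
            JavaDStream<String> values = stream.map(ConsumerRecord::value);

            values.foreachRDD(rdd -> {
                long n = rdd.repartition(8).count();   // 8 partitions as a placeholder
                System.out.println("after repartition: " + n);
            });

            jssc.start();
            jssc.awaitTermination();
        }
    }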
Write data to HBase using Spark Failing with NPE
I am using Spark to write data to HBase. I can read data just fine, but the write is failing with the exception below. I found a similar issue that was resolved by adding the *-site.xml files and the HBase JARs, but that is not working for me.

    JavaPairRDD<ImmutableBytesWritable, Put> tablePuts = hBaseRDD.mapToPair(
        new PairFunction<Tuple2<ImmutableBytesWritable, Result>, ImmutableBytesWritable, Put>() {

            @Override
            public Tuple2<ImmutableBytesWritable, Put> call(Tuple2<ImmutableBytesWritable, Result> results) throws Exception {
                byte[] accountId = results._2().getValue(Bytes.toBytes(COLFAMILY), Bytes.toBytes("accountId"));
                byte[] vbmedia = results._2().getValue(Bytes.toBytes(COLFAMILY), Bytes.toBytes("vbmedia"));
                String rowKey = new String(results._2().getRow());
                String accountId2 = Bytes.toString(accountId);
                String vbMedia2 = Bytes.toString(vbmedia);
                System.out.println(" accountId " + accountId2);

                //int prefix = getHash(rowKey);
                String prefix = getMd5Hash(rowKey);
                String newrowKey = prefix + rowKey;
                System.out.println(" newrowKey &&&" + newrowKey);
                LOG.info(" newrowKey &&&" + newrowKey);

                // Add a single cell def:vbmedia
                Put put = new Put(Bytes.toBytes(newrowKey));
                put.addColumn(Bytes.toBytes("def"), Bytes.toBytes("accountId"), accountId);
                return new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(newrowKey)), put);
            }
        });

    Job newAPIJobConfiguration = Job.getInstance(conf);
    newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, OUT_TABLE_NAME);
    newAPIJobConfiguration.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class);
    newAPIJobConfiguration.setOutputKeyClass(org.apache.hadoop.hbase.io.ImmutableBytesWritable.class);
    newAPIJobConfiguration.setOutputValueClass(org.apache.hadoop.io.Writable.class);
    tablePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration());

Exception:

    Exception in thread "main" java.lang.NullPointerException
        at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:123)
        at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)
        at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
        at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:177)
        at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.assertConf(SparkHadoopWriter.scala:387)
        at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)
        at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopDataset(JavaPairRDD.scala:831)
        at com.voicebase.etl.s3tohbase.HbaseScan2.main(HbaseScan2.java:148)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
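The NPE surfaces inside UserProvider.instantiate, i.e. while HBase reads its own settings out of the Configuration that TableOutputFormat sees, which often points at a conf missing the HBase defaults. One thing to verify (a sketch; the quorum value is a placeholder) is that the job conf was built from HBaseConfiguration.create() rather than a bare Hadoop Configuration; if that doesn't help, note that some HBase versions hit a known issue where Spark invokes checkOutputSpecs before the output format's conf has been set:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.mapreduce.Job;

    public class HBaseJobConfig {
        public static Job buildJob(String outTable) throws java.io.IOException {
            // HBaseConfiguration.create() layers hbase-default.xml/hbase-site.xml
            // on top of the Hadoop defaults; a bare 'new Configuration()' would not.
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "zk-host:2181");   // placeholder quorum

            Job job = Job.getInstance(conf);
            job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outTable);
            job.setOutputFormatClass(TableOutputFormat.class);
            return job;
        }
    }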
Spark UNEVENLY distributing data
I am trying to parallelize a simple Spark program that processes HBase data in parallel.

    // Get HBase RDD
    JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc.newAPIHadoopRDD(
        conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
    long count = hBaseRDD.count();

Only two lines do I see in the logs: ZooKeeper starts and ZooKeeper stops. The problem is that my program is as SLOW as the largest bar (the longest-running task). I found that ZK takes a long time before shutting down:

    18/05/19 17:26:55 INFO zookeeper.ClientCnxn: Session establishment complete on server :2181, sessionid = 0x163662b64eb046d, negotiated timeout = 4
    18/05/19 17:38:00 INFO zookeeper.ZooKeeper: Session: 0x163662b64eb046d closed
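When a stage is gated on one long task like that, the usual suspect is a single oversized HBase region; newAPIHadoopRDD with TableInputFormat creates one Spark partition per region, so counting records per partition confirms it. A sketch reusing hBaseRDD from above:

    import java.util.Collections;
    import java.util.List;

    // Emit one count per Spark partition; a heavily skewed list of counts means
    // the slow stage is pinned to one oversized region/partition.
    List<String> sizes = hBaseRDD
        .mapPartitionsWithIndex((idx, it) -> {
            long n = 0;
            while (it.hasNext()) { it.next(); n++; }
            return Collections.singletonList("partition " + idx + ": " + n + " rows").iterator();
        }, false)
        .collect();
    sizes.forEach(System.out::println);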
Malformed URL Exception when connecting Phoenix to Spark
Code:

    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

    Map<String, String> map = new HashMap<>();
    map.put("zkUrl", args[2]);
    map.put("table", args[1]);
    map.put("driver", "org.apache.phoenix.jdbc.PhoenixDriver");

    // long cnt = sqlContext.read().format("org.apache.phoenix.spark").options(map).load().count();
    DataFrameReader reader = sqlContext.read();
    DataFrameReader readerM = reader.format("org.apache.phoenix.spark");
    DataFrameReader readerM2 = readerM.options(map);
    Dataset<Row> ds = readerM2.load();
    ds.logicalPlan();
    long cnt = ds.count();
    // format("org.apache.phoenix.spark").options(map).load().count();
    System.out.println(" cnt " + cnt);

Exception:

    18/05/09 12:31:23 INFO RecoverableZooKeeper: Process identifier=hconnection-0x5ebbde60 connecting to ZooKeeper ensemble=10.16.129.152:2181
    Exception in thread "main" java.sql.SQLNonTransientConnectionException: Cannot load connection class because of underlying exception: com.mysql.cj.core.exceptions.WrongArgumentException: Malformed database URL, failed to parse the main URL sections.
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:526)
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:513)
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:505)
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:479)
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:489)
        at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:72)
        at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:124)
        at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:224)
        at java.sql.DriverManager.getConnection(DriverManager.java:664)
        at java.sql.DriverManager.getConnection(DriverManager.java:208)
        at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:98)
        at org.apache.phoenix.mapreduce.util.ConnectionUtil.getInputConnection(ConnectionUtil.java:57)
        at org.apache.phoenix.mapreduce.util.ConnectionUtil.getInputConnection(ConnectionUtil.java:45)
        at org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getSelectColumnMetadataList(PhoenixConfigurationUtil.java:338)
        at org.apache.phoenix.spark.PhoenixRDD.toDataFrame(PhoenixRDD.scala:118)
        at org.apache.phoenix.spark.PhoenixRelation.schema(PhoenixRelation.scala:60)
        at org.apache.spark.sql.execution.datasources.LogicalRelation$.apply(LogicalRelation.scala:77)
        at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:429)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
        at PhoenixToDataFrame.main(PhoenixToDataFrame.java:41)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: com.mysql.cj.core.exceptions.UnableToConnectException: Cannot load connection class because of underlying exception: com.mysql.cj.core.exceptions.WrongArgumentException: Malformed database URL, failed to parse the main URL sections.
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.mysql.cj.core.exceptions.ExceptionFactory.createException(ExceptionFactory.java:54)
        at com.mysql.cj.core.exceptions.ExceptionFactory.createException(ExceptionFactory.java:93)
        ... 23 more
    Caused by: com.mysql.cj.core.exceptions.WrongArgumentException: Malformed database URL, failed to parse the main URL sections.
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.mysql.cj.core.exceptions.ExceptionFactory.createException(ExceptionFactory.java:54)
        at com.mysql.cj.core.conf.url.ConnectionUrlPa
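For reference, the phoenix-spark reader is normally wired up with just "table" and "zkUrl", where zkUrl is the bare ZooKeeper quorum (Phoenix builds the jdbc:phoenix: URL from it internally); the MySQL connector appearing in the trace suggests the value passed in args[2] was something DriverManager handed to the MySQL driver instead. A sketch with placeholder values, reusing sqlContext from above:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    Map<String, String> options = new HashMap<>();
    options.put("table", "MY_TABLE");        // placeholder table name
    options.put("zkUrl", "zk-host:2181");    // bare ZooKeeper quorum, no jdbc: prefix

    Dataset<Row> ds = sqlContext.read()
            .format("org.apache.phoenix.spark")
            .options(options)
            .load();
    System.out.println("count = " + ds.count());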