Can you encapsulate your map function such that it returns a data type other than Get? You can still perform the query to HBase inside it, but don't return Get.
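Something along these lines might work: a rough, untested sketch, assuming the Spark 1.x Java API and an HBase 1.x client. The table name "mytable" and the default HBaseConfiguration are placeholders for your own setup. Flatten the Solr docs to plain row-key Strings (which serialize fine), then build the Gets and run the lookups inside mapPartitions on the executors, returning only serializable values. Note that Result is not Serializable either, so extract what you need from it before returning.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;

public class HBaseLookup {

    // Step 1: flatten the Solr results to plain row keys.
    // String is serializable, so this RDD moves between driver and
    // executors without any problem.
    public static JavaRDD<String> getRowKeys(JavaRDD<SolrDocumentList> li) {
        return li.flatMap(new FlatMapFunction<SolrDocumentList, String>() {
            public Iterable<String> call(SolrDocumentList si) {
                List<String> keys = new ArrayList<String>();
                for (SolrDocument doc : si) {
                    keys.add((String) doc.getFieldValue("id"));
                }
                return keys;
            }
        });
    }

    // Step 2: create the Gets and query HBase on the executors.
    // The Connection, Get and Result objects never leave the partition,
    // so nothing non-serializable has to cross the wire.
    public static JavaRDD<String> lookup(JavaRDD<String> rowKeys) {
        return rowKeys.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
            public Iterable<String> call(Iterator<String> keys) throws IOException {
                // one connection per partition, opened on the executor
                Connection conn =
                    ConnectionFactory.createConnection(HBaseConfiguration.create());
                Table table = conn.getTable(TableName.valueOf("mytable")); // placeholder
                List<String> out = new ArrayList<String>();
                try {
                    while (keys.hasNext()) {
                        Result r = table.get(new Get(Bytes.toBytes(keys.next())));
                        if (!r.isEmpty()) {
                            // extract whatever serializable values you need;
                            // here just the row key, as an example
                            out.add(Bytes.toString(r.getRow()));
                        }
                    }
                } finally {
                    table.close();
                    conn.close();
                }
                return out;
            }
        });
    }
}

That way only Strings cross the driver / executor boundary.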
Cheers

On Fri, Jan 15, 2016 at 6:46 AM, beeshma r <beeshm...@gmail.com> wrote:

> Hi Ted,
>
> Any suggestions for changing this piece of code?
>
> public static JavaRDD<Get> getdocs(JavaRDD<SolrDocumentList> li)
> {
>     JavaRDD<SolrDocumentList> newdocs = li;
>
>     JavaRDD<Get> n = newdocs.map(new Function<SolrDocumentList, Get>() {
>
>         public Get call(SolrDocumentList si) throws IOException
>         {
>             Get get = null;
>
>             for (SolrDocument doc : si) {
>                 get = new Get(Bytes.toBytes((String) doc.getFieldValue("id")));
>             }
>
>             return get;
>         }
>     });
>
>     return n;
> }
>
> On Fri, Jan 15, 2016 at 6:40 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Here is the signature for Get:
>>
>> public class Get extends Query
>>     implements Row, Comparable<Row> {
>>
>> It is not Serializable.
>>
>> FYI
>>
>> On Fri, Jan 15, 2016 at 6:37 AM, beeshma r <beeshm...@gmail.com> wrote:
>>
>>> Hi, I am trying to get data from a Solr server.
>>>
>>> This is my code:
>>>
>>> /* input is JavaRDD<SolrDocumentList> li,
>>>  * output is JavaRDD<Get> for scanning HBase */
>>>
>>> public static JavaRDD<Get> getdocs(JavaRDD<SolrDocumentList> li)
>>> {
>>>     JavaRDD<SolrDocumentList> newdocs = li;
>>>
>>>     JavaRDD<Get> n = newdocs.map(new Function<SolrDocumentList, Get>() {
>>>
>>>         public Get call(SolrDocumentList si) throws IOException
>>>         {
>>>             Get get = null;
>>>
>>>             for (SolrDocument doc : si) {
>>>                 get = new Get(Bytes.toBytes((String) doc.getFieldValue("id")));
>>>             }
>>>
>>>             return get;
>>>         }
>>>     });
>>>
>>>     return n;
>>> }
>>>
>>> The issue is I am getting the error below:
>>>
>>> 16/01/15 06:05:55 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 2815 bytes)
>>> 16/01/15 06:05:55 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
>>> 16/01/15 06:05:55 INFO HttpClientUtil: Creating new http client, config:maxConnections=128&maxConnectionsPerHost=32&followRedirects=false
>>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client, config:maxConnections=128&maxConnectionsPerHost=32&followRedirects=false
>>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client, config:maxConnections=128&maxConnectionsPerHost=32&followRedirects=false
>>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client, config:maxConnections=128&maxConnectionsPerHost=32&followRedirects=false
>>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client, config:maxConnections=128&maxConnectionsPerHost=32&followRedirects=false
>>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client, config:maxConnections=128&maxConnectionsPerHost=32&followRedirects=false
>>> 16/01/15 06:05:57 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
>>> java.io.NotSerializableException: org.apache.hadoop.hbase.client.Get
>>> Serialization stack:
>>>     - object not serializable (class: org.apache.hadoop.hbase.client.Get, value: {"timeRange":[0,9223372036854775807],"totalColumns":0,"cacheBlocks":true,"families":{},"maxVersions":1,"row":"row47777"})
>>>     - element of array (index: 0)
>>>     - array (class [Ljava.lang.Object;, size 6)
>>>     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:724)
>>> 16/01/15 06:05:57 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.hbase.client.Get
>>> Serialization stack:
>>>     - object not serializable (class: org.apache.hadoop.hbase.client.Get, value: {"timeRange":[0,9223372036854775807],"totalColumns":0,"cacheBlocks":true,"families":{},"maxVersions":1,"row":"row47777"})
>>>     - element of array (index: 0)
>>>     - array (class [Ljava.lang.Object;, size 6); not retrying
>>> 16/01/15 06:05:57 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
>>> 16/01/15 06:05:57 INFO TaskSchedulerImpl: Cancelling stage 0
>>> 16/01/15 06:05:57 INFO DAGScheduler: ResultStage 0 (collect at App.java:278) failed in 2.481 s
>>> 16/01/15 06:05:57 INFO DAGScheduler: Job 0 failed: collect at App.java:278, took 3.378240 s
>>> [WARNING]
>>> java.lang.reflect.InvocationTargetException
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>>>     at java.lang.Thread.run(Thread.java:724)
>>> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.hbase.client.Get
>>> Serialization stack:
>>>     - object not serializable (class: org.apache.hadoop.hbase.client.Get, value: {"timeRange":[0,9223372036854775807],"totalColumns":0,"cacheBlocks":true,"families":{},"maxVersions":1,"row":"row47777"})
>>>     - element of array (index: 0)
>>>     - array (class [Ljava.lang.Object;, size 6)
>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>>>     at scala.Option.foreach(Option.scala:236)
>>>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
>>>     at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:909)
>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
>>>     at org.apache.spark.rdd.RDD.collect(RDD.scala:908)
>>>     at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:338)
>>>     at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:47)
>>>     at com.hbaseresult.test.App.last(App.java:278)
>>>     at com.hbaseresult.test.App.main(App.java:327)
>>>     ... 6 more
>>>
>>> So where am I going wrong?