Thanks Todd, that solved my problem.

Regards,
Gaurav
(please excuse spelling mistakes)
Sent from phone

On Jun 11, 2015 6:42 PM, "Todd Nist" <tsind...@gmail.com> wrote:
> Hi Gaurav,
>
> Seems like you could use a broadcast variable for this, if I understand
> your use case. Create it in the driver based on the CommandLineArguments
> and then use it in the workers.
>
> https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>
> So something like:
>
> Broadcast<Integer> cmdLineArg = sc.broadcast(Integer.parseInt(args[12]));
>
> Then just reference the broadcast variable in your workers through
> cmdLineArg.value(). It will get shipped once to all nodes in the cluster
> and can be read by them there.
>
> HTH.
>
> -Todd
>
> On Thu, Jun 11, 2015 at 8:23 AM, gaurav sharma <sharmagaura...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am using a Kafka + Spark cluster for a real-time aggregation analytics
>> use case in production.
>>
>> Cluster details:
>> 6 nodes, each node running Spark and Kafka processes.
>> Node 1 -> 1 Master, 1 Worker, 1 Driver, 1 Kafka process
>> Nodes 2, 3, 4, 5, 6 -> 1 Worker process and 1 Kafka process each
>>
>> Spark version 1.3.0
>> Kafka version 0.8.1
>>
>> I am using the Kafka direct stream for Kafka-Spark integration.
>> The analytics code is written using the Spark Java API.
>>
>> Problem statement:
>>
>> I want to accept a parameter as a command-line argument and pass it on
>> to the executors (I want to use the parameter in the rdd.foreach method,
>> which is executed on the executors).
>>
>> I understand that when the driver is started, only the jar is
>> transported to all the Workers.
>> But I need to use the dynamically passed command-line argument in
>> the reduce operation executed on the executors.
>>
>> Code snippets to help explain my problem:
>>
>> public class KafkaReconcilationJob {
>>
>>     private static Logger logger =
>>             Logger.getLogger(KafkaReconcilationJob.class);
>>
>>     public static void main(String[] args) throws Exception {
>>         // I want to use this command-line argument on the executors
>>         CommandLineArguments.CLICK_THRESHOLD = Integer.parseInt(args[12]);
>>     }
>> }
>>
>> JavaRDD<AggregatedAdeStats> adeAggregatedFilteredData =
>>     adeAudGeoAggDataRdd.filter(new Function<AggregatedAdeStats, Boolean>() {
>>         @Override
>>         public Boolean call(AggregatedAdeStats adeAggregatedObj) throws Exception {
>>             if (adeAggregatedObj.getImpr() > CommandLineArguments.IMPR_THRESHOLD
>>                     || adeAggregatedObj.getClick() > CommandLineArguments.CLICK_THRESHOLD) {
>>                 return true;
>>             } else {
>>                 return false;
>>             }
>>         }
>>     });
>>
>> The filter operation above is executed on the executors, where the static
>> field CommandLineArguments.CLICK_THRESHOLD still has the value 0.
>>
>> Regards,
>> Gaurav
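A minimal, self-contained sketch of the broadcast approach Todd describes, assuming the Spark Java API (spark-core on the classpath) and local mode for trying it out. The class name BroadcastThresholdExample, the helper aboveThreshold, and the plain Integer click counts (standing in for AggregatedAdeStats) are hypothetical. The threshold is parsed on the driver, broadcast once, and read inside filter() through value(), so the executors never depend on a static field that was only assigned in the driver's JVM.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastThresholdExample {

    // Hypothetical helper: keeps only the click counts strictly above the threshold.
    static List<Integer> aboveThreshold(JavaSparkContext sc, List<Integer> clicks, int threshold) {
        // Created once on the driver; Spark ships the value to each executor.
        final Broadcast<Integer> thresholdBc = sc.broadcast(threshold);

        JavaRDD<Integer> rdd = sc.parallelize(clicks);
        JavaRDD<Integer> filtered = rdd.filter(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer click) {
                // value() returns the broadcast copy on whichever JVM runs this task.
                return click > thresholdBc.value();
            }
        });
        return filtered.collect();
    }

    public static void main(String[] args) {
        // The original job reads args[12]; this sketch reads args[0] and defaults to 5.
        int clickThreshold = args.length > 0 ? Integer.parseInt(args[0]) : 5;

        SparkConf conf = new SparkConf()
                .setAppName("BroadcastThresholdExample")
                .setMaster("local[2]"); // local mode, only for trying the sketch out
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            List<Integer> result = aboveThreshold(sc, Arrays.asList(1, 4, 6, 10), clickThreshold);
            System.out.println(result); // [6, 10] with the default threshold of 5
        } finally {
            sc.stop();
        }
    }
}
```

Note that in local mode the driver and executors share one JVM, so the original static-field version would also appear to work there; the difference only shows up when executors run in separate JVMs on a cluster. For a single small value like this, capturing a final local variable in the function would also ship it with each task; a broadcast mainly pays off for larger values reused across many tasks.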