Re: Synonym handling replacement issue with UDF in Apache Spark
Hi, in case you're still struggling with this, I wrote a blog post explaining Spark joins and UDFs:
http://alberto-computerengineering.blogspot.com.ar/2017/05/custom-joins-in-spark-sql-spark-has.html

Alberto.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Synonym-handling-replacement-issue-with-UDF-in-Apache-Spark-tp28638p28675.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Synonym handling replacement issue with UDF in Apache Spark
You need to understand how a join works to make sense of it. Logically, a join computes the cartesian product of the 2 tables and then keeps only the rows that satisfy the join condition, which here is the contains UDF. So, let's say you have

Input
Allen Armstrong nishanth hemanth Allen
shivu Armstrong nishanth
shree shivu DeWALT

Replacement of words
The word on the LHS has to be replaced with the words on the RHS in the input sentence
Allen => Apex Tool Group
Armstrong => Apex Tool Group
DeWALT => StanleyBlack

Logically speaking, it will first do a cartesian product, which will give you this

Input x Replacement
Allen Armstrong nishanth hemanth Allen, Allen, Apex Tool Group
Allen Armstrong nishanth hemanth Allen, Armstrong, Apex Tool Group
Allen Armstrong nishanth hemanth Allen, DeWALT, StanleyBlack
shivu Armstrong nishanth, Allen, Apex Tool Group
shivu Armstrong nishanth, Armstrong, Apex Tool Group
shivu Armstrong nishanth, DeWALT, StanleyBlack
shree shivu DeWALT, Allen, Apex Tool Group
shree shivu DeWALT, Armstrong, Apex Tool Group
shree shivu DeWALT, DeWALT, StanleyBlack

Then it will filter and keep only the records that satisfy contains

Join output
Allen Armstrong nishanth hemanth Allen, Allen, Apex Tool Group
Allen Armstrong nishanth hemanth Allen, Armstrong, Apex Tool Group
shivu Armstrong nishanth, Armstrong, Apex Tool Group
shree shivu DeWALT, DeWALT, StanleyBlack

So, as you can see, you have 4 output rows instead of 3, because the first sentence matches both Allen and Armstrong. When it then performs the replaceWithTerm operation, each row has only its own matched word replaced, and that is the output you are getting.
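The fan-out described above can be reproduced without Spark. The following is a minimal plain-Java sketch of the logical join, not Spark's actual execution plan; the class and method names (JoinFanOutDemo, joinThenReplace, sampleSentences, sampleSynonyms) are made up for illustration, and String.replace stands in for the replaceWithTerm UDF.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the logical join described above; no Spark dependency.
// All names in this class are hypothetical and exist only for this sketch.
public class JoinFanOutDemo {

    // The three input sentences from the original post.
    static List<String> sampleSentences() {
        return Arrays.asList(
                "Allen Armstrong nishanth hemanth Allen",
                "shivu Armstrong nishanth",
                "shree shivu DeWALT");
    }

    // The replacement table from the original post (insertion order preserved).
    static Map<String, String> sampleSynonyms() {
        Map<String, String> synonyms = new LinkedHashMap<>();
        synonyms.put("Allen", "Apex Tool Group");
        synonyms.put("Armstrong", "Apex Tool Group");
        synonyms.put("DeWALT", "StanleyBlack");
        return synonyms;
    }

    // Models join(contains) followed by withColumn(replaceWithTerm):
    // every (sentence, synonym) pair that passes the filter becomes one output
    // row, and that row has only that pair's replacement applied.
    static List<String> joinThenReplace(List<String> sentences, Map<String, String> synonyms) {
        List<String> rows = new ArrayList<>();
        for (String sentence : sentences) {                          // cartesian product ...
            for (Map.Entry<String, String> e : synonyms.entrySet()) {
                if (sentence.contains(e.getKey())) {                 // ... filtered by contains
                    // Literal replacement standing in for the replaceWithTerm UDF.
                    rows.add(sentence.replace(e.getKey(), e.getValue()));
                }
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Prints one line per surviving (sentence, synonym) pair.
        joinThenReplace(sampleSentences(), sampleSynonyms()).forEach(System.out::println);
    }
}
```

Running main prints four lines, two of them for the first sentence, which matches the four rows in the join output listed above.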
Synonym handling replacement issue with UDF in Apache Spark
I am facing a major issue with the replacement of synonyms in my Dataset. I am trying to replace the synonyms of brand names with their equivalent names. I have tried 2 methods to solve this issue.

Method 1 (regexp_replace)

Here I am using the regexp_replace method.

    Hashtable manufacturerNames = new Hashtable();
    Enumeration names;
    String str;
    double bal;

    manufacturerNames.put("Allen","Apex Tool Group");
    manufacturerNames.put("Armstrong","Apex Tool Group");
    manufacturerNames.put("Campbell","Apex Tool Group");
    manufacturerNames.put("Lubriplate","Apex Tool Group");
    manufacturerNames.put("Delta","Apex Tool Group");
    manufacturerNames.put("Gearwrench","Apex Tool Group");
    manufacturerNames.put("H.K. Porter","Apex Tool Group");
    /*100 MORE*/
    manufacturerNames.put("Stanco","Stanco Mfg");
    manufacturerNames.put("Stanco","Stanco Mfg");
    manufacturerNames.put("Standard Safety","Standard Safety Equipment Company");
    manufacturerNames.put("Standard Safety","Standard Safety Equipment Company");

    // Show all balances in hash table.
    names = manufacturerNames.keys();
    Dataset dataFileContent = sqlContext.load("com.databricks.spark.csv", options);

    while(names.hasMoreElements()) {
        str = (String) names.nextElement();
        dataFileContent=dataFileContent.withColumn("ManufacturerSource",
            regexp_replace(col("ManufacturerSource"),str,manufacturerNames.get(str).toString()));
    }
    dataFileContent.show();

I got to know that the amount of data is too huge for regexp_replace, so I got a solution to use a UDF:
http://stackoverflow.com/questions/43413513/issue-in-regex-replace-in-apache-spark-java

Method 2 (UDF)

    List<Row> data2 = Arrays.asList(
        RowFactory.create("Allen", "Apex Tool Group"),
        RowFactory.create("Armstrong","Apex Tool Group"),
        RowFactory.create("DeWALT","StanleyBlack")
    );

    StructType schema2 = new StructType(new StructField[] {
        new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
        new StructField("sentence2", DataTypes.StringType, false, Metadata.empty())
    });
    Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);

    UDF2<String, String, Boolean> contains = new UDF2<String, String, Boolean>() {
        private static final long serialVersionUID = -5239951370238629896L;

        @Override
        public Boolean call(String t1, String t2) throws Exception {
            return t1.contains(t2);
        }
    };
    spark.udf().register("contains", contains, DataTypes.BooleanType);

    UDF3<String, String, String, String> replaceWithTerm = new UDF3<String, String, String, String>() {
        private static final long serialVersionUID = -2882956931420910207L;

        @Override
        public String call(String t1, String t2, String t3) throws Exception {
            return t1.replaceAll(t2, t3);
        }
    };
    spark.udf().register("replaceWithTerm", replaceWithTerm, DataTypes.StringType);

    Dataset<Row> joined = sentenceDataFrame.join(sentenceDataFrame2,
            callUDF("contains", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2")))
        .withColumn("sentence_replaced",
            callUDF("replaceWithTerm", sentenceDataFrame.col("sentence"),
                sentenceDataFrame2.col("label2"), sentenceDataFrame2.col("sentence2")))
        .select(col("sentence_replaced"));

    joined.show(false);

Input

Allen Armstrong nishanth hemanth Allen
shivu Armstrong nishanth
shree shivu DeWALT

Replacement of words
The word on the LHS has to be replaced with the words on the RHS in the input sentence
Allen => Apex Tool Group
Armstrong => Apex Tool Group
DeWALT => StanleyBlack

Output

+-----+-----------------------------------------------------------+
|label|sentence_replaced                                          |
+-----+-----------------------------------------------------------+
|0    |Apex Tool Group Armstrong nishanth hemanth Apex Tool Group |
|0    |Allen Apex Tool Group nishanth hemanth Allen               |
|1    |shivu Apex Tool Group nishanth                             |
|2    |shree shivu StanleyBlack                                   |
+-----+-----------------------------------------------------------+

Expected Output

+-----+-----------------------------------------------------------------+
|label|sentence_replaced                                                |
+-----+-----------------------------------------------------------------+
|0    |Apex Tool Group Apex Tool Group nishanth hemanth Apex Tool Group |
|1    |shivu Apex Tool Group nishanth                                   |
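The expected output above corresponds to applying every mapping to the same sentence in turn, rather than one mapping per row. A minimal plain-Java sketch of that idea follows; the class and method names (ReplaceAllSynonyms, replaceAllSynonyms) are hypothetical, and it uses String.replace instead of the replaceAll call in the post so that keys are matched literally. This is one possible approach under those assumptions, not necessarily the fix the thread settled on.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: applies the whole synonym map to one sentence.
public class ReplaceAllSynonyms {

    static String replaceAllSynonyms(String sentence, Map<String, String> synonyms) {
        String result = sentence;
        for (Map.Entry<String, String> e : synonyms.entrySet()) {
            // String.replace matches the key literally, so the '.' in a key such
            // as "H.K. Porter" is not interpreted as a regex wildcard, which is
            // what String.replaceAll would do.
            result = result.replace(e.getKey(), e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // Replacement table from the post; LinkedHashMap keeps a fixed order.
        Map<String, String> synonyms = new LinkedHashMap<>();
        synonyms.put("Allen", "Apex Tool Group");
        synonyms.put("Armstrong", "Apex Tool Group");
        synonyms.put("DeWALT", "StanleyBlack");

        System.out.println(replaceAllSynonyms("Allen Armstrong nishanth hemanth Allen", synonyms));
        System.out.println(replaceAllSynonyms("shivu Armstrong nishanth", synonyms));
        System.out.println(replaceAllSynonyms("shree shivu DeWALT", synonyms));
    }
}
```

In Spark, the body of replaceAllSynonyms could be wrapped in a single-argument UDF1<String, String> that closes over the map, so that no join is needed and each input row yields exactly one output row. Two caveats of this sketch: matching is by substring, so "Allen" would also match inside a longer word, and a replacement value that contains another key (for example "Stanco" inside "Stanco Mfg") can be rewritten again depending on the input and the iteration order.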
Re: Synonym handling replacement issue with UDF in Apache Spark
What about joining your table with a map table?

On Thu, Apr 27, 2017 at 9:58 PM, Nishanth wrote:
> I am facing a major issue on replacement of Synonyms in my DataSet.
> [...]
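One way to read the map-table suggestion in this reply is as an equality lookup per word instead of a contains match per sentence: split each sentence into words, look each word up in the map table, keep the word when there is no match, and join the words back together. The sketch below models that in plain Java with a HashMap standing in for the map table; the names (TokenLookupDemo, replaceTokens) are hypothetical, and this is only an interpretation of the suggestion, not code from the thread.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical model of "join with a map table": one exact lookup per word.
public class TokenLookupDemo {

    static String replaceTokens(String sentence, Map<String, String> mapTable) {
        return Arrays.stream(sentence.split(" "))
                // Equivalent of a left join on word = key followed by a
                // coalesce: unmatched words are kept unchanged.
                .map(word -> mapTable.getOrDefault(word, word))
                .collect(Collectors.joining(" "));
    }

    public static void main(String[] args) {
        // Replacement table from the original post.
        Map<String, String> mapTable = new HashMap<>();
        mapTable.put("Allen", "Apex Tool Group");
        mapTable.put("Armstrong", "Apex Tool Group");
        mapTable.put("DeWALT", "StanleyBlack");

        System.out.println(replaceTokens("Allen Armstrong nishanth hemanth Allen", mapTable));
        System.out.println(replaceTokens("shivu Armstrong nishanth", mapTable));
        System.out.println(replaceTokens("shree shivu DeWALT", mapTable));
    }
}
```

Because the lookup is by whole word, "Allen" no longer matches inside a longer word, but multi-word keys from the original list such as "H.K. Porter" or "Standard Safety" would not be found this way and would need separate handling.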