Below is a small snippet of my Java test code. Any idea how to implement element-wise (member-wise) vector multiplication?
Also, notice that the idf value for "Chinese" is 0.0. By my calculation it should be $\ln\left(\frac{4+1}{6/4+1}\right) = \ln 2 \approx 0.6931$ — why is it 0.0? Also, any idea whether this code would work in a pipeline, i.e., is the pipeline smart about using cache()?

Kind regards,
Andy

transformed df printSchema()
root
 |-- id: integer (nullable = false)
 |-- label: double (nullable = false)
 |-- words: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- tf: vector (nullable = true)
 |-- idf: vector (nullable = true)

+---+-----+----------------------------+-------------------------+-------------------------------------------------------+
|id |label|words                       |tf                       |idf                                                    |
+---+-----+----------------------------+-------------------------+-------------------------------------------------------+
|0  |0.0  |[Chinese, Beijing, Chinese] |(7,[1,2],[2.0,1.0])      |(7,[1,2],[0.0,0.9162907318741551])                     |
|1  |0.0  |[Chinese, Chinese, Shanghai]|(7,[1,4],[2.0,1.0])      |(7,[1,4],[0.0,0.9162907318741551])                     |
|2  |0.0  |[Chinese, Macao]            |(7,[1,6],[1.0,1.0])      |(7,[1,6],[0.0,0.9162907318741551])                     |
|3  |1.0  |[Tokyo, Japan, Chinese]     |(7,[1,3,5],[1.0,1.0,1.0])|(7,[1,3,5],[0.0,0.9162907318741551,0.9162907318741551])|
+---+-----+----------------------------+-------------------------+-------------------------------------------------------+

@Test
public void test() {
    DataFrame rawTrainingDF = createTrainingData();
    DataFrame trainingDF = runPipleLineTF_IDF(rawTrainingDF);
    . . .
} // end of test()

/**
 * Hashes the "words" column into term-frequency vectors and weights them with TF-IDF.
 *
 * <p>The returned frame keeps every input column and adds:
 * <ul>
 *   <li>{@code tf} - sparse term-frequency vector of size {@code dictionarySize} (HashingTF);</li>
 *   <li>{@code idf} - the IDFModel output, i.e. the TF-IDF weights (tf * idf), NOT the bare idf
 *       values;</li>
 *   <li>{@code features} - the same TF-IDF vector under the name downstream code expects.</li>
 * </ul>
 *
 * @param rawDF frame with a "words" column of string arrays
 * @return {@code rawDF} plus the {@code tf}, {@code idf} and {@code features} columns
 */
private DataFrame runPipleLineTF_IDF(DataFrame rawDF) {
    HashingTF hashingTF = new HashingTF()
            .setInputCol("words")
            .setOutputCol("tf")
            .setNumFeatures(dictionarySize);
    DataFrame termFrequenceDF = hashingTF.transform(rawDF);
    // idf needs to make 2 passes over data set: fit() below reads termFrequenceDF and so does the
    // later transform(), so cache it instead of re-hashing the words for each pass.
    // NOTE(review): if these stages are moved into a Pipeline, confirm the intermediate TF output
    // is still cached - this explicit cache() call would no longer be in the execution path.
    termFrequenceDF.cache();

    IDFModel idf = new IDF()
            //.setMinDocFreq(1) // our vocabulary has 6 words we hash into 7
            .setInputCol(hashingTF.getOutputCol())
            .setOutputCol("idf")
            .fit(termFrequenceDF);
    DataFrame tmp = idf.transform(termFrequenceDF);

    // Do NOT multiply the vector columns: Column.multiply only accepts numeric types, and
    // tmp.col("tf").multiply(tmp.col("idf")) fails with
    //   org.apache.spark.sql.AnalysisException: cannot resolve '(tf * idf)' due to data type
    //   mismatch: '(tf * idf)' requires numeric type, not vector;
    // It is also unnecessary: IDFModel.transform() already scales each term frequency by
    // idf(t) = ln((m + 1) / (df(t) + 1)), m = number of documents, df(t) = documents containing t.
    // "Chinese" occurs in all 4 documents -> ln(5/5) = 0.0; "Beijing" occurs in 1 -> ln(5/2) = 0.9163,
    // which is exactly what the "idf" column shows. So that column already holds TF-IDF.
    DataFrame ret = tmp.withColumn("features", tmp.col(idf.getOutputCol()));
    logger.warn("\ntransformed df printSchema()");
    ret.printSchema();
    ret.show(false);
    return ret;
}

/** Builds the four-document training set; label 0.0 = Chinese, 1.0 = not Chinese. */
private DataFrame createTrainingData() {
    // make sure we only use dictionarySize words
    JavaRDD<Row> rdd = javaSparkContext.parallelize(Arrays.asList(
            // label 0.0 is Chinese
            // label 1.0 is not Chinese
            RowFactory.create(0, 0.0, Arrays.asList("Chinese", "Beijing", "Chinese")),
            RowFactory.create(1, 0.0, Arrays.asList("Chinese", "Chinese", "Shanghai")),
            RowFactory.create(2, 0.0, Arrays.asList("Chinese", "Macao")),
            RowFactory.create(3, 1.0, Arrays.asList("Tokyo", "Japan", "Chinese"))));
    return createData(rdd);
}

/** Builds the single-document test set (id 4, label 1.0 = not Chinese). */
private DataFrame createTestData() {
    JavaRDD<Row> rdd = javaSparkContext.parallelize(Arrays.asList(
            // label 0.0 is Chinese
            // label 1.0 is not Chinese
            // NOTE(review): the original remark said "bernoulli" requires the label to be
            // IntegerType, but the label below is a double (1.0) - confirm which is right.
            RowFactory.create(4, 1.0, Arrays.asList("Chinese", "Chinese", "Chinese", "Tokyo", "Japan"))));
    return createData(rdd);
}