Below is a little snippet of my Java test code. Any idea how I can implement
member-wise vector multiplication?

Also, notice that the idf value for 'Chinese' is 0.0? By my calculation it
should be ln((4+1) / (6/4 + 1)) = ln(2) = 0.6931.

Also, any idea if this code would work in a pipeline? I.e., is the pipeline
smart about using cache()?

Kind regards

Andy

transformed df printSchema()

root

 |-- id: integer (nullable = false)

 |-- label: double (nullable = false)

 |-- words: array (nullable = false)

 |    |-- element: string (containsNull = true)

 |-- tf: vector (nullable = true)

 |-- idf: vector (nullable = true)



+---+-----+----------------------------+-------------------------+----------
---------------------------------------------+

|id |label|words                       |tf                       |idf
|

+---+-----+----------------------------+-------------------------+----------
---------------------------------------------+

|0  |0.0  |[Chinese, Beijing, Chinese] |(7,[1,2],[2.0,1.0])
|(7,[1,2],[0.0,0.9162907318741551])                     |

|1  |0.0  |[Chinese, Chinese, Shanghai]|(7,[1,4],[2.0,1.0])
|(7,[1,4],[0.0,0.9162907318741551])                     |

|2  |0.0  |[Chinese, Macao]            |(7,[1,6],[1.0,1.0])
|(7,[1,6],[0.0,0.9162907318741551])                     |

|3  |1.0  |[Tokyo, Japan, Chinese]
|(7,[1,3,5],[1.0,1.0,1.0])|(7,[1,3,5],[0.0,0.9162907318741551,0.916290731874
1551])|

+---+-----+----------------------------+-------------------------+----------
---------------------------------------------+


    /**
     * Builds the raw training DataFrame with createTrainingData() and passes
     * it to runPipleLineTF_IDF(). The remainder of the test body is elided
     * in this snippet.
     */
    @Test

    public void test() {

        DataFrame rawTrainingDF = createTrainingData();

        DataFrame trainingDF = runPipleLineTF_IDF(rawTrainingDF);

. . .

}

   /**
    * Hashes the "words" column into a term-frequency vector column "tf",
    * fits an IDF model on that column, adds the model output as column
    * "idf", and then tries to add a "features" column computed as
    * tf * idf. The schema and rows of the result are printed before it is
    * returned.
    *
    * @param rawDF input frame; must contain the "words" column that
    *              HashingTF reads
    * @return the frame with the "tf", "idf" and "features" columns added
    */
   private DataFrame runPipleLineTF_IDF(DataFrame rawDF) {

        // dictionarySize is defined outside this snippet.
        HashingTF hashingTF = new HashingTF()

                                    .setInputCol("words")

                                    .setOutputCol("tf")

                                    .setNumFeatures(dictionarySize);

        

        DataFrame termFrequenceDF = hashingTF.transform(rawDF);

        

        termFrequenceDF.cache(); // idf needs to make 2 passes over data set

        // The IDF estimator is fit on the "tf" column produced above.
        IDFModel idf = new IDF()

                        //.setMinDocFreq(1) // our vocabulary has 6 words we
hash into 7

                        .setInputCol(hashingTF.getOutputCol())

                        .setOutputCol("idf")

                        .fit(termFrequenceDF);



        DataFrame tmp = idf.transform(termFrequenceDF);

        

        // This is the statement behind the AnalysisException quoted after
        // this method: Column.multiply requires numeric operands, but "tf"
        // and "idf" are vector columns.
        // NOTE(review): the Spark ML docs describe IDFModel.transform as
        // rescaling the input term-frequency vector, so the "idf" column may
        // already hold the tf * idf product - confirm before adding a
        // separate multiplication step.
        DataFrame ret = tmp.withColumn("features",
tmp.col("tf").multiply(tmp.col("idf")));

        logger.warn("\ntransformed df printSchema()");

        ret.printSchema();

        // false is passed to show(); in the pasted output the cell values are
        // printed in full - presumably this disables truncation, confirm.
        ret.show(false);

        

        return ret;

    }



org.apache.spark.sql.AnalysisException: cannot resolve '(tf * idf)' due to
data type mismatch: '(tf * idf)' requires numeric type, not vector;





    /**
     * Builds the four-document training set. Each row is
     * (id, label, words); label 0.0 marks "Chinese" and 1.0 marks
     * "notChinese". The vocabulary is kept small so that only
     * dictionarySize words are used.
     *
     * @return the DataFrame produced by createData() from the training rows
     */
    private DataFrame createTrainingData() {
        Row doc0 = RowFactory.create(0, 0.0, Arrays.asList("Chinese", "Beijing", "Chinese"));
        Row doc1 = RowFactory.create(1, 0.0, Arrays.asList("Chinese", "Chinese", "Shanghai"));
        Row doc2 = RowFactory.create(2, 0.0, Arrays.asList("Chinese", "Macao"));
        Row doc3 = RowFactory.create(3, 1.0, Arrays.asList("Tokyo", "Japan", "Chinese"));

        JavaRDD<Row> trainingRows =
                javaSparkContext.parallelize(Arrays.asList(doc0, doc1, doc2, doc3));
        return createData(trainingRows);
    }

    

    /**
     * Builds the single-document test set. The row is (id, label, words);
     * label 0.0 marks "Chinese" and 1.0 marks "notChinese".
     *
     * NOTE(review): the original comment says "bernoulli" requires the
     * label to be IntegerType, yet the label here is a double - confirm
     * against createData() and the model being trained.
     *
     * @return the DataFrame produced by createData() from the test row
     */
    private DataFrame createTestData() {
        Row testDoc = RowFactory.create(4, 1.0,
                Arrays.asList("Chinese", "Chinese", "Chinese", "Tokyo", "Japan"));

        JavaRDD<Row> testRows = javaSparkContext.parallelize(Arrays.asList(testDoc));
        return createData(testRows);
    }


Reply via email to