I have an error in the first line of the code:
*grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING
org.apache.cassandra.hadoop.*
*pig.CassandraStorage() AS (key, columns: bag {T: tuple(name, value)});*ERROR 1070: Could not resolve org.apache.cassandra.hadoop.pig.CassandraStorage using imports: [, org.apache.pig.builtin., org.apache.pig.impl.builtin.] Perhaps i should add some jars or something to reference to Cassandra Storage !! 2013/3/18 Dan DeCapria, CivicScience <[email protected]> > Try something simple, in interactive mode, such as: > > grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING > org.apache.cassandra.hadoop.pig.CassandraStorage() AS (key, columns: bag > {T: tuple(name, value)}); > grunt> cols = FOREACH rows GENERATE flatten(columns); > grunt> ILLUSTRATE cols; > > Check that 'cols' is of correct form before preceding, and that data is > being accessed by pig through hadoop to Cassandra correctly. > > -Dan > > On Mon, Mar 18, 2013 at 12:20 PM, Mohammed Abdelkhalek < > [email protected]> wrote: > > > How ? > > > > > > 2013/3/18 Dan DeCapria, CivicScience <[email protected]> > > > > > Try fully qualifying CassandraStorage() to org.apache.cassandra.hadoop. > > > pig.CassandraStorage(). > > > > > > -Dan > > > > > > On Mon, Mar 18, 2013 at 11:56 AM, Mohammed Abdelkhalek < > > > [email protected]> wrote: > > > > > > > Thank you for replying, > > > > In fact, i'm trying to run this script: > > > > grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING > > CassandraStorage() > > > > AS (key, columns: bag {T: tuple(name, value)}); > > > > grunt> cols = FOREACH rows GENERATE flatten(columns); > > > > grunt> colnames = FOREACH cols GENERATE $0; > > > > grunt> namegroups = GROUP colnames BY (chararray) $0; > > > > grunt> namecounts = FOREACH namegroups GENERATE COUNT($1), group; > > > > grunt> orderednames = ORDER namecounts BY $0; > > > > grunt> topnames = LIMIT orderednames 50; > > > > grunt> dump topnames; > > > > > > > > but i'm having this error: > > > > ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not > resolve > > > > CassandraStorage using imports: [, org.apache.pig.builtin., > > > > org.apache.pig.impl.builtin.] > > > > > > > > > > > > > > > > > > > > 2013/3/18 Dan DeCapria, CivicScience <[email protected]> > > > > > > > > > Storing to Cassandra requires a key->column->value data structure > > from > > > > pig. > > > > > Here is one possible approach, requiring a udf to handle the pig > > > > > formatting interchange to cassandra: > > > > > > > > > > -- sample pig script > > > > > A = LOAD 'foo' USING PigStorage() AS (key:chararray, > name:chararray, > > > > > value:chararray); > > > > > B = FOREACH A GENERATE > > > > > > > > > > > > > > > > > > > > FLATTEN(com.to.udf.ToCassandraUDF(TOTUPLE('PotentialKeyManipulation/',$0,'/toSomethingElse'), > > > > > TOTUPLE($1), $2)); > > > > > STORE B INTO 'cassandra://cassandraNamespace/myColumnFamilyName' > > USING > > > > > org.apache.cassandra.hadoop.pig.CassandraStorage(); > > > > > > > > > > -- sample toCassandraUDF > > > > > package com.to.udf.ToCassandraUDF; > > > > > > > > > > public class ToCassandraUDF extends EvalFunc<Tuple> { > > > > > > > > > > public Tuple exec(Tuple input) throws IOException { > > > > > Tuple row = TupleFactory.getInstance().newTuple(2); > > > > > StringBuffer buf = null; > > > > > Tuple keyParts = (Tuple) input.get(0); > > > > > buf = new StringBuffer(); > > > > > for (Object o : keyParts.getAll()) { > > > > > if (o != null) { > > > > > buf.append(o.toString()); > > > > > } else { > > > > > buf.append("null"); > > > > > } > > > > > } > > > > > String key = buf.toString(); > > > > > Tuple columnParts = (Tuple) input.get(1); > > > > > buf = new StringBuffer(); > > > > > for (Object o : columnParts.getAll()) { > > > > > if (o != null) { > > > > > buf.append(o.toString()); > > > > > } else { > > > > > buf.append("null"); > > > > > } > > > > > } > > > > > String columnName = buf.toString(); > > > > > > > > > > byte[] columnValueBytes = null; > > > > > if (input.size() > 2) { > > > > > Object value = input.get(2); > > > > > columnValueBytes = value.toString().getBytes(); > > > > > } else { > > > > > columnValueBytes = new byte[0]; > > > > > } > > > > > > > > > > Tuple column = TupleFactory.getInstance().newTuple(2); > > > > > column.set(0, new DataByteArray(columnName.getBytes())); > > > > > column.set(1, new DataByteArray(columnValueBytes)); > > > > > > > > > > DataBag bagOfColumns = BagFactory.getInstance().newDefaultBag(); > > > > > bagOfColumns.add(column); > > > > > > > > > > row.set(0, key); > > > > > row.set(1, bagOfColumns); > > > > > > > > > > return row; > > > > > } > > > > > > > > > > public Schema outputSchema(Schema input) { > > > > > try { > > > > > Schema.FieldSchema keyField = new Schema.FieldSchema("key", > > > > > DataType.CHARARRAY); > > > > > Schema.FieldSchema nameField = new Schema.FieldSchema("name", > > > > > DataType.CHARARRAY); > > > > > Schema.FieldSchema valueField = new Schema.FieldSchema("value", > > > > > DataType.BYTEARRAY); > > > > > > > > > > Schema bagSchema = new Schema(); > > > > > bagSchema.add(nameField); > > > > > bagSchema.add(valueField); > > > > > > > > > > Schema.FieldSchema columnsField = new Schema.FieldSchema("columns", > > > > > bagSchema, DataType.BAG); > > > > > > > > > > Schema innerSchema = new Schema(); > > > > > innerSchema.add(keyField); > > > > > innerSchema.add(columnsField); > > > > > > > > > > Schema.FieldSchema cassandraTuple = new Schema.FieldSchema( > > > > > "cassandra_tuple", innerSchema, DataType.TUPLE); > > > > > > > > > > Schema schema = new Schema(cassandraTuple); > > > > > schema.setTwoLevelAccessRequired(true); > > > > > return schema; > > > > > } catch (Exception e) { > > > > > return null; > > > > > } > > > > > } > > > > > } > > > > > > > > > > Hope this helps. > > > > > > > > > > -Dan > > > > > > > > > > On Mon, Mar 18, 2013 at 11:15 AM, Mohammed Abdelkhalek < > > > > > [email protected]> wrote: > > > > > > > > > > > Hi, > > > > > > i'm using hadoop 1.0.4, cassandra 1.2.2 and pig 0.11.0. > > > > > > Can any one help me with an example on how to use pig either for > > > > Storing > > > > > to > > > > > > cassandra from *pig* using Cassandrastorage, or Loading rows from > > > > > cassandra > > > > > > in order to use them with pig. > > > > > > Thanks. > > > > > > > > > > > > -- > > > > > > Mohammed ABDELKHALEK > > > > > > Ingénieur d'état de l’École Mohammadia d'Ingénieurs > > > > > > Technologies et services de l'information > > > > > > Téléphone: +212 6 45 64 65 68 > > > > > > > > > > > > Préservons l'environnement. N'imprimez ce courriel que si > > nécessaire. > > > > > > Please consider the environment before printing this e-mail. > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > Mohammed ABDELKHALEK > > > > Ingénieur d'état de l’École Mohammadia d'Ingénieurs > > > > Technologies et services de l'information > > > > Téléphone: +212 6 45 64 65 68 > > > > > > > > Préservons l'environnement. N'imprimez ce courriel que si nécessaire. > > > > Please consider the environment before printing this e-mail. > > > > > > > > > > > > > > > -- > > Mohammed ABDELKHALEK > > Ingénieur d'état de l’École Mohammadia d'Ingénieurs > > Technologies et services de l'information > > Téléphone: +212 6 45 64 65 68 > > > > Préservons l'environnement. N'imprimez ce courriel que si nécessaire. > > Please consider the environment before printing this e-mail. > > > -- Mohammed ABDELKHALEK Ingénieur d'état de l’École Mohammadia d'Ingénieurs Technologies et services de l'information Téléphone: +212 6 45 64 65 68 Préservons l'environnement. N'imprimez ce courriel que si nécessaire. Please consider the environment before printing this e-mail.
