Thank you. I'll try it!
2013/3/18 Dan DeCapria, CivicScience <[email protected]>:

Also,

# ruby script modified for cassandra, from amazon

#!/usr/bin/ruby
require 'hpricot'
require 'tempfile'

CONFIG_HEADER = "<?xml version=\"1.0\"?>\n<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>"

def parse_config_file(config_file_path)
  ret = []
  if File.exist?(config_file_path) then
    doc = open(config_file_path) { |f| Hpricot(f) }
    (doc/"configuration"/"property").each do |property|
      val = {:name => (property/"name").inner_html, :value => (property/"value").inner_html }
      if (property/"final").inner_html != "" then
        val[:final] = (property/"final").inner_html
      end
      ret << val
    end
  else
    puts "#{config_file_path} does not exist, assuming empty configuration"
  end
  return ret
end

def dump_config_file(file_name, config)
  open(file_name, 'w') do |f|
    f.puts CONFIG_HEADER
    f.puts '<configuration>'
    for entry in config
      f.print "  <property><name>#{entry[:name]}</name><value>#{entry[:value]}</value>"
      if entry[:final] then
        f.print "<final>#{entry[:final]}</final>"
      end
      f.puts '</property>'
    end
    f.puts '</configuration>'
  end
end

def merge_config(default, overwrite)
  for entry in overwrite
    cells = default.select { |x| x[:name] == entry[:name] }
    if cells.size == 0 then
      puts "'#{entry[:name]}': default does not have key, appending value '#{entry[:value]}'"
      default << entry
    elsif cells.size == 1 then
      puts "'#{entry[:name]}': new value '#{entry[:value]}' overwriting '#{cells[0][:value]}'"
      cells[0].replace(entry)
    else
      raise "'#{entry[:name]}': default has #{cells.size} keys"
    end
  end
end

def add_cassandra_settings()
  file = "/home/hadoop/conf/mapred-site.xml"
  default = parse_config_file(file)
  merge_config(default, [{:name => "cassandra.thrift.address", :value => "THISIPADDRESS" }])
  merge_config(default, [{:name => "cassandra.input.thrift.address", :value => "THISIPADDRESS" }])
  merge_config(default, [{:name => "cassandra.output.thrift.address", :value => "THISIPADDRESS" }])
  merge_config(default, [{:name => "cassandra.thrift.port", :value => "9160" }])
  merge_config(default, [{:name => "cassandra.input.thrift.port", :value => "9160" }])
  merge_config(default, [{:name => "cassandra.output.thrift.port", :value => "9160" }])
  merge_config(default, [{:name => "cassandra.partitioner.class", :value => "org.apache.cassandra.dht.RandomPartitioner" }])
  merge_config(default, [{:name => "cassandra.input.partitioner.class", :value => "org.apache.cassandra.dht.RandomPartitioner" }])
  merge_config(default, [{:name => "cassandra.output.partitioner.class", :value => "org.apache.cassandra.dht.RandomPartitioner" }])
  dump_config_file(file + ".new", default)
  if File.exist?(file) then
    File.rename(file, file + ".old")
  end
  File.rename(file + ".new", file)
  puts "Saved #{file} with overwrites. Original saved to #{file}.old"
end

def warn(msg)
  STDERR.puts "#{Time.now.utc} WARN " + msg
end

add_cassandra_settings()
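For reference, the file this script rewrites ends up with one property element per merged setting; given the merges in add_cassandra_settings(), the generated /home/hadoop/conf/mapred-site.xml should look roughly like this (THISIPADDRESS is the placeholder from the script; the remaining seven properties follow the same pattern):

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property><name>cassandra.thrift.address</name><value>THISIPADDRESS</value></property>
  <property><name>cassandra.thrift.port</name><value>9160</value></property>
  ...
</configuration>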
On Mon, Mar 18, 2013 at 1:24 PM, Dan DeCapria, CivicScience <[email protected]> wrote:

So it appears that you need to configure Cassandra to run with Hadoop. There are a couple of things you will need to do here. In my case, I usually bootstrap these on my Hadoop master and slaves, to pull in the correct dependencies and set the Pig-to-Cassandra IP touch points.

# install cassandra everywhere
echo "deb http://debian.datastax.com/community stable main" > /tmp/cassandra.sources.list
sudo mv /tmp/cassandra.sources.list /etc/apt/sources.list.d/cassandra.sources.list
curl -L http://debian.datastax.com/debian/repo_key | sudo apt-key add -
sudo apt-get update
sudo apt-get install -y cassandra
sudo /etc/init.d/cassandra stop
# single quotes keep the globs and $HADOOP_CLASSPATH literal until the file is sourced
echo 'HADOOP_CLASSPATH=/usr/share/cassandra/*:/usr/share/cassandra/lib/*:$HADOOP_CLASSPATH' >> /home/hadoop/conf/hadoop-user-env.sh
echo "PIG_INITIAL_ADDRESS=MYIPGOESHERE" >> /home/hadoop/conf/hadoop-user-env.sh
echo "PIG_RPC_PORT=9160" >> /home/hadoop/conf/hadoop-user-env.sh
echo "PIG_ROOT_LOGGER=DEBUG,console" >> /home/hadoop/conf/hadoop-user-env.sh
echo "PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner" >> /home/hadoop/conf/hadoop-user-env.sh
echo "PIG_WIDEROW_INPUT=true" >> /home/hadoop/conf/hadoop-user-env.sh

On Mon, Mar 18, 2013 at 12:57 PM, Mohammed Abdelkhalek <[email protected]> wrote:

I have an error in the first line of the code:

grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING org.apache.cassandra.hadoop.pig.CassandraStorage() AS (key, columns: bag {T: tuple(name, value)});

ERROR 1070: Could not resolve org.apache.cassandra.hadoop.pig.CassandraStorage using imports: [, org.apache.pig.builtin., org.apache.pig.impl.builtin.]

Perhaps I should add some jars or something to reference CassandraStorage?

2013/3/18 Dan DeCapria, CivicScience <[email protected]>:

Try something simple, in interactive mode, such as:

grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING org.apache.cassandra.hadoop.pig.CassandraStorage() AS (key, columns: bag {T: tuple(name, value)});
grunt> cols = FOREACH rows GENERATE FLATTEN(columns);
grunt> ILLUSTRATE cols;

Check that 'cols' is of the correct form before proceeding, and that data is being accessed by Pig through Hadoop to Cassandra correctly.

-Dan

On Mon, Mar 18, 2013 at 12:20 PM, Mohammed Abdelkhalek <[email protected]> wrote:

How?

2013/3/18 Dan DeCapria, CivicScience <[email protected]>:

Try fully qualifying CassandraStorage() as org.apache.cassandra.hadoop.pig.CassandraStorage().

-Dan
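A note on that ERROR 1070: besides fully qualifying the name, Pig has to find the Cassandra classes on its classpath, which is the "add some jars" suspicion above. A minimal grunt sketch, assuming the /usr/share/cassandra layout from the DataStax package installed earlier (Pig accepts globs in REGISTER; adjust the paths to your install):

grunt> REGISTER /usr/share/cassandra/*.jar;
grunt> REGISTER /usr/share/cassandra/lib/*.jar;
grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING org.apache.cassandra.hadoop.pig.CassandraStorage() AS (key, columns: bag {T: tuple(name, value)});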
On Mon, Mar 18, 2013 at 11:56 AM, Mohammed Abdelkhalek <[email protected]> wrote:

Thank you for replying. In fact, I'm trying to run this script:

grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING CassandraStorage() AS (key, columns: bag {T: tuple(name, value)});
grunt> cols = FOREACH rows GENERATE FLATTEN(columns);
grunt> colnames = FOREACH cols GENERATE $0;
grunt> namegroups = GROUP colnames BY (chararray) $0;
grunt> namecounts = FOREACH namegroups GENERATE COUNT($1), group;
grunt> orderednames = ORDER namecounts BY $0;
grunt> topnames = LIMIT orderednames 50;
grunt> dump topnames;

but I'm getting this error:

ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve CassandraStorage using imports: [, org.apache.pig.builtin., org.apache.pig.impl.builtin.]
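If the short CassandraStorage() spelling in the script above is preferred, a DEFINE alias can map it to the fully qualified class. A minimal sketch, assuming the Cassandra jars are already registered as in the earlier note:

grunt> DEFINE CassandraStorage org.apache.cassandra.hadoop.pig.CassandraStorage();
grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING CassandraStorage AS (key, columns: bag {T: tuple(name, value)});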
2013/3/18 Dan DeCapria, CivicScience <[email protected]>:

Storing to Cassandra requires a key->column->value data structure from Pig. Here is one possible approach, requiring a UDF to handle the Pig formatting interchange to Cassandra:

-- sample pig script
A = LOAD 'foo' USING PigStorage() AS (key:chararray, name:chararray, value:chararray);
B = FOREACH A GENERATE FLATTEN(com.to.udf.ToCassandraUDF(TOTUPLE('PotentialKeyManipulation/', $0, '/toSomethingElse'), TOTUPLE($1), $2));
STORE B INTO 'cassandra://cassandraNamespace/myColumnFamilyName' USING org.apache.cassandra.hadoop.pig.CassandraStorage();

-- sample ToCassandraUDF
package com.to.udf;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class ToCassandraUDF extends EvalFunc<Tuple> {

    @Override
    public Tuple exec(Tuple input) throws IOException {
        Tuple row = TupleFactory.getInstance().newTuple(2);

        // Concatenate the first input tuple's fields into the row key.
        Tuple keyParts = (Tuple) input.get(0);
        StringBuffer buf = new StringBuffer();
        for (Object o : keyParts.getAll()) {
            if (o != null) {
                buf.append(o.toString());
            } else {
                buf.append("null");
            }
        }
        String key = buf.toString();

        // Concatenate the second input tuple's fields into the column name.
        Tuple columnParts = (Tuple) input.get(1);
        buf = new StringBuffer();
        for (Object o : columnParts.getAll()) {
            if (o != null) {
                buf.append(o.toString());
            } else {
                buf.append("null");
            }
        }
        String columnName = buf.toString();

        // The optional third argument becomes the column value.
        byte[] columnValueBytes;
        if (input.size() > 2) {
            Object value = input.get(2);
            columnValueBytes = value.toString().getBytes();
        } else {
            columnValueBytes = new byte[0];
        }

        Tuple column = TupleFactory.getInstance().newTuple(2);
        column.set(0, new DataByteArray(columnName.getBytes()));
        column.set(1, new DataByteArray(columnValueBytes));

        DataBag bagOfColumns = BagFactory.getInstance().newDefaultBag();
        bagOfColumns.add(column);

        row.set(0, key);
        row.set(1, bagOfColumns);

        return row;
    }

    public Schema outputSchema(Schema input) {
        try {
            Schema.FieldSchema keyField = new Schema.FieldSchema("key", DataType.CHARARRAY);
            Schema.FieldSchema nameField = new Schema.FieldSchema("name", DataType.CHARARRAY);
            Schema.FieldSchema valueField = new Schema.FieldSchema("value", DataType.BYTEARRAY);

            Schema bagSchema = new Schema();
            bagSchema.add(nameField);
            bagSchema.add(valueField);

            Schema.FieldSchema columnsField = new Schema.FieldSchema("columns", bagSchema, DataType.BAG);

            Schema innerSchema = new Schema();
            innerSchema.add(keyField);
            innerSchema.add(columnsField);

            Schema.FieldSchema cassandraTuple = new Schema.FieldSchema("cassandra_tuple", innerSchema, DataType.TUPLE);

            Schema schema = new Schema(cassandraTuple);
            schema.setTwoLevelAccessRequired(true);
            return schema;
        } catch (Exception e) {
            return null;
        }
    }
}

Hope this helps.

-Dan
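To wire a UDF like this in, it has to be compiled against pig.jar, packed into a jar, and registered before the sample script runs. A minimal sketch (the jar path and name are hypothetical, for illustration only):

grunt> REGISTER /home/hadoop/lib/to-cassandra-udf.jar; -- hypothetical jar built from ToCassandraUDF.java
grunt> A = LOAD 'foo' USING PigStorage() AS (key:chararray, name:chararray, value:chararray);
grunt> B = FOREACH A GENERATE FLATTEN(com.to.udf.ToCassandraUDF(TOTUPLE('PotentialKeyManipulation/', $0, '/toSomethingElse'), TOTUPLE($1), $2));
grunt> STORE B INTO 'cassandra://cassandraNamespace/myColumnFamilyName' USING org.apache.cassandra.hadoop.pig.CassandraStorage();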
On Mon, Mar 18, 2013 at 11:15 AM, Mohammed Abdelkhalek <[email protected]> wrote:

Hi,
I'm using Hadoop 1.0.4, Cassandra 1.2.2 and Pig 0.11.0. Can anyone help me with an example of how to use Pig either for storing to Cassandra using CassandraStorage, or for loading rows from Cassandra in order to use them with Pig?
Thanks.

--
Mohammed ABDELKHALEK
State Engineer, École Mohammadia d'Ingénieurs
Information Technologies and Services
Phone: +212 6 45 64 65 68

Please consider the environment before printing this e-mail.
