Here's an example: http://hortonworks.com/blog/pig-as-duct-tape-part-three-tf-idf-topics-with-cassandra-python-streaming-and-flask/
On Mon, Mar 18, 2013 at 10:36 AM, Dan DeCapria, CivicScience <[email protected]> wrote:

So yes, this is just a copy-and-paste baseline for what someone could use to go to/from Cassandra. The idea here is that you do require the correct dependencies in the /lib directory, but also the correct aliasing of exports and modifications to the config files on the master and slaves. Hopefully this is a good starting point for you.

Good luck!

-Dan

On Mon, Mar 18, 2013 at 1:29 PM, Mohammed Abdelkhalek <[email protected]> wrote:

Thank you. I'll try it!

2013/3/18 Dan DeCapria, CivicScience <[email protected]>:

Also,

// ruby script modified for cassandra, from amazon

#!/usr/bin/ruby
# Merges Cassandra connection settings into Hadoop's mapred-site.xml,
# keeping a .old backup of the original file.
require 'hpricot'
require 'tempfile'

CONFIG_HEADER = "<?xml version=\"1.0\"?>\n<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>"

def parse_config_file(config_file_path)
  ret = []
  if File.exist?(config_file_path) then
    doc = open(config_file_path) { |f| Hpricot(f) }
    (doc/"configuration"/"property").each do |property|
      val = {:name => (property/"name").inner_html, :value => (property/"value").inner_html}
      if (property/"final").inner_html != "" then
        val[:final] = (property/"final").inner_html
      end
      ret << val
    end
  else
    puts "#{config_file_path} does not exist, assuming empty configuration"
  end
  return ret
end

def dump_config_file(file_name, config)
  open(file_name, 'w') do |f|
    f.puts CONFIG_HEADER
    f.puts '<configuration>'
    for entry in config
      f.print "  <property><name>#{entry[:name]}</name><value>#{entry[:value]}</value>"
      if entry[:final] then
        f.print "<final>#{entry[:final]}</final>"
      end
      f.puts '</property>'
    end
    f.puts '</configuration>'
  end
end

def merge_config(default, overwrite)
  for entry in overwrite
    cells = default.select { |x| x[:name] == entry[:name] }
    if cells.size == 0 then
      puts "'#{entry[:name]}': default does not have key, appending value '#{entry[:value]}'"
      default << entry
    elsif cells.size == 1 then
      puts "'#{entry[:name]}': new value '#{entry[:value]}' overwriting '#{cells[0][:value]}'"
      cells[0].replace(entry)
    else
      raise "'#{entry[:name]}': default has #{cells.size} keys"
    end
  end
end

def add_cassandra_settings()
  file = "/home/hadoop/conf/mapred-site.xml"
  default = parse_config_file(file)
  merge_config(default, [{:name => "cassandra.thrift.address", :value => "THISIPADDRESS"}])
  merge_config(default, [{:name => "cassandra.input.thrift.address", :value => "THISIPADDRESS"}])
  merge_config(default, [{:name => "cassandra.output.thrift.address", :value => "THISIPADDRESS"}])
  merge_config(default, [{:name => "cassandra.thrift.port", :value => "9160"}])
  merge_config(default, [{:name => "cassandra.input.thrift.port", :value => "9160"}])
  merge_config(default, [{:name => "cassandra.output.thrift.port", :value => "9160"}])
  merge_config(default, [{:name => "cassandra.partitioner.class", :value => "org.apache.cassandra.dht.RandomPartitioner"}])
  merge_config(default, [{:name => "cassandra.input.partitioner.class", :value => "org.apache.cassandra.dht.RandomPartitioner"}])
  merge_config(default, [{:name => "cassandra.output.partitioner.class", :value => "org.apache.cassandra.dht.RandomPartitioner"}])
  dump_config_file(file + ".new", default)
  if File.exist?(file) then
    File.rename(file, file + ".old")
  end
  File.rename(file + ".new", file)
  puts "Saved #{file} with overwrites. Original saved to #{file}.old"
end

def warn(msg)
  STDERR.puts "#{Time.now.utc} WARN " + msg
end

add_cassandra_settings()
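[A minimal sketch of applying this script on each node, assuming it is saved as configure-cassandra.rb; the filename and the sed step that fills in the node's own address are illustrative:]

sudo gem install hpricot                                  # XML parser the script requires
sed -i "s/THISIPADDRESS/$(hostname -i)/g" configure-cassandra.rb
ruby configure-cassandra.rb                               # rewrites mapred-site.xml, keeps a .old backup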
On Mon, Mar 18, 2013 at 1:24 PM, Dan DeCapria, CivicScience <[email protected]> wrote:

So it appears that you need to configure Cassandra to run with Hadoop. There are a couple of things you will need to do here. In my case, I usually bootstrap these for my Hadoop master and slaves, for the correct dependencies and Pig IP touch points for Cassandra.

# install cassandra everywhere
echo "deb http://debian.datastax.com/community stable main" > /tmp/cassandra.sources.list
sudo mv /tmp/cassandra.sources.list /etc/apt/sources.list.d/cassandra.sources.list
curl -L http://debian.datastax.com/debian/repo_key | sudo apt-key add -
sudo apt-get update
sudo apt-get install -y cassandra
sudo /etc/init.d/cassandra stop
echo "HADOOP_CLASSPATH=/usr/share/cassandra/*:/usr/share/cassandra/lib/*:$HADOOP_CLASSPATH" >> /home/hadoop/conf/hadoop-user-env.sh
echo "PIG_INITIAL_ADDRESS=MYIPGOESHERE" >> /home/hadoop/conf/hadoop-user-env.sh
echo "PIG_RPC_PORT=9160" >> /home/hadoop/conf/hadoop-user-env.sh
echo "PIG_ROOT_LOGGER=DEBUG,console" >> /home/hadoop/conf/hadoop-user-env.sh
echo "PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner" >> /home/hadoop/conf/hadoop-user-env.sh
echo "PIG_WIDEROW_INPUT=true" >> /home/hadoop/conf/hadoop-user-env.sh

On Mon, Mar 18, 2013 at 12:57 PM, Mohammed Abdelkhalek <[email protected]> wrote:

I have an error in the first line of the code:

grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING org.apache.cassandra.hadoop.pig.CassandraStorage() AS (key, columns: bag {T: tuple(name, value)});

ERROR 1070: Could not resolve org.apache.cassandra.hadoop.pig.CassandraStorage using imports: [, org.apache.pig.builtin., org.apache.pig.impl.builtin.]

Perhaps I should add some jars or something to reference CassandraStorage!

2013/3/18 Dan DeCapria, CivicScience <[email protected]>:

Try something simple, in interactive mode, such as:

grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING org.apache.cassandra.hadoop.pig.CassandraStorage() AS (key, columns: bag {T: tuple(name, value)});
grunt> cols = FOREACH rows GENERATE flatten(columns);
grunt> ILLUSTRATE cols;

Check that 'cols' is of the correct form before proceeding, and that data is being accessed by Pig through Hadoop to Cassandra correctly.

-Dan
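[The ERROR 1070 above means the CassandraStorage class is not on Pig's classpath. A minimal sketch of one common fix, assuming the DataStax package layout from the bootstrap above; jar locations vary by install:]

# Put the Cassandra jars on Pig's classpath before starting the grunt shell.
export PIG_CLASSPATH="/usr/share/cassandra/*:/usr/share/cassandra/lib/*:$PIG_CLASSPATH"
pig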
On Mon, Mar 18, 2013 at 12:20 PM, Mohammed Abdelkhalek <[email protected]> wrote:

How?

2013/3/18 Dan DeCapria, CivicScience <[email protected]>:

Try fully qualifying CassandraStorage() to org.apache.cassandra.hadoop.pig.CassandraStorage().

-Dan

On Mon, Mar 18, 2013 at 11:56 AM, Mohammed Abdelkhalek <[email protected]> wrote:

Thank you for replying. In fact, I'm trying to run this script:

grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING CassandraStorage() AS (key, columns: bag {T: tuple(name, value)});
grunt> cols = FOREACH rows GENERATE flatten(columns);
grunt> colnames = FOREACH cols GENERATE $0;
grunt> namegroups = GROUP colnames BY (chararray) $0;
grunt> namecounts = FOREACH namegroups GENERATE COUNT($1), group;
grunt> orderednames = ORDER namecounts BY $0;
grunt> topnames = LIMIT orderednames 50;
grunt> dump topnames;

but I'm having this error:

ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve CassandraStorage using imports: [, org.apache.pig.builtin., org.apache.pig.impl.builtin.]

2013/3/18 Dan DeCapria, CivicScience <[email protected]>:

Storing to Cassandra requires a key->column->value data structure from Pig. Here is one possible approach, requiring a UDF to handle the Pig formatting interchange to Cassandra:

-- sample pig script
A = LOAD 'foo' USING PigStorage() AS (key:chararray, name:chararray, value:chararray);
B = FOREACH A GENERATE FLATTEN(com.to.udf.ToCassandraUDF(TOTUPLE('PotentialKeyManipulation/', $0, '/toSomethingElse'), TOTUPLE($1), $2));
STORE B INTO 'cassandra://cassandraNamespace/myColumnFamilyName' USING org.apache.cassandra.hadoop.pig.CassandraStorage();

-- sample ToCassandraUDF
package com.to.udf;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class ToCassandraUDF extends EvalFunc<Tuple> {

  public Tuple exec(Tuple input) throws IOException {
    Tuple row = TupleFactory.getInstance().newTuple(2);
    StringBuffer buf = null;

    // Concatenate the first tuple's fields into the row key.
    Tuple keyParts = (Tuple) input.get(0);
    buf = new StringBuffer();
    for (Object o : keyParts.getAll()) {
      if (o != null) {
        buf.append(o.toString());
      } else {
        buf.append("null");
      }
    }
    String key = buf.toString();

    // Concatenate the second tuple's fields into the column name.
    Tuple columnParts = (Tuple) input.get(1);
    buf = new StringBuffer();
    for (Object o : columnParts.getAll()) {
      if (o != null) {
        buf.append(o.toString());
      } else {
        buf.append("null");
      }
    }
    String columnName = buf.toString();

    // The optional third field becomes the column value.
    byte[] columnValueBytes = null;
    if (input.size() > 2) {
      Object value = input.get(2);
      columnValueBytes = value.toString().getBytes();
    } else {
      columnValueBytes = new byte[0];
    }

    Tuple column = TupleFactory.getInstance().newTuple(2);
    column.set(0, new DataByteArray(columnName.getBytes()));
    column.set(1, new DataByteArray(columnValueBytes));

    DataBag bagOfColumns = BagFactory.getInstance().newDefaultBag();
    bagOfColumns.add(column);

    // (key, {(name, value)}) is the shape CassandraStorage expects.
    row.set(0, key);
    row.set(1, bagOfColumns);

    return row;
  }

  public Schema outputSchema(Schema input) {
    try {
      Schema.FieldSchema keyField = new Schema.FieldSchema("key", DataType.CHARARRAY);
      Schema.FieldSchema nameField = new Schema.FieldSchema("name", DataType.CHARARRAY);
      Schema.FieldSchema valueField = new Schema.FieldSchema("value", DataType.BYTEARRAY);

      Schema bagSchema = new Schema();
      bagSchema.add(nameField);
      bagSchema.add(valueField);

      Schema.FieldSchema columnsField = new Schema.FieldSchema("columns", bagSchema, DataType.BAG);

      Schema innerSchema = new Schema();
      innerSchema.add(keyField);
      innerSchema.add(columnsField);

      Schema.FieldSchema cassandraTuple = new Schema.FieldSchema("cassandra_tuple", innerSchema, DataType.TUPLE);

      Schema schema = new Schema(cassandraTuple);
      schema.setTwoLevelAccessRequired(true);
      return schema;
    } catch (Exception e) {
      return null;
    }
  }
}

Hope this helps.

-Dan
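[A minimal sketch of building and registering a UDF like the one above, assuming the source sits under com/to/udf/ and a Pig 0.11.0 jar is at hand; paths and jar names are illustrative:]

# Compile against the Pig jar and package the class.
javac -cp pig-0.11.0.jar com/to/udf/ToCassandraUDF.java
jar cf to-cassandra-udf.jar com/to/udf/*.class
# Then, in the Pig script or grunt shell, register it before use:
#   REGISTER to-cassandra-udf.jar;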
On Mon, Mar 18, 2013 at 11:15 AM, Mohammed Abdelkhalek <[email protected]> wrote:

Hi,
I'm using Hadoop 1.0.4, Cassandra 1.2.2, and Pig 0.11.0. Can anyone help me with an example of how to use Pig, either for storing to Cassandra from Pig using CassandraStorage, or for loading rows from Cassandra in order to use them with Pig?
Thanks.
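[As a general sanity check before any of the Pig steps above, it may be worth confirming that Cassandra's Thrift interface is reachable from the Hadoop nodes. A minimal sketch, assuming Cassandra 1.2.2's default Thrift port 9160 and a local node; the host is illustrative:]

nodetool -h localhost ring            # is the cluster up?
netstat -an | grep 9160               # is the Thrift listener bound?
cassandra-cli -h localhost -p 9160    # can a Thrift client connect?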
--
Mohammed ABDELKHALEK
State Engineer, École Mohammadia d'Ingénieurs
Information Technologies and Services
Phone: +212 6 45 64 65 68
Please consider the environment before printing this e-mail.

--
Dan DeCapria
CivicScience, Inc.
Senior Informatics / DM / ML / BI Specialist

--
Russell Jurney
twitter.com/rjurney
[email protected]
datasyndrome.com
