Here's an example:
http://hortonworks.com/blog/pig-as-duct-tape-part-three-tf-idf-topics-with-cassandra-python-streaming-and-flask/
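
As a quick orientation for the thread quoted below: the rows that CassandraStorage reads and writes there have the shape (key, columns: bag {T: tuple(name, value)}). The following is a minimal Ruby sketch of that shape, loosely mirroring what the quoted ToCassandraUDF does. It does not use Pig at all; nested arrays stand in for tuples and bags, and the helper names join_parts and to_cassandra_row, as well as the sample values, are made up for illustration.

```ruby
# Hypothetical helper: concatenate the parts of a key or column name,
# writing the literal string "null" for nil parts (as the quoted UDF's
# StringBuffer loops do).
def join_parts(parts)
  parts.map { |part| part.nil? ? "null" : part.to_s }.join
end

# Hypothetical helper: build [key, [[column_name, value]]], i.e. a key plus
# a one-element "bag" holding a single (name, value) pair.
# Assumption: a missing value becomes an empty string.
def to_cassandra_row(key_parts, column_parts, value = nil)
  key = join_parts(key_parts)
  column_name = join_parts(column_parts)
  column_value = value.nil? ? "" : value.to_s
  [key, [[column_name, column_value]]]
end

row = to_cassandra_row(
  ["PotentialKeyManipulation/", "user42", "/toSomethingElse"],
  ["email"],
  "someone"
)
p row
# prints ["PotentialKeyManipulation/user42/toSomethingElse", [["email", "someone"]]]
```

In the real UDF the column name and value are wrapped in DataByteArray objects; plain strings are used here only to keep the sketch runnable on its own.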


On Mon, Mar 18, 2013 at 10:36 AM, Dan DeCapria, CivicScience <
[email protected]> wrote:

> So yes, this is just a copy-and-paste baseline that someone could use to go
> to and from Cassandra. The idea is that you need the correct dependencies
> in the /lib directory, as well as the correct aliasing of exports and the
> right modifications to the config files on the master and slaves.
> Hopefully this is a good starting point for you.
>
> Good luck!
>
> -Dan
>
> On Mon, Mar 18, 2013 at 1:29 PM, Mohammed Abdelkhalek <
> [email protected]> wrote:
>
> > Thank you.
> > I'll try it!
> >
> >
> > 2013/3/18 Dan DeCapria, CivicScience <[email protected]>
> >
> > > Also,
> > >
> > > // ruby script modified for cassandra, from amazon
> > >
> > > #!/usr/bin/ruby
> > > require 'hpricot'
> > > require 'tempfile'
> > >
> > > CONFIG_HEADER = "<?xml version=\"1.0\"?>\n<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>"
> > >
> > > def parse_config_file(config_file_path)
> > >   ret = []
> > >   if File.exist?(config_file_path) then
> > >     doc = open(config_file_path) { |f| Hpricot(f) }
> > >     (doc/"configuration"/"property").each do |property|
> > >       val = {:name => (property/"name").inner_html, :value => (property/"value").inner_html }
> > >       if (property/"final").inner_html != "" then
> > >         val[:final] =  (property/"final").inner_html
> > >       end
> > >       ret << val
> > >     end
> > >   else
> > >     puts "#{config_file_path} does not exist, assuming empty configuration"
> > >   end
> > >   return ret
> > > end
> > >
> > > def dump_config_file(file_name, config)
> > >   open(file_name, 'w') do |f|
> > >     f.puts CONFIG_HEADER
> > >     f.puts '<configuration>'
> > >     for entry in config
> > >       f.print "  <property><name>#{entry[:name]}</name><value>#{entry[:value]}</value>"
> > >       if entry[:final] then
> > >         f.print "<final>#{entry[:final]}</final>"
> > >       end
> > >       f.puts '</property>'
> > >     end
> > >     f.puts '</configuration>'
> > >   end
> > > end
> > >
> > > def merge_config(default, overwrite)
> > >   for entry in overwrite
> > >     cells = default.select { |x| x[:name] == entry[:name]}
> > >     if cells.size == 0 then
> > >       puts "'#{entry[:name]}': default does not have key, appending value '#{entry[:value]}'"
> > >       default << entry
> > >     elsif cells.size == 1 then
> > >       puts "'#{entry[:name]}': new value '#{entry[:value]}' overwriting '#{cells[0][:value]}'"
> > >       cells[0].replace(entry)
> > >     else
> > >       raise "'#{entry[:name]}': default has #{cells.size} keys"
> > >     end
> > >   end
> > > end
> > >
> > > def add_cassandra_settings()
> > >   file = "/home/hadoop/conf/mapred-site.xml"
> > >   default = parse_config_file(file)
> > >   merge_config(default,[{:name => "cassandra.thrift.address", :value => "THISIPADDRESS" }])
> > >   merge_config(default,[{:name => "cassandra.input.thrift.address", :value => "THISIPADDRESS" }])
> > >   merge_config(default,[{:name => "cassandra.output.thrift.address", :value => "THISIPADDRESS" }])
> > >   merge_config(default,[{:name => "cassandra.thrift.port", :value => "9160" }])
> > >   merge_config(default,[{:name => "cassandra.input.thrift.port", :value => "9160" }])
> > >   merge_config(default,[{:name => "cassandra.output.thrift.port", :value => "9160" }])
> > >   merge_config(default,[{:name => "cassandra.partitioner.class", :value => "org.apache.cassandra.dht.RandomPartitioner" }])
> > >   merge_config(default,[{:name => "cassandra.input.partitioner.class", :value => "org.apache.cassandra.dht.RandomPartitioner" }])
> > >   merge_config(default,[{:name => "cassandra.output.partitioner.class", :value => "org.apache.cassandra.dht.RandomPartitioner" }])
> > >   dump_config_file(file + ".new", default)
> > >   if File.exist?(file) then
> > >     File.rename(file, file + ".old")
> > >   end
> > >   File.rename(file + ".new", file)
> > >   puts "Saved #{file} with overwrites. Original saved to #{file}.old"
> > > end
> > >
> > > def warn(msg)
> > >   STDERR.puts "#{Time.now.utc} WARN " + msg
> > > end
> > >
> > > add_cassandra_settings()
> > >
> > >
> > > On Mon, Mar 18, 2013 at 1:24 PM, Dan DeCapria, CivicScience <
> > > [email protected]> wrote:
> > >
> > > > So it appears that you need to configure Cassandra to run with Hadoop.
> > > > There are a couple of things you will need to do here.
> > > > In my case, I usually bootstrap these on my Hadoop master and slaves, to
> > > > set up the correct dependencies and the Pig IP touch points for Cassandra.
> > > >
> > > > // install cassandra everywhere
> > > > echo "deb http://debian.datastax.com/community stable main" > /tmp/cassandra.sources.list
> > > > sudo mv /tmp/cassandra.sources.list /etc/apt/sources.list.d/cassandra.sources.list
> > > > curl -L http://debian.datastax.com/debian/repo_key | sudo apt-key add -
> > > > sudo apt-get update
> > > > sudo apt-get install -y cassandra
> > > > sudo /etc/init.d/cassandra stop
> > > > echo "HADOOP_CLASSPATH=/usr/share/cassandra/*:/usr/share/cassandra/lib/*:$HADOOP_CLASSPATH" >> /home/hadoop/conf/hadoop-user-env.sh
> > > > echo "PIG_INITIAL_ADDRESS=MYIPGOESHERE" >> /home/hadoop/conf/hadoop-user-env.sh
> > > > echo "PIG_RPC_PORT=9160" >> /home/hadoop/conf/hadoop-user-env.sh
> > > > echo "PIG_ROOT_LOGGER=DEBUG,console" >> /home/hadoop/conf/hadoop-user-env.sh
> > > > echo "PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner" >> /home/hadoop/conf/hadoop-user-env.sh
> > > > echo "PIG_WIDEROW_INPUT=true" >> /home/hadoop/conf/hadoop-user-env.sh
> > > >
> > > > //
> > > >
> > > > On Mon, Mar 18, 2013 at 12:57 PM, Mohammed Abdelkhalek <
> > > > [email protected]> wrote:
> > > >
> > > >> I have an error in the first line of the code:
> > > >> grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING
> > > >> org.apache.cassandra.hadoop.pig.CassandraStorage() AS (key, columns: bag {T: tuple(name, value)});
> > > >>
> > > >> ERROR 1070: Could not resolve
> > > >> org.apache.cassandra.hadoop.pig.CassandraStorage using imports: [,
> > > >> org.apache.pig.builtin., org.apache.pig.impl.builtin.]
> > > >>
> > > >> Perhaps I should add some jars or something else so that Pig can
> > > >> find CassandraStorage.
> > > >>
> > > >>
> > > >> 2013/3/18 Dan DeCapria, CivicScience <[email protected]>
> > > >>
> > > >> > Try something simple, in interactive mode, such as:
> > > >> >
> > > >> > grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING
> > > >> > org.apache.cassandra.hadoop.pig.CassandraStorage() AS (key, columns: bag {T: tuple(name, value)});
> > > >> > grunt> cols = FOREACH rows GENERATE flatten(columns);
> > > >> > grunt> ILLUSTRATE cols;
> > > >> >
> > > >> > Check that 'cols' has the correct form before proceeding, and that
> > > >> > Pig is reading the data from Cassandra through Hadoop correctly.
> > > >> >
> > > >> > -Dan
> > > >> >
> > > >> > On Mon, Mar 18, 2013 at 12:20 PM, Mohammed Abdelkhalek <
> > > >> > [email protected]> wrote:
> > > >> >
> > > >> > > How ?
> > > >> > >
> > > >> > >
> > > >> > > 2013/3/18 Dan DeCapria, CivicScience <[email protected]>
> > > >> > >
> > > >> > > > Try fully qualifying CassandraStorage() as
> > > >> > > > org.apache.cassandra.hadoop.pig.CassandraStorage().
> > > >> > > >
> > > >> > > > -Dan
> > > >> > > >
> > > >> > > > On Mon, Mar 18, 2013 at 11:56 AM, Mohammed Abdelkhalek <
> > > >> > > > [email protected]> wrote:
> > > >> > > >
> > > >> > > > > Thank you for replying.
> > > >> > > > > In fact, I'm trying to run this script:
> > > >> > > > > grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING CassandraStorage()
> > > >> > > > > AS (key, columns: bag {T: tuple(name, value)});
> > > >> > > > > grunt> cols = FOREACH rows GENERATE flatten(columns);
> > > >> > > > > grunt> colnames = FOREACH cols GENERATE $0;
> > > >> > > > > grunt> namegroups = GROUP colnames BY (chararray) $0;
> > > >> > > > > grunt> namecounts = FOREACH namegroups GENERATE COUNT($1), group;
> > > >> > > > > grunt> orderednames = ORDER namecounts BY $0;
> > > >> > > > > grunt> topnames = LIMIT orderednames 50;
> > > >> > > > > grunt> dump topnames;
> > > >> > > > >
> > > >> > > > > but I'm getting this error:
> > > >> > > > > ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve
> > > >> > > > > CassandraStorage using imports: [, org.apache.pig.builtin.,
> > > >> > > > > org.apache.pig.impl.builtin.]
> > > >> > > > >
> > > >> > > > >
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > 2013/3/18 Dan DeCapria, CivicScience <[email protected]>
> > > >> > > > >
> > > >> > > > > > Storing to Cassandra requires a key->column->value data structure
> > > >> > > > > > from Pig. Here is one possible approach, which requires a UDF to
> > > >> > > > > > handle the format interchange between Pig and Cassandra:
> > > >> > > > > >
> > > >> > > > > > -- sample pig script
> > > >> > > > > > A = LOAD 'foo' USING PigStorage() AS (key:chararray, name:chararray, value:chararray);
> > > >> > > > > > B = FOREACH A GENERATE
> > > >> > > > > > FLATTEN(com.to.udf.ToCassandraUDF(TOTUPLE('PotentialKeyManipulation/',$0,'/toSomethingElse'),
> > > >> > > > > > TOTUPLE($1), $2));
> > > >> > > > > > STORE B INTO 'cassandra://cassandraNamespace/myColumnFamilyName' USING
> > > >> > > > > > org.apache.cassandra.hadoop.pig.CassandraStorage();
> > > >> > > > > >
> > > >> > > > > > -- sample toCassandraUDF
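> > > >> > > > > > package com.to.udf;
> > > >> > > > > >
> > > >> > > > > > import java.io.IOException;
> > > >> > > > > >
> > > >> > > > > > import org.apache.pig.EvalFunc;
> > > >> > > > > > import org.apache.pig.data.BagFactory;
> > > >> > > > > > import org.apache.pig.data.DataBag;
> > > >> > > > > > import org.apache.pig.data.DataByteArray;
> > > >> > > > > > import org.apache.pig.data.DataType;
> > > >> > > > > > import org.apache.pig.data.Tuple;
> > > >> > > > > > import org.apache.pig.data.TupleFactory;
> > > >> > > > > > import org.apache.pig.impl.logicalLayer.schema.Schema;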
> > > >> > > > > >
> > > >> > > > > > public class ToCassandraUDF extends EvalFunc<Tuple> {
> > > >> > > > > >
> > > >> > > > > > public Tuple exec(Tuple input) throws IOException {
> > > >> > > > > > Tuple row = TupleFactory.getInstance().newTuple(2);
> > > >> > > > > > StringBuffer buf = null;
> > > >> > > > > >  Tuple keyParts = (Tuple) input.get(0);
> > > >> > > > > > buf = new StringBuffer();
> > > >> > > > > > for (Object o : keyParts.getAll()) {
> > > >> > > > > > if (o != null) {
> > > >> > > > > > buf.append(o.toString());
> > > >> > > > > > } else {
> > > >> > > > > > buf.append("null");
> > > >> > > > > > }
> > > >> > > > > > }
> > > >> > > > > > String key = buf.toString();
> > > >> > > > > >  Tuple columnParts = (Tuple) input.get(1);
> > > >> > > > > > buf = new StringBuffer();
> > > >> > > > > > for (Object o : columnParts.getAll()) {
> > > >> > > > > > if (o != null) {
> > > >> > > > > > buf.append(o.toString());
> > > >> > > > > > } else {
> > > >> > > > > > buf.append("null");
> > > >> > > > > > }
> > > >> > > > > > }
> > > >> > > > > > String columnName = buf.toString();
> > > >> > > > > >
> > > >> > > > > > byte[] columnValueBytes = null;
> > > >> > > > > > if (input.size() > 2) {
> > > >> > > > > > Object value = input.get(2);
> > > >> > > > > > columnValueBytes = value.toString().getBytes();
> > > >> > > > > > } else {
> > > >> > > > > > columnValueBytes = new byte[0];
> > > >> > > > > > }
> > > >> > > > > >
> > > >> > > > > > Tuple column = TupleFactory.getInstance().newTuple(2);
> > > >> > > > > > column.set(0, new DataByteArray(columnName.getBytes()));
> > > >> > > > > > column.set(1, new DataByteArray(columnValueBytes));
> > > >> > > > > >
> > > >> > > > > > DataBag bagOfColumns = BagFactory.getInstance().newDefaultBag();
> > > >> > > > > > bagOfColumns.add(column);
> > > >> > > > > >
> > > >> > > > > > row.set(0, key);
> > > >> > > > > > row.set(1, bagOfColumns);
> > > >> > > > > >
> > > >> > > > > > return row;
> > > >> > > > > > }
> > > >> > > > > >
> > > >> > > > > > public Schema outputSchema(Schema input) {
> > > >> > > > > > try {
> > > >> > > > > > Schema.FieldSchema keyField = new Schema.FieldSchema("key", DataType.CHARARRAY);
> > > >> > > > > > Schema.FieldSchema nameField = new Schema.FieldSchema("name", DataType.CHARARRAY);
> > > >> > > > > > Schema.FieldSchema valueField = new Schema.FieldSchema("value", DataType.BYTEARRAY);
> > > >> > > > > >
> > > >> > > > > > Schema bagSchema = new Schema();
> > > >> > > > > > bagSchema.add(nameField);
> > > >> > > > > > bagSchema.add(valueField);
> > > >> > > > > >
> > > >> > > > > > Schema.FieldSchema columnsField = new Schema.FieldSchema("columns", bagSchema, DataType.BAG);
> > > >> > > > > >
> > > >> > > > > > Schema innerSchema = new Schema();
> > > >> > > > > > innerSchema.add(keyField);
> > > >> > > > > > innerSchema.add(columnsField);
> > > >> > > > > >
> > > >> > > > > > Schema.FieldSchema cassandraTuple = new Schema.FieldSchema("cassandra_tuple", innerSchema, DataType.TUPLE);
> > > >> > > > > >
> > > >> > > > > > Schema schema = new Schema(cassandraTuple);
> > > >> > > > > > schema.setTwoLevelAccessRequired(true);
> > > >> > > > > > return schema;
> > > >> > > > > > } catch (Exception e) {
> > > >> > > > > > return null;
> > > >> > > > > > }
> > > >> > > > > > }
> > > >> > > > > > }
> > > >> > > > > >
> > > >> > > > > > Hope this helps.
> > > >> > > > > >
> > > >> > > > > > -Dan
> > > >> > > > > >
> > > >> > > > > > On Mon, Mar 18, 2013 at 11:15 AM, Mohammed Abdelkhalek <
> > > >> > > > > > [email protected]> wrote:
> > > >> > > > > >
> > > >> > > > > > > Hi,
> > > >> > > > > > > I'm using Hadoop 1.0.4, Cassandra 1.2.2 and Pig 0.11.0.
> > > >> > > > > > > Can anyone help me with an example of how to use Pig, either for
> > > >> > > > > > > storing to Cassandra from Pig using CassandraStorage, or for
> > > >> > > > > > > loading rows from Cassandra in order to use them with Pig?
> > > >> > > > > > > Thanks.
> > > >> > > > > > >
> > > >> > > > > > > --
> > > >> > > > > > > Mohammed ABDELKHALEK
> > > >> > > > > > > State Engineer, École Mohammadia d'Ingénieurs
> > > >> > > > > > > Information technologies and services
> > > >> > > > > > > Telephone: +212 6 45 64 65 68
> > > >> > > > > > >
> > > >> > > > > > > Please consider the environment before printing this e-mail.
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > > >
> > > >> > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> >
> > > >>
> > > >>
> > > >>
> > > >>
> > > >
> > > >
> > > >
> >
> >
>
>
> --
> Dan DeCapria
> CivicScience, Inc.
> Senior Informatics / DM / ML / BI Specialist
>
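
The merge_config step in the quoted Ruby script follows three rules: a property missing from the existing config is appended, a property that appears exactly once is overwritten, and a config that already contains duplicates of the property raises an error. A minimal sketch of just those rules on in-memory hashes, without hpricot or any file I/O, might look like this. The name merge_entries and the 9161 port value are made up for illustration and are not part of the original script.

```ruby
# Hypothetical stand-in for the script's merge_config: merges `overrides`
# into `defaults` in place and returns `defaults`.
def merge_entries(defaults, overrides)
  overrides.each do |entry|
    matches = defaults.select { |existing| existing[:name] == entry[:name] }
    case matches.size
    when 0
      # Property not present yet: append it.
      defaults << entry
    when 1
      # Property present once: overwrite the stored hash in place.
      matches[0].replace(entry)
    else
      # Duplicates in the existing config are treated as an error.
      raise ArgumentError, "'#{entry[:name]}': default has #{matches.size} keys"
    end
  end
  defaults
end

defaults = [{ name: "cassandra.thrift.port", value: "9160" }]
merge_entries(defaults, [
  { name: "cassandra.thrift.port", value: "9161" }, # made-up override value
  { name: "cassandra.partitioner.class",
    value: "org.apache.cassandra.dht.RandomPartitioner" }
])
p defaults
```

Because select returns the same hash objects that live in defaults, calling replace on a match updates the config in place, which is also how the quoted script behaves.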



-- 
Russell Jurney twitter.com/rjurney [email protected] datasyndrome.com
