Also,
// ruby script modified for cassandra, from amazon
#!/usr/bin/ruby
require 'hpricot'
require 'tempfile'
# XML prolog written at the top of every generated configuration file:
# the XML declaration followed by the configuration.xsl stylesheet
# processing instruction. The line break inside the stylesheet instruction
# is part of the emitted text.
CONFIG_HEADER = %(<?xml version="1.0"?>\n<?xml-stylesheet\ntype="text/xsl" href="configuration.xsl"?>)
# Reads an XML configuration file into a list of property hashes.
#
# Every <property> element under <configuration> becomes a Hash holding
# :name and :value (the inner HTML of the matching child elements). A :final
# key is added only when the <final> lookup yields something other than "".
#
# config_file_path - path of the XML file to read.
#
# Returns an Array of Hashes. When the file does not exist a notice is printed
# and an empty Array is returned.
def parse_config_file(config_file_path)
  ret = []
  if File.exist?(config_file_path)
    # File.open rather than Kernel#open: the argument is always treated as a
    # plain file path, never as a "|command" pipe.
    doc = File.open(config_file_path) { |f| Hpricot(f) }
    (doc / "configuration" / "property").each do |property|
      val = {
        :name => (property / "name").inner_html,
        :value => (property / "value").inner_html
      }
      # Look the <final> element up once and reuse the result.
      final = (property / "final").inner_html
      val[:final] = final if final != ""
      ret << val
    end
  else
    puts "#{config_file_path} does not exist, assuming empty configuration"
  end
  ret
end
# Writes a list of property hashes to file_name as an XML configuration file.
#
# The file starts with CONFIG_HEADER, then a <configuration> element holding
# one <property> per entry. Each property is preceded by a line break and
# carries <name> and <value>; <final> is written only when entry[:final] is
# truthy. Values are interpolated verbatim (no XML escaping is applied).
#
# file_name - path of the file to create or truncate.
# config    - Array of Hashes with :name, :value and optionally :final.
def dump_config_file(file_name, config)
  # File.open rather than Kernel#open: the argument is always treated as a
  # plain file path, never as a "|command" pipe.
  File.open(file_name, 'w') do |f|
    f.puts CONFIG_HEADER
    f.puts '<configuration>'
    config.each do |entry|
      f.print "\n<property><name>#{entry[:name]}</name><value>#{entry[:value]}</value>"
      f.print "<final>#{entry[:final]}</final>" if entry[:final]
      f.puts '</property>'
    end
    f.puts '</configuration>'
  end
end
# Applies each entry of overwrite to default, in place, matching on :name.
#
# default   - Array of property Hashes to update (mutated).
# overwrite - Array of property Hashes to apply.
#
# An entry whose name is absent from default is appended; an entry whose
# name occurs exactly once replaces the contents of that existing Hash.
# Each action is reported on standard output.
#
# Raises RuntimeError when a name occurs more than once in default.
def merge_config(default, overwrite)
  overwrite.each do |entry|
    matches = default.find_all { |existing| existing[:name] == entry[:name] }
    case matches.length
    when 0
      puts "'#{entry[:name]}': default does not have key, appending value\n'#{entry[:value]}'"
      default.push(entry)
    when 1
      current = matches.first
      puts "'#{entry[:name]}': new value '#{entry[:value]}' overwriting\n'#{current[:value]}'"
      # Replace the Hash contents so the object held in default is updated.
      current.replace(entry)
    else
      raise "'#{entry[:name]}': default has #{matches.length} keys"
    end
  end
end
# Merges the Cassandra connection settings into mapred-site.xml.
#
# The current file is parsed (a missing file is treated as an empty
# configuration by parse_config_file), the Cassandra properties below are
# merged in, and the result is written to "<file>.new". An existing file is
# then renamed to "<file>.old" before the new file is renamed into place.
#
# "THISIPADDRESS" is a placeholder value that is written as-is.
def add_cassandra_settings()
  file = "/home/hadoop/conf/mapred-site.xml"
  default = parse_config_file(file)

  # One table instead of nine separately wrapped calls: the mail-wrapped
  # original left "=>" at the start of a line, which Ruby cannot parse.
  # Order is kept so the merge messages appear in the same sequence.
  settings = [
    { :name => "cassandra.thrift.address", :value => "THISIPADDRESS" },
    { :name => "cassandra.input.thrift.address", :value => "THISIPADDRESS" },
    { :name => "cassandra.output.thrift.address", :value => "THISIPADDRESS" },
    { :name => "cassandra.thrift.port", :value => "9160" },
    { :name => "cassandra.input.thrift.port", :value => "9160" },
    { :name => "cassandra.output.thrift.port", :value => "9160" },
    { :name => "cassandra.partitioner.class",
      :value => "org.apache.cassandra.dht.RandomPartitioner" },
    { :name => "cassandra.input.partitioner.class",
      :value => "org.apache.cassandra.dht.RandomPartitioner" },
    { :name => "cassandra.output.partitioner.class",
      :value => "org.apache.cassandra.dht.RandomPartitioner" }
  ]
  merge_config(default, settings)

  # Write to a side file first, then swap it in via renames.
  dump_config_file(file + ".new", default)
  File.rename(file, file + ".old") if File.exist?(file)
  File.rename(file + ".new", file)
  puts "Saved #{file} with overwrites. Original saved to #{file}.old"
end
# Prints msg to STDERR, prefixed with the current UTC time and "WARN".
# Defined at top level, so it takes the place of Kernel#warn in this script.
#
# msg - String message to print (concatenated with +, so it must be a String).
def warn(msg)
  prefix = "#{Time.now.utc} WARN "
  STDERR.puts(prefix + msg)
end
# Script entry point: apply the Cassandra settings when this file is executed.
add_cassandra_settings()
On Mon, Mar 18, 2013 at 1:24 PM, Dan DeCapria, CivicScience <
[email protected]> wrote:
> So it appears that you need to configure Cassandra to run with hadoop.
> There are a couple of things you will need to do here.
> In my case, I usually bootstrap these for my hadoop master and slaves, for
> the correct dependencies and pig IP touch points for cassandra.
>
> // install cassandra everywhere
> echo "deb http://debian.datastax.com/community stable main" >
> /tmp/cassandra.sources.list
> sudo mv /tmp/cassandra.sources.list
> /etc/apt/sources.list.d/cassandra.sources.list
> curl -L http://debian.datastax.com/debian/repo_key | sudo apt-key add -
> sudo apt-get update
> sudo apt-get install -y cassandra
> sudo /etc/init.d/cassandra stop
> echo
> "HADOOP_CLASSPATH=/usr/share/cassandra/*:/usr/share/cassandra/lib/*:$HADOOP_CLASSPATH"
> >> /home/hadoop/conf/hadoop-user-env.sh
> echo "PIG_INITIAL_ADDRESS=MYIPGOESHERE" >>
> /home/hadoop/conf/hadoop-user-env.sh
> echo "PIG_RPC_PORT=9160" >> /home/hadoop/conf/hadoop-user-env.sh
> echo "PIG_ROOT_LOGGER=DEBUG,console" >>
> /home/hadoop/conf/hadoop-user-env.sh
> echo "PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner" >>
> /home/hadoop/conf/hadoop-user-env.sh
> echo "PIG_WIDEROW_INPUT=true" >> /home/hadoop/conf/hadoop-user-env.sh
>
> //
>
> On Mon, Mar 18, 2013 at 12:57 PM, Mohammed Abdelkhalek <
> [email protected]> wrote:
>
>> I have an error in the first line of the code:
>> *grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING
>> org.apache.cassandra.hadoop.*
>> *pig.CassandraStorage() AS (key, columns: bag {T: tuple(name, value)});*
>>
>> ERROR 1070: Could not resolve
>> org.apache.cassandra.hadoop.pig.CassandraStorage using imports: [,
>> org.apache.pig.builtin., org.apache.pig.impl.builtin.]
>>
>> Perhaps I should add some jars or something to reference
>> CassandraStorage!
>>
>>
>> 2013/3/18 Dan DeCapria, CivicScience <[email protected]>
>>
>> > Try something simple, in interactive mode, such as:
>> >
>> > grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING
>> > org.apache.cassandra.hadoop.pig.CassandraStorage() AS (key, columns: bag
>> > {T: tuple(name, value)});
>> > grunt> cols = FOREACH rows GENERATE flatten(columns);
>> > grunt> ILLUSTRATE cols;
>> >
>> > Check that 'cols' is of the correct form before proceeding, and that data is
>> > being accessed by Pig through Hadoop to Cassandra correctly.
>> >
>> > -Dan
>> >
>> > On Mon, Mar 18, 2013 at 12:20 PM, Mohammed Abdelkhalek <
>> > [email protected]> wrote:
>> >
>> > > How ?
>> > >
>> > >
>> > > 2013/3/18 Dan DeCapria, CivicScience <[email protected]>
>> > >
>> > > > Try fully qualifying CassandraStorage() to
>> org.apache.cassandra.hadoop.
>> > > > pig.CassandraStorage().
>> > > >
>> > > > -Dan
>> > > >
>> > > > On Mon, Mar 18, 2013 at 11:56 AM, Mohammed Abdelkhalek <
>> > > > [email protected]> wrote:
>> > > >
>> > > > > Thank you for replying,
>> > > > > In fact, i'm trying to run this script:
>> > > > > grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING
>> > > CassandraStorage()
>> > > > > AS (key, columns: bag {T: tuple(name, value)});
>> > > > > grunt> cols = FOREACH rows GENERATE flatten(columns);
>> > > > > grunt> colnames = FOREACH cols GENERATE $0;
>> > > > > grunt> namegroups = GROUP colnames BY (chararray) $0;
>> > > > > grunt> namecounts = FOREACH namegroups GENERATE COUNT($1), group;
>> > > > > grunt> orderednames = ORDER namecounts BY $0;
>> > > > > grunt> topnames = LIMIT orderednames 50;
>> > > > > grunt> dump topnames;
>> > > > >
>> > > > > but i'm having this error:
>> > > > > ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not
>> > resolve
>> > > > > CassandraStorage using imports: [, org.apache.pig.builtin.,
>> > > > > org.apache.pig.impl.builtin.]
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > > 2013/3/18 Dan DeCapria, CivicScience <
>> [email protected]>
>> > > > >
>> > > > > > Storing to Cassandra requires a key->column->value data
>> structure
>> > > from
>> > > > > pig.
>> > > > > > Here is one possible approach, requiring a udf to handle the
>> pig
>> > > > > > formatting interchange to cassandra:
>> > > > > >
>> > > > > > -- sample pig script
>> > > > > > A = LOAD 'foo' USING PigStorage() AS (key:chararray,
>> > name:chararray,
>> > > > > > value:chararray);
>> > > > > > B = FOREACH A GENERATE
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> FLATTEN(com.to.udf.ToCassandraUDF(TOTUPLE('PotentialKeyManipulation/',$0,'/toSomethingElse'),
>> > > > > > TOTUPLE($1), $2));
>> > > > > > STORE B INTO 'cassandra://cassandraNamespace/myColumnFamilyName'
>> > > USING
>> > > > > > org.apache.cassandra.hadoop.pig.CassandraStorage();
>> > > > > >
>> > > > > > -- sample toCassandraUDF
>> > > > > > package com.to.udf.ToCassandraUDF;
>> > > > > >
>> > > > > > public class ToCassandraUDF extends EvalFunc<Tuple> {
>> > > > > >
>> > > > > > public Tuple exec(Tuple input) throws IOException {
>> > > > > > Tuple row = TupleFactory.getInstance().newTuple(2);
>> > > > > > StringBuffer buf = null;
>> > > > > > Tuple keyParts = (Tuple) input.get(0);
>> > > > > > buf = new StringBuffer();
>> > > > > > for (Object o : keyParts.getAll()) {
>> > > > > > if (o != null) {
>> > > > > > buf.append(o.toString());
>> > > > > > } else {
>> > > > > > buf.append("null");
>> > > > > > }
>> > > > > > }
>> > > > > > String key = buf.toString();
>> > > > > > Tuple columnParts = (Tuple) input.get(1);
>> > > > > > buf = new StringBuffer();
>> > > > > > for (Object o : columnParts.getAll()) {
>> > > > > > if (o != null) {
>> > > > > > buf.append(o.toString());
>> > > > > > } else {
>> > > > > > buf.append("null");
>> > > > > > }
>> > > > > > }
>> > > > > > String columnName = buf.toString();
>> > > > > >
>> > > > > > byte[] columnValueBytes = null;
>> > > > > > if (input.size() > 2) {
>> > > > > > Object value = input.get(2);
>> > > > > > columnValueBytes = value.toString().getBytes();
>> > > > > > } else {
>> > > > > > columnValueBytes = new byte[0];
>> > > > > > }
>> > > > > >
>> > > > > > Tuple column = TupleFactory.getInstance().newTuple(2);
>> > > > > > column.set(0, new DataByteArray(columnName.getBytes()));
>> > > > > > column.set(1, new DataByteArray(columnValueBytes));
>> > > > > >
>> > > > > > DataBag bagOfColumns = BagFactory.getInstance().newDefaultBag();
>> > > > > > bagOfColumns.add(column);
>> > > > > >
>> > > > > > row.set(0, key);
>> > > > > > row.set(1, bagOfColumns);
>> > > > > >
>> > > > > > return row;
>> > > > > > }
>> > > > > >
>> > > > > > public Schema outputSchema(Schema input) {
>> > > > > > try {
>> > > > > > Schema.FieldSchema keyField = new Schema.FieldSchema("key",
>> > > > > > DataType.CHARARRAY);
>> > > > > > Schema.FieldSchema nameField = new Schema.FieldSchema("name",
>> > > > > > DataType.CHARARRAY);
>> > > > > > Schema.FieldSchema valueField = new Schema.FieldSchema("value",
>> > > > > > DataType.BYTEARRAY);
>> > > > > >
>> > > > > > Schema bagSchema = new Schema();
>> > > > > > bagSchema.add(nameField);
>> > > > > > bagSchema.add(valueField);
>> > > > > >
>> > > > > > Schema.FieldSchema columnsField = new
>> Schema.FieldSchema("columns",
>> > > > > > bagSchema, DataType.BAG);
>> > > > > >
>> > > > > > Schema innerSchema = new Schema();
>> > > > > > innerSchema.add(keyField);
>> > > > > > innerSchema.add(columnsField);
>> > > > > >
>> > > > > > Schema.FieldSchema cassandraTuple = new Schema.FieldSchema(
>> > > > > > "cassandra_tuple", innerSchema, DataType.TUPLE);
>> > > > > >
>> > > > > > Schema schema = new Schema(cassandraTuple);
>> > > > > > schema.setTwoLevelAccessRequired(true);
>> > > > > > return schema;
>> > > > > > } catch (Exception e) {
>> > > > > > return null;
>> > > > > > }
>> > > > > > }
>> > > > > > }
>> > > > > >
>> > > > > > Hope this helps.
>> > > > > >
>> > > > > > -Dan
>> > > > > >
>> > > > > > On Mon, Mar 18, 2013 at 11:15 AM, Mohammed Abdelkhalek <
>> > > > > > [email protected]> wrote:
>> > > > > >
>> > > > > > > Hi,
>> > > > > > > i'm using hadoop 1.0.4, cassandra 1.2.2 and pig 0.11.0.
>> > > > > > > Can any one help me with an example on how to use pig either
>> for
>> > > > > Storing
>> > > > > > to
>> > > > > > > cassandra from *pig* using Cassandrastorage, or Loading rows
>> from
>> > > > > > cassandra
>> > > > > > > in order to use them with pig.
>> > > > > > > Thanks.
>> > > > > > >
>> > > > > > > --
>> > > > > > > Mohammed ABDELKHALEK
>> > > > > > > Ingénieur d'état de l’École Mohammadia d'Ingénieurs
>> > > > > > > Technologies et services de l'information
>> > > > > > > Téléphone: +212 6 45 64 65 68
>> > > > > > >
>> > > > > > > Préservons l'environnement. N'imprimez ce courriel que si
>> > > nécessaire.
>> > > > > > > Please consider the environment before printing this e-mail.
>> > > > > > >
>> > > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > > --
>> > > > > Mohammed ABDELKHALEK
>> > > > > Ingénieur d'état de l’École Mohammadia d'Ingénieurs
>> > > > > Technologies et services de l'information
>> > > > > Téléphone: +212 6 45 64 65 68
>> > > > >
>> > > > > Préservons l'environnement. N'imprimez ce courriel que si
>> nécessaire.
>> > > > > Please consider the environment before printing this e-mail.
>> > > > >
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > Mohammed ABDELKHALEK
>> > > Ingénieur d'état de l’École Mohammadia d'Ingénieurs
>> > > Technologies et services de l'information
>> > > Téléphone: +212 6 45 64 65 68
>> > >
>> > > Préservons l'environnement. N'imprimez ce courriel que si nécessaire.
>> > > Please consider the environment before printing this e-mail.
>> > >
>> >
>>
>>
>>
>> --
>> Mohammed ABDELKHALEK
>> Ingénieur d'état de l’École Mohammadia d'Ingénieurs
>> Technologies et services de l'information
>> Téléphone: +212 6 45 64 65 68
>>
>> Préservons l'environnement. N'imprimez ce courriel que si nécessaire.
>> Please consider the environment before printing this e-mail.
>>
>
>
>
> --
> Dan DeCapria
> CivicScience, Inc.
> Senior Informatics / DM / ML / BI Specialist
>
--
Dan DeCapria
CivicScience, Inc.
Senior Informatics / DM / ML / BI Specialist