Cassandra Map Reduce для данных временных рядов

Как получить доступ к семейству столбцов Cassandra из картографа? В частности, как вы конвертируете аргументы метода map() обратно в ожидаемые типы Java?

Ключ {logType} -> {имя столбца: timeUUID, значение столбца: строка журнала csv, ttl: 1year}


Спасибо @Chris и @rs_atl

I successfully run hadoop job, here is complete code:

package com.xxx.hadoop;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.SortedMap;


import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.thrift.TBaseHelper;

import com.xxx.parser.LogParser;
import com.netflix.astyanax.serializers.StringSerializer;

public class LogTypeCounterByDate extends Configured implements Tool {
    private static final String KEYSPACE = "LogKS";
    private static final String COLUMN_FAMILY = "LogBlock";
    private static final String JOB_NAME = "LOG_LINE_COUNT";
    private static final String INPUT_PARTITIONER = "org.apache.cassandra.dht.RandomPartitioner";
    private static final String INPUT_RPC_PORT = "9160";
    private static final String INPUT_INITIAL_ADDRESS = "192.168.1.21";
    private static final String OUTPUT_PATH = "/logOutput/results";

    @Override
    public int run(String[] args) throws Exception {

        //Configuration conf = new Configuration();

        Job job = new Job(getConf(), JOB_NAME);
        job.setJarByClass(LogTypeCounterByDate.class);
        job.setMapperClass(LogTypeCounterByDateMapper.class);       
        job.setReducerClass(LogTypeCounterByDateReducer.class);

        job.setInputFormatClass(ColumnFamilyInputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setNumReduceTasks(1);
        ConfigHelper.setRangeBatchSize(getConf(), 1000);

        /*SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[0]), 
                ByteBuffer.wrap(new byte[0]), true, 1));*/
        SliceRange sliceRange = new SliceRange(ByteBuffer.wrap(new byte[0]), 
                ByteBuffer.wrap(new byte[0]), true, 1000);

        SlicePredicate slicePredicate = new SlicePredicate();
        slicePredicate.setSlice_range(sliceRange);


        ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
        ConfigHelper.setInputRpcPort(job.getConfiguration(), INPUT_RPC_PORT);
        ConfigHelper.setInputInitialAddress(job.getConfiguration(), INPUT_INITIAL_ADDRESS);
        ConfigHelper.setInputPartitioner(job.getConfiguration(), INPUT_PARTITIONER);
        ConfigHelper.setInputSlicePredicate(job.getConfiguration(), slicePredicate);

        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception{
        ToolRunner.run(new Configuration(), new LogTypeCounterByDate(), args);
        System.exit(0);
    }


    public static class LogTypeCounterByDateMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, LongWritable>
    {

        @SuppressWarnings("rawtypes")
        @Override
        protected void setup(Mapper.Context context){

        }

        @SuppressWarnings({ })
        public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException{
            //String[] lines = columns.;
            String rowkey = StringSerializer.get().fromByteBuffer(TBaseHelper.rightSize(key));  
            Iterator<ByteBuffer> iter = columns.keySet().iterator();
            IColumn column;
            String line;
            LogParser lp = null;

            while(iter.hasNext()){
                column = columns.get(iter.next());
                line = StringSerializer.get().fromByteBuffer(TBaseHelper.rightSize(column.value()));
                lp = new LogParser(line);               
                context.write(new Text(rowkey + "\t" + "LineCount"), new LongWritable(1L));
                context.write(new Text(rowkey + "\t" + "Minutes"), new LongWritable(lp.getTotalDuration()));
            }
        }
    }

    public static class LogTypeCounterByDateReducer extends Reducer<Text, LongWritable, Text, LongWritable>
    {           

        public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException{
            long total = 0;
            for(LongWritable val : values){
                total += val.get();
            }
            context.write(key, new LongWritable(total));
        }
    }               
}

ConfigHelper.setRangeBatchSize(getConf(), 1000);

        /*SlicePredicate predicate = new   SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[0]), 
                ByteBuffer.wrap(new byte[0]), true, 1));*/
        SliceRange sliceRange = new SliceRange(ByteBuffer.wrap(new byte[0]), 
                ByteBuffer.wrap(new byte[0]), true, 1000);

        SlicePredicate slicePredicate = new SlicePredicate();
        slicePredicate.setSlice_range(sliceRange);

приведенный выше код передает только 1000 столбцов для сопоставления для каждой строки, где я хочу каждый раз передавать все столбцы для каждой строки партиями по 1000 столбцов.

Пожалуйста, помогите мне в этом.


person user1793389    schedule 16.11.2012    source источник


Ответы (1)


Данные параметры:

ByteBuffer key;
SortedMap<ByteBuffer, IColumn> columns;

Вы бы использовали:

String rowkey = StringSerializer.get().fromByteBuffer(TBaseHelper.rightSize(key))

чтобы получить десериализованное значение ключа. Обратите внимание, что здесь предполагается, что ключ строки является строкой. Если это какой-то другой тип, вам придется использовать соответствующий класс сериализатора.

Чтобы получить значения столбца, сделайте что-то вроде:

Iterator<ByteBuffer> = columns.keySet().iterator(); 
while (iter.hasNext()) {
    IColumn col = columns.get(iter.next()); 
    xxx colVal = xxxSerializer.get().fromByteBuffer(TBaseHelper.rightSize(col.value()));
}

Где xxx — это тип Java значения столбца, а xxxSerializer — соответствующий сериализатор.

Между прочим, класс TBaseHelper используется для исправления смещения значения во внутреннем массиве байтов до нуля, обеспечивая выполнение предположения, сделанного реализациями сериализатора.

Итак, еще одна вещь... Если вы извлекаете временные ряды, то каждый столбец представляет собой собственное значение временного ряда, и вы захотите включить соответствующую логику сопоставления (например, какие-то математические операции и запись в контекст) внутри итерации петля по столбикам. Если вместо этого у вас есть более статическое семейство столбцов (больше похожее на традиционные таблицы sql), то у вас, вероятно, будет одна запись в контекст для всей строки.

person Chris Gerken    schedule 16.11.2012
comment
Спасибо @Chris, не могли бы вы просмотреть мой код, исходный вопрос обновлен. Я не понимаю, что входит в метод установки как для Mapper, так и для Reducer. - person user1793389; 16.11.2012
comment
Вам не нужно ничего для установки или закрытия методов. Они нужны на тот случай, если вы захотите запустить какую-либо логику до или после обработки разделения ввода соответственно. - person Chris Gerken; 16.11.2012
comment
Спасибо за ваше подтверждение, я запущу свою первую работу Map Reduce для Cassandra и поделюсь результатами здесь. Еще раз спасибо. - person user1793389; 16.11.2012
comment
Я уже поднимал другой вопрос [ссылка] (stackoverflow .com/questions/13330960/), где вы можете найти вопрос в разделе комментариев ответа @rs_atl - person user1793389; 17.11.2012