빅데이터

항공데이터 hbase상 맵리듀스 완료!!!

romance penguin 2017. 1. 6. 18:31
반응형

package MapReduce;

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;

public class CountMapper extends TableMapper<Text, FloatWritable>  {
//hbase가 지원하는 table형식의 맵퍼를 상속받는다.
   
    private Text key = new Text(); //text형식의 key로 리듀스에 보낼거고
    private FloatWritable output = new FloatWritable(); //float형식의 output으로 리듀스에 보낸다.
   
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException{
        //hbase부터 들어올 데이터,key형식은 byte이고 value는 result(???뭔 타입인진 모르겠다.)
       
        String[] key_columns = (Bytes.toString(value.getRow())).split(",");
        key.set(key_columns[0]); //key 세팅
       
        for(Cell c : value.rawCells()){ //각 셀을 돔 빙글뱅글
            String qualifier = new String(CellUtil.cloneQualifier(c)); //qualifier 불러오기
           
            if(qualifier.equals("ad")){ //일치하는 qualitfier명
                Float f = Float.parseFloat(new String(CellUtil.cloneValue(c)));
                output.set(f);
                context.write(key, output); //reduce로 보내는 key와 output
            }
        }           
    }
}

일단 맵퍼 클래스

- hbase 데이터는 byte형식으로 들어가기 때문에( ts제외 ) hbase.util에서 제공하는 Bytes클래스는 어떤타입이는 byte로 바꾸고 다시 원래로 돌리는 메소드가 존재한다. --Bytes.toString(value.getRow()))


-value.getRow()는 해당 데이터(cell)의 로우키를 바이트형식으로 가져온다.


-지금 key의 경우 항공사번호,항공기번호 로 나뉘어져 있으므로 split해 항공사 번호만 write에 리듀서로 보내준다.




package MapReduce;

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;

public class CountReducer extends TableReducer<Text, FloatWritable, ImmutableBytesWritable>{
    private final byte[] COLUMN_FAMILY = "FLIGHT_COUNT".getBytes();
   
    public void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException{
        //key(항공사 번호), 도착지연시간 집합으로 구성된 value
        float sum = 0;
        int count = 0;
       
        Float avg = null;
       
        for(FloatWritable val : values){
            sum += val.get();
            count++;
        }
       
        Put put = new Put(Bytes.toBytes(key.toString())); //key넣고
        avg = new Float(sum/count);
       
        put.add(COLUMN_FAMILY,Bytes.toBytes("average"), Bytes.toBytes(avg.toString()));
        //quailfier는 average
        //System.out.println(key+" "+avg);

        context.write(null, put);
    }

}

리듀스 클래스

-별건 없고 맵퍼에서 항공사 번호 별로 묶은 ad(출발 지연 시간) 집합을 value로 받고

-key는 항공사 번호

-litrable<타입>은 집합

평균값을 구해서 put 객체로 삽입

wirte할때 왜 key가 없느냐? -put객체에 key,value가 모두 들어 있기 때문...




package MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class countDriver {
    public static void main(String[] args) throws Exception{
        Configuration config = HBaseConfiguration.create();
        Job job = Job.getInstance(config, "HBase mapreduce Test");
        job.setJarByClass(countDriver.class); //jar실행, main들어있는 클래스 지정
       
        Scan scan = new Scan(); //scan객체, hbase내 data를 스캔하는 역활, select구문 같은 sql?
        scan.setRaw(true); //컬럼 버전을 하나만 최신의 것으로 하나만 가져오는것이 false, 그 반대가 true
        scan.setCaching(1000); //??? 메모리 캐싱할 data크기를 지정하는 거 같다. 하드웨어 환경에 따라 지정해주자
        scan.setCacheBlocks(false); //맵리듀스 작업을 위해 지정, 이유는 잘??
        scan.setMaxVersions(); //스캔이 version값을 최대값으로 지정해야 모든 버젼의 data를 긁어온다.
       
        //특정 데이터 스캔만을위한 필터링을 지정한다.
        //Filter filterTitle = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("로우키"));
        //scan.setFilter(filterTitle);       
        //특정 데이터 스캔만을위한 필터링을 지정한다.
        Filter filterTitle = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("AIRFLIGHT_DATA"));
        scan.setFilter(filterTitle);       
        //qualifier 필터도 나으하고 timestamp필터도 되고 value로 된다.
               
        //hbase상에서 맵퍼 실행
        TableMapReduceUtil.initTableMapperJob("AIRFLIGHT_TABLE", scan,
                CountMapper.class, Text.class, FloatWritable.class, job);
        //맵 아웃풋 키,value 포맷지정
        //reduce의 아웃풋 포맷을 지정안해도 되는 이유는?
        //data가 hbase로 들어갈거라...

        TableMapReduceUtil.initTableReducerJob("AIRFLIGHT_COUNT_TABLE",
                CountReducer.class, job);
       
        job.setNumReduceTasks(1); //리듀서 task수 지정, 리듀스작업을 할 task
        job.waitForCompletion(true); //완료될때 까지 기달
        System.out.println("맵리듀스 완료!!");
       
    }
}

카운트 드라이버

- TableMapReduceUtil.initTableMapperJob

- TableMapReduceUtil.initTableReducerJob

- input,output 포맷은 TableMapReduceUtil에서 제공하는 클래스를 쓴다.

- 기존 hdfs에서 쓰던 형식에서 리듀스 output 포맷방식 지정만 없을뿐.. 쓰기 쉽다.( hdfs 맵리듀스 )



항공기 번호별로 출발지연시간 평균을 구하는 맵리듀스

-도착지연시간 별로 평균을 구하는 맵리듀스를 구현하기 위해 기존코드를 최대한 활용할까 한다.

**이 맵리듀스는 cpu자원을 많이 안쓴다. count값을 구하는 hbase제공하는 클래스가 있긴 하지만 아직 익히지 않음. insert csv파일할때는 역시나 컴퓨터가 힘들어 죽을라 한다.

반응형