항공데이터 hbase상 맵리듀스 완료!!!
package MapReduce;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
public class CountMapper extends TableMapper<Text, FloatWritable> {
//hbase가 지원하는 table형식의 맵퍼를 상속받는다.
private Text key = new Text(); //text형식의 key로 리듀스에 보낼거고
private FloatWritable output = new FloatWritable(); //float형식의 output으로 리듀스에 보낸다.
@Override
public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException{
//hbase부터 들어올 데이터,key형식은 byte이고 value는 result(???뭔 타입인진 모르겠다.)
String[] key_columns = (Bytes.toString(value.getRow())).split(",");
key.set(key_columns[0]); //key 세팅
for(Cell c : value.rawCells()){ //각 셀을 돔 빙글뱅글
String qualifier = new String(CellUtil.cloneQualifier(c)); //qualifier 불러오기
if(qualifier.equals("ad")){ //일치하는 qualitfier명
Float f = Float.parseFloat(new String(CellUtil.cloneValue(c)));
output.set(f);
context.write(key, output); //reduce로 보내는 key와 output
}
}
}
}
일단 맵퍼 클래스
- hbase 데이터는 byte형식으로 들어가기 때문에( ts제외 ) hbase.util에서 제공하는 Bytes클래스는 어떤타입이는 byte로 바꾸고 다시 원래로 돌리는 메소드가 존재한다. --Bytes.toString(value.getRow()))
-value.getRow()는 해당 데이터(cell)의 로우키를 바이트형식으로 가져온다.
-지금 key의 경우 항공사번호,항공기번호 로 나뉘어져 있으므로 split해 항공사 번호만 write에 리듀서로 보내준다.
package MapReduce;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
public class CountReducer extends TableReducer<Text, FloatWritable, ImmutableBytesWritable>{
private final byte[] COLUMN_FAMILY = "FLIGHT_COUNT".getBytes();
public void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException{
//key(항공사 번호), 도착지연시간 집합으로 구성된 value
float sum = 0;
int count = 0;
Float avg = null;
for(FloatWritable val : values){
sum += val.get();
count++;
}
Put put = new Put(Bytes.toBytes(key.toString())); //key넣고
avg = new Float(sum/count);
put.add(COLUMN_FAMILY,Bytes.toBytes("average"), Bytes.toBytes(avg.toString()));
//quailfier는 average
//System.out.println(key+" "+avg);
context.write(null, put);
}
}
리듀스 클래스
-별건 없고 맵퍼에서 항공사 번호 별로 묶은 ad(출발 지연 시간) 집합을 value로 받고
-key는 항공사 번호
-litrable<타입>은 집합
평균값을 구해서 put 객체로 삽입
wirte할때 왜 key가 없느냐? -put객체에 key,value가 모두 들어 있기 때문...
package MapReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
public class countDriver {
public static void main(String[] args) throws Exception{
Configuration config = HBaseConfiguration.create();
Job job = Job.getInstance(config, "HBase mapreduce Test");
job.setJarByClass(countDriver.class); //jar실행, main들어있는 클래스 지정
Scan scan = new Scan(); //scan객체, hbase내 data를 스캔하는 역활, select구문 같은 sql?
scan.setRaw(true); //컬럼 버전을 하나만 최신의 것으로 하나만 가져오는것이 false, 그 반대가 true
scan.setCaching(1000); //??? 메모리 캐싱할 data크기를 지정하는 거 같다. 하드웨어 환경에 따라 지정해주자
scan.setCacheBlocks(false); //맵리듀스 작업을 위해 지정, 이유는 잘??
scan.setMaxVersions(); //스캔이 version값을 최대값으로 지정해야 모든 버젼의 data를 긁어온다.
//특정 데이터 스캔만을위한 필터링을 지정한다.
//Filter filterTitle = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("로우키"));
//scan.setFilter(filterTitle);
//특정 데이터 스캔만을위한 필터링을 지정한다.
Filter filterTitle = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("AIRFLIGHT_DATA"));
scan.setFilter(filterTitle);
//qualifier 필터도 나으하고 timestamp필터도 되고 value로 된다.
//hbase상에서 맵퍼 실행
TableMapReduceUtil.initTableMapperJob("AIRFLIGHT_TABLE", scan,
CountMapper.class, Text.class, FloatWritable.class, job);
//맵 아웃풋 키,value 포맷지정
//reduce의 아웃풋 포맷을 지정안해도 되는 이유는?
//data가 hbase로 들어갈거라...
TableMapReduceUtil.initTableReducerJob("AIRFLIGHT_COUNT_TABLE",
CountReducer.class, job);
job.setNumReduceTasks(1); //리듀서 task수 지정, 리듀스작업을 할 task
job.waitForCompletion(true); //완료될때 까지 기달
System.out.println("맵리듀스 완료!!");
}
}
카운트 드라이버
- TableMapReduceUtil.initTableMapperJob
- TableMapReduceUtil.initTableReducerJob
- input,output 포맷은 TableMapReduceUtil에서 제공하는 클래스를 쓴다.
- 기존 hdfs에서 쓰던 형식에서 리듀스 output 포맷방식 지정만 없을뿐.. 쓰기 쉽다.( hdfs 맵리듀스 )
항공기 번호별로 출발지연시간 평균을 구하는 맵리듀스
-도착지연시간 별로 평균을 구하는 맵리듀스를 구현하기 위해 기존코드를 최대한 활용할까 한다.
**이 맵리듀스는 cpu자원을 많이 안쓴다. count값을 구하는 hbase제공하는 클래스가 있긴 하지만 아직 익히지 않음. insert csv파일할때는 역시나 컴퓨터가 힘들어 죽을라 한다.