MapReduce Application Case Study: Cleaning Recruitment Data
admin
2024-01-21 04:17:46

Following the hints, complete the code in the editor on the right to clean the data according to the rules below.

The data (data.json) is described below.

Data location: /root/data/data.json

{"id":4,"company_name":"智联招聘网/Zhaopin.com","eduLevel_name":"本科","emplType":"全职","jobName":"大数据工程师010","salary":"20K-30K","createDate":"2019-04-21T12:14:27.000+08:00","endDate":"2019-05-21T12:14:27.000+08:00","city_code":"530","companySize":"1000-9999人","welfare":"","responsibility":"岗位职责:1、负责体系大数据分析的ETL的代码开发及优化;2、...","place":"北京市朝阳区望京阜荣街10号首开广场5层","workingExp":"1-3年"}
Field           Meaning
id              record ID
company_name    company name
eduLevel_name   education requirement
emplType        employment type
jobName         job title
salary          salary
createDate      publish date
endDate         closing date
city_code       city code
companySize     company size
welfare         benefits
responsibility  job responsibilities
place           work location
workingExp      required work experience
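
To see how one line of data.json maps onto these fields, here is a minimal fastjson sketch (the class name RecordPeek is made up for illustration only; the exercise classes below do the real parsing):

import com.alibaba.fastjson.JSONObject;

public class RecordPeek {
    public static void main(String[] args) {
        // A shortened sample record; the real file has one full JSON object per line
        String line = "{\"id\":4,\"jobName\":\"大数据工程师010\",\"salary\":\"20K-30K\",\"city_code\":\"530\"}";
        JSONObject record = JSONObject.parseObject(line);
        // Each field is read out by name as a string, exactly as JsonMap does below
        System.out.println(record.getString("jobName") + " | " + record.getString("salary"));
    }
}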

MySQL database:

Username: root; Password: 123123

Database name: mydb

City code table: province

Column     Type          Not null  Auto-increment  Description
city_code  varchar(255)                            city code
city_name  varchar(255)                            city name

HBase database:

Result table: job; column family: info

Cleaning rules:

  • If any field of a record is empty, drop the whole record;

  • Normalize the salary field (see the sketch after this list):

    1) "mK-nK" becomes (m+n)/2; 2) anything else becomes 0

  • Convert the city code to the city name using the province table in MySQL;

  • Write the cleaned records to the HBase table job;

  • Input and output locations: the source file is /root/data/data.json; the cleaned data is stored in the HBase table job.
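
As a concrete illustration of the salary rule, here is a minimal standalone sketch (the names SalaryRuleSketch and parseSalary are made up for illustration and are not part of the exercise template):

public class SalaryRuleSketch {
    static String parseSalary(String salary) {
        if (salary != null && salary.contains("K-")) {
            // "20K-30K" -> (20 + 30) / 2 = 25.0
            double low = Double.parseDouble(salary.substring(0, salary.indexOf("K")));
            double high = Double.parseDouble(salary.substring(salary.indexOf("-") + 1, salary.lastIndexOf("K")));
            return String.valueOf((low + high) / 2);
        }
        // Anything that is not of the form mK-nK becomes 0
        return "0";
    }

    public static void main(String[] args) {
        System.out.println(parseSalary("20K-30K")); // 25.0
        System.out.println(parseSalary("面议"));     // 0
    }
}

The same substring logic appears inside JsonMap.map() below.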

(1) DBHelper class code:

package com;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class DBHelper {
    /********** begin **********/
    private static final String driver = "com.mysql.jdbc.Driver";
    private static final String url = "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
    private static final String username = "root";
    private static final String password = "123123";
    private static Connection conn = null;

    static {
        try {
            // Load the MySQL JDBC driver once when the class is first used
            Class.forName(driver);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public static Connection getConnection() {
        // Lazily open a single shared connection and reuse it afterwards
        if (conn == null) {
            try {
                conn = DriverManager.getConnection(url, username, password);
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        return conn;
    }

    public static void main(String[] args) {
        Connection connection = DBHelper.getConnection();
    }
    /********** end **********/
}

(2) JsonMap class code:

package com;
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
public class JsonMap extends Mapper<LongWritable, Text, NullWritable, Put> {
    /********** begin **********/
    // city_code -> city_name mapping loaded from MySQL
    private final Map<String, String> pro = new HashMap<String, String>();
    private Put put;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Load the province table once per mapper and cache it in memory
        Connection connection = DBHelper.getConnection();
        try {
            Statement statement = connection.createStatement();
            String sql = "select * from province";
            ResultSet resultSet = statement.executeQuery(sql);
            while (resultSet.next()) {
                String cityCode = resultSet.getString(1);
                String cityName = resultSet.getString(2);
                pro.put(cityCode, cityName);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Parse one JSON record per line
        JSONObject jsonObject = JSONObject.parseObject(line);
        String[] data = new String[14];
        data[0] = jsonObject.getString("id");
        data[1] = jsonObject.getString("company_name");
        data[2] = jsonObject.getString("eduLevel_name");
        data[3] = jsonObject.getString("emplType");
        data[4] = jsonObject.getString("jobName");
        // Salary rule: "mK-nK" becomes (m+n)/2, anything else becomes 0
        String salary = jsonObject.getString("salary");
        if (salary.contains("K-")) {
            Double a = Double.valueOf(salary.substring(0, salary.indexOf("K")));
            Double b = Double.valueOf(salary.substring(salary.indexOf("-") + 1, salary.lastIndexOf("K")));
            data[5] = (a + b) / 2 + "";
        } else {
            data[5] = "0";
        }
        data[6] = jsonObject.getString("createDate");
        data[7] = jsonObject.getString("endDate");
        // Convert the city code to the city name via the cached province mapping
        String code = jsonObject.getString("city_code");
        data[8] = pro.get(code);
        data[9] = jsonObject.getString("companySize");
        data[10] = jsonObject.getString("welfare");
        data[11] = jsonObject.getString("responsibility");
        data[12] = jsonObject.getString("place");
        data[13] = jsonObject.getString("workingExp");
        // Drop the record if any field is null or empty
        for (String i : data) {
            if (i == null || i.equals("")) {
                return;
            }
        }
        String columnFamily = "info";
        put = new Put(data[0].getBytes());
        put.addColumn(columnFamily.getBytes(), "company_name".getBytes(), data[1].getBytes());
        put.addColumn(columnFamily.getBytes(), "eduLevel_name".getBytes(), data[2].getBytes());
        put.addColumn(columnFamily.getBytes(), "emplType".getBytes(), data[3].getBytes());
        put.addColumn(columnFamily.getBytes(), "jobName".getBytes(), data[4].getBytes());
        put.addColumn(columnFamily.getBytes(), "salary".getBytes(), data[5].getBytes());
        put.addColumn(columnFamily.getBytes(), "createDate".getBytes(), data[6].getBytes());
        put.addColumn(columnFamily.getBytes(), "endDate".getBytes(), data[7].getBytes());
        put.addColumn(columnFamily.getBytes(), "city_name".getBytes(), data[8].getBytes());
        put.addColumn(columnFamily.getBytes(), "companySize".getBytes(), data[9].getBytes());
        put.addColumn(columnFamily.getBytes(), "welfare".getBytes(), data[10].getBytes());
        put.addColumn(columnFamily.getBytes(), "responsibility".getBytes(), data[11].getBytes());
        put.addColumn(columnFamily.getBytes(), "place".getBytes(), data[12].getBytes());
        put.addColumn(columnFamily.getBytes(), "workingExp".getBytes(), data[13].getBytes());
        context.write(NullWritable.get(), put);
    }
    /********** end **********/
}

(3) JsonTest class code (job driver):

package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
public class JsonTest {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        // Point the HBase client at the local ZooKeeper quorum
        config.set("hbase.zookeeper.quorum", "127.0.0.1");
        Connection connection = ConnectionFactory.createConnection(config);
        Admin admin = connection.getAdmin();
        TableName tableName = TableName.valueOf("job");
        // Recreate the result table so each run starts from an empty table
        if (admin.tableExists(tableName)) {
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        }
        TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
        ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info")).build(); // build the column family
        tableDescriptor.setColumnFamily(family); // attach the column family
        admin.createTable(tableDescriptor.build()); // create the table

        /********** begin **********/
        Job job = Job.getInstance(config);
        job.setJarByClass(JsonTest.class);
        job.setMapperClass(JsonMap.class);
        job.setMapOutputKeyClass(NullWritable.class);
        // Map-only job: no reduce phase, so set the number of reducers to 0
        job.setNumReduceTasks(0);
        // Input path is hard-coded rather than passed as a program argument
        FileInputFormat.setInputPaths(job, new Path("/root/data/data.json"));
        // Write the mapper's Put objects straight into the HBase table "job"
        TableMapReduceUtil.initTableReducerJob("job", null, job);
        TableMapReduceUtil.addDependencyJars(job);
        job.waitForCompletion(true);
        /********** end **********/
    }
}

Start HBase before running the job: start-hbase.sh
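
After the MapReduce job completes, the cleaned records can be checked with scan 'job' in the HBase shell, or programmatically. Below is a minimal verification sketch (the class name JobTableScan is made up for illustration; it assumes the same local ZooKeeper quorum as the driver):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class JobTableScan {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "127.0.0.1");
        // Scan every row of the result table and print row key, qualifier and value
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf("job"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                for (Cell cell : result.rawCells()) {
                    System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "  "
                            + Bytes.toString(CellUtil.cloneQualifier(cell)) + "="
                            + Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
        }
    }
}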
