The collaborative filtering algorithm:
Item-based collaborative filtering consists of two main steps:
1. Compute the similarity between items. The similarity can be derived from item co-occurrence counts, the cosine of the angle between item vectors, or the Euclidean distance.
2. Generate a recommendation list for each user from the item similarities and the user's purchase history.
Which items are ultimately recommended is decided by their recommendation score.
Core idea: count, for every pair of items, how often the two items were bought by the same user; when a user has bought one item of such a pair, recommend the other item of the pair to that user.
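As a concrete example using the sample data below: user 10001 buys items 20001, 20002, 20005, 20006 and 20007, and item 20003 co-occurs (is bought by the same user, 10002) only with items 20004 and 20006. The recommendation score of item 20003 for user 10001 is the sum, over every item i that 10001 bought, of co-occurrence(20003, i) x purchases(10001, i) = 0 + 0 + 0 + 1 + 0 = 1, which is exactly the value produced for the pair 10001,20003 in the Step 6 output.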
Project package structure: the packages com.briup.mr.one through com.briup.mr.eight hold the MapReduce classes for Steps 1 to 8, and com.briup.mr contains the FinalJob driver.

Project step 1: mock up the data and create the corresponding database tables
-- create the user table
create table s_user(
id int primary key auto_increment,
name varchar(20),
age int,
phone varchar(20)
);
insert into s_user values(10001,'jake',20,'15023453003'),(10002,'rose',22,'18923452343'),(10003,'tom',21,'15113453001'),(10004,'briup',22,'18823452456'),(10005,'kevin',24,'15925671003'),(10006,'patel',28,'15983432459');
-- create the product table
create table s_product(
id int primary key auto_increment,
name varchar(20),
price double,
description varchar(100),
kc double  -- stock quantity
);
insert into s_product values(20001,'hadoop',89,'bigdata',1000),(20002,'hbase',36,'bigdata',110),(20003,'mysql',58,'bigdata',190),(20004,'sqoop',28,'bigdata',70),(20005,'flume',34,'bigdata',109),(20006,'kafka',29,'bigdata',78),(20007,'hive',31,'bigdata',83);
-- create the order table
create table s_order(
id int primary key auto_increment,
name varchar(20),
order_date timestamp default current_timestamp on update current_timestamp,
user_id int references s_user(id)
);
insert into s_order(id,name,user_id) values(1,'briup_store',10001),(2,'briup_store',10002),(3,'briup_store',10003),(4,'briup_store',10004),(5,'briup_store',10005),(6,'briup_store',10006),(7,'briup_store',10007);
-- create the order line (bridge) table between the order table and the product table
create table order_line(
order_id int references s_order(id),
product_id int references s_product(id),
num double,
primary key(order_id,product_id)
);
insert into order_line values(1,20001,1),(1,20002,1),(1,20005,1),(1,20006,1),(1,20007,1),(2,20003,1),(2,20004,1),(2,20006,1),(3,20002,1),(3,20007,1),(4,20001,1),(4,20002,1),(4,20005,1),(4,20006,1),(5,20001,1),(6,20004,1),(6,20007,1);
-- create the table that stores the final recommendation results
create table recommend(
uid int references s_user(id),
gid int references s_product(id),
nums double,
primary key(uid,gid)
);
Project step 2: migrate the data from MySQL into the HDFS distributed file system
sqoop import --connect jdbc:mysql://192.168.43.158:3306/briup --username root --password root --delete-target-dir --target-dir /user/hdfs/recommend --query 'select s.user_id,d.product_id,d.num from s_order s,order_line d where s.id=d.order_id and $CONDITIONS' --m 1
Project step 3: write the MapReduce jobs
Raw data (user id, product id, purchase count):
10001 20001 1
10001 20002 1
10001 20005 1
10001 20006 1
10001 20007 1
10002 20003 1
10002 20004 1
10002 20006 1
10003 20002 1
10003 20007 1
10004 20001 1
10004 20002 1
10004 20005 1
10004 20006 1
10005 20001 1
10006 20004 1
10006 20007 1
Step 1: compute the list of goods purchased by each user
Data source: the raw data.
package com.briup.mr.one;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
public class UserBuyGoodsList{
public static class UserBuyGoodsListMapper extends Mapper<LongWritable, Text, Text, Text> {
private Text outK = new Text();
private Text outV = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] line = value.toString().split("\t");
outK.set(line[0]);
outV.set(line[1]);
context.write(outK, outV);
}
}
public static class UserBuyGoodsListReducer extends Reducer<Text, Text, Text, Text> {
private Text outV = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
for (Text value : values) {
sb.append(value.toString() + ",");
}
sb.setLength(sb.length() - 1);
outV.set(sb.toString());
context.write(key, new Text(sb.toString()));
outV.clear();
}
}
}
Result data:
10001 20001,20005,20006,20007,20002
10002 20006,20003,20004
10003 20002,20007
10004 20001,20002,20005,20006
10005 20001
10006 20004,20007
Step 2: compute item co-occurrence pairs
Data source: the output of Step 1
package com.briup.mr.two;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class GoodsCooccurrenceList {
public static class GoodsCooccurrenceListMapper extends Mapper<Text, Text, Text, NullWritable> {
private StringBuffer sb = new StringBuffer();
private Text outK = new Text();
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
String[] line = value.toString().split(",");
// emit every ordered pair of items this user bought, including the (i, i) self-pairs;
// Step 3 counts how often each pair occurs across all users
for (String s : line) {
for (String s1 : line) {
sb.append(s).append("\t").append(s1);
outK.set(sb.toString());
context.write(outK, NullWritable.get());
sb.setLength(0);
outK.clear();
}
}
}
}
}
Result data:
20001 20001
20001 20002
20001 20005
20001 20006
20001 20007
20001 20001
20001 20006
20001 20005
20001 20002
20002 20007
20002 20001
20002 20005
20002 20006
20002 20007
20002 20002
20002 20006
20002 20005
... ...
Step 3: count item co-occurrences (the co-occurrence matrix)
Data source: the output of Step 2
package com.briup.mr.three;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class GoodsCooccurrenceMatrix {
public static class GoodsCooccurrenceMatrixMapper extends Mapper<Text, NullWritable, Text, Text> {
private Text outK = new Text();
private Text outV = new Text();
@Override
protected void map(Text key, NullWritable value, Context context) throws IOException, InterruptedException {
String[] line = key.toString().split("\t");
outK.set(line[0]);
outV.set(line[1]);
context.write(outK, outV);
}
}
public static class GoodsCooccurrenceMatrixReducer extends Reducer<Text, Text, Text, Text> {
private Map<String, Integer> map = new HashMap<String, Integer>();
private StringBuffer sb = new StringBuffer();
private Text outV = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text val : values) {
if (map.containsKey(val.toString())){
map.put(val.toString(),map.get(val.toString())+1);
}else {
map.put(val.toString(),1);
}
}
for (Map.Entry<String, Integer> en : map.entrySet()) {
sb.append(en.getKey()).append(":").append(en.getValue()).append(",");
}
sb.setLength(sb.length()-1);
outV.set(sb.toString());
context.write(key,outV);
sb.setLength(0);
map.clear();
outV.clear();
}
}
}
Result data:
20001 20001:3,20002:2,20005:2,20006:2,20007:1
20002 20001:2,20002:3,20005:2,20006:2,20007:2
20003 20003:1,20004:1,20006:1
20004 20003:1,20004:2,20006:1,20007:1
20005 20001:2,20002:2,20005:2,20006:2,20007:1
20006 20001:2,20002:2,20003:1,20004:1,20005:2,20006:3,20007:1
20007 20001:1,20002:2,20004:1,20005:1,20006:1,20007:3
Step 4: compute the user purchase vectors
Data source: the output of Step 1, or the raw data.
package com.briup.mr.four;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class UserBuyGoodsVector {
public static class UserBuyGoodsVectorMapper extends Mapper<LongWritable, Text, Text, Text> {
private Text outK = new Text();
private Text outV = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] line = value.toString().split("\t");
outK.set(line[1]);
outV.set(line[0]);
context.write(outK, outV);
}
}
public static class UserBuyGoodsVectorReducer extends Reducer<Text, Text, Text, Text> {
private Text outV = new Text();
private Map<String, Integer> map = new HashMap<>();
private StringBuffer sb = new StringBuffer();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
if (map.containsKey(value.toString())) {
map.put(value.toString(), map.get(value.toString()) + 1);
} else {
map.put(value.toString(), 1);
}
}
for (Map.Entry<String, Integer> en : map.entrySet()) {
sb.append(en.getKey()).append(":").append(en.getValue()).append(",");
}
sb.setLength(sb.length()-1);
outV.set(sb.toString());
context.write(key,outV);
sb.setLength(0);
map.clear();
outV.clear();
}
}
}
Result data:
20001 10001:1,10004:1,10005:1
20002 10001:1,10003:1,10004:1
20003 10002:1
20004 10002:1,10006:1
20005 10001:1,10004:1
20006 10001:1,10002:1,10004:1
20007 10001:1,10003:1,10006:1
Step 5: multiply the item co-occurrence matrix by the user purchase vectors to form the intermediate recommendation results
Data source: the outputs of Step 3 and Step 4
Note on the design: the input comes from two different files -- the Step 3 output (the item co-occurrence matrix) and the Step 4 output (the user purchase vectors). The MR job therefore uses two custom Mappers, one per input file, plus a single custom Reducer that combines their intermediate results; the wiring with MultipleInputs is shown in the FinalJob driver in Step 9.
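To make the reduce-side multiplication concrete, here is the Step 5 computation for the key 20003, with the values taken from the Step 3 and Step 4 outputs above:
co-occurrence matrix row for 20003 (Step 3): 20003:1,20004:1,20006:1
user purchase vector for 20003 (Step 4): 10002:1
The reducer pairs each user entry with each matrix entry and emits:
10002,20003  1 x 1 = 1
10002,20004  1 x 1 = 1
10002,20006  1 x 1 = 1
Step 6 later sums these partial products per (user, item) key.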
The GoodsBean class:
package com.briup.mr.five;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class GoodsBean implements WritableComparable<GoodsBean> {
private String g_id;
private int flag;
public GoodsBean() {
}
public GoodsBean(String g_id, int flag) {
this.g_id = g_id;
this.flag = flag;
}
public String getG_id() {
return g_id;
}
public void setG_id(String g_id) {
this.g_id = g_id;
}
public int getFlag() {
return flag;
}
public void setFlag(int flag) {
this.flag = flag;
}
@Override
public int compareTo(GoodsBean o) {
// sort by goods id first; for equal ids sort the flag in descending order, so the
// co-occurrence matrix row (flag = 1) reaches the reducer before the user vector rows (flag = 0)
int n = this.g_id.compareTo(o.g_id);
if (n != 0) {
return n;
}
return -(this.flag - o.flag);
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(g_id);
dataOutput.writeInt(flag);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.g_id = dataInput.readUTF();
this.flag = dataInput.readInt();
}
}
The MapReduce partitioner class GoodsPartitioner:
package com.briup.mr.five;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class GoodsPartitioner extends Partitioner<GoodsBean, Text> {
@Override
public int getPartition(GoodsBean goodsBean, Text text, int numPartitions) {
// partition by goods id only, so the matrix row and the user vector rows for the
// same item always go to the same reduce task
return Math.abs(Integer.parseInt(goodsBean.getG_id()) * 127) % numPartitions;
}
}
The MapReduce grouping comparator class GoodsGroup:
package com.briup.mr.five;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class GoodsGroup extends WritableComparator {
public GoodsGroup() {
super(GoodsBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
// group by goods id only (ignore the flag) so the matrix row and all user vector
// rows for one item are handled in a single reduce() call
GoodsBean o = (GoodsBean) a;
GoodsBean o1 = (GoodsBean) b;
return o.getG_id().compareTo(o1.getG_id());
}
}
The MapReduce job class:
package com.briup.mr.five;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.*;
public class MultiplyGoodsMatrixAndUserVector {
public static class MultiplyGoodsMatrixAndUserVectorFirstMapper extends Mapper<Text, Text, GoodsBean, Text> {
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
context.write(new GoodsBean(key.toString(), 1), value);
}
}
public static class MultiplyGoodsMatrixAndUserVectorSecondMapper extends Mapper<Text, Text, GoodsBean, Text> {
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
context.write(new GoodsBean(key.toString(), 0), value);
}
}
public static class MultiplyGoodsMatrixAndUserVectorReducer extends Reducer<GoodsBean, Text, Text, DoubleWritable> {
@Override
protected void reduce(GoodsBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Iterator<Text> iter = values.iterator();
// due to the flag-based sort and the GoodsGroup grouping, the first value is the
// co-occurrence matrix row for this item: "gid:count,gid:count,..."
String[] goods = iter.next().toString().split(",");
// the remaining value(s) are the user purchase vector for this item: "uid:num,uid:num,..."
while (iter.hasNext()) {
String[] users = iter.next().toString().split(",");
for (String user : users) {
String[] uid_nums = user.split(":");
for (String good : goods) {
String[] gid_nums = good.split(":");
// emit ((uid,gid), purchaseNum * coOccurrenceCount); Step 6 sums these partial products per pair
context.write(new Text(uid_nums[0] + "," + gid_nums[0]),
new DoubleWritable(Double.parseDouble(uid_nums[1]) * Double.parseDouble(gid_nums[1])));
}
}
}
}
}
}
Result data:
10001,20001 2
10001,20001 2
10001,20001 3
10001,20001 1
10001,20001 2
10001,20002 3
10001,20002 2
10001,20002 2
10001,20002 2
10001,20002 2
... ...
Step 6: sum the scattered partial results produced in Step 5
Data source: the output of Step 5
package com.briup.mr.six;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MakeSumForMultiplication {
public static class MakeSumForMultiplicationMapper extends Mapper<Text, DoubleWritable, Text, DoubleWritable> {
@Override
protected void map(Text key, DoubleWritable value, Context context) throws IOException, InterruptedException {
context.write(key, value);
}
}
public static class MakeSumForMultiplicationReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
@Override
protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
double sum = 0;
for (DoubleWritable value : values) {
sum += value.get();
}
context.write(key, new DoubleWritable(sum));
}
}
}
Result data:
10001,20001 10
10001,20002 11
10001,20003 1
10001,20004 2
10001,20005 9
10001,20006 10
... ...
Step 7: deduplicate -- remove items the user has already purchased from the recommendation results
Data sources: 1. FirstMapper reads the users' purchase lists (the raw data). 2. SecondMapper reads the Step 6 recommendation results.
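For example, user 10001 already bought item 20001, so the key (10001, 20001) receives two values in the reducer -- one from the purchase list via the FirstMapper and one from the Step 6 result (10001,20001 with score 10) via the SecondMapper -- and is therefore dropped. The key (10001, 20003) appears only in the Step 6 results, so the reducer sees a single value and writes it to the final output.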
The UserAndGoods JavaBean class:
package com.briup.mr.seven;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class UserAndGoods implements WritableComparable<UserAndGoods> {
private String userId;
private String goodsId;
private int flag;
public UserAndGoods() {
}
public UserAndGoods(String userId, String goodsId, int flag) {
this.userId = userId;
this.goodsId = goodsId;
this.flag = flag;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getGoodsId() {
return goodsId;
}
public void setGoodsId(String goodsId) {
this.goodsId = goodsId;
}
@Override
public int compareTo(UserAndGoods o) {
int i = this.getUserId().compareTo(o.getUserId());
if (i != 0) {
return i;
} else return this.getGoodsId().compareTo(o.getGoodsId());
}
@Override
public String toString() {
return "UserAndGoods{" +
"userId='" + userId + '\'' +
", goodsId='" + goodsId + '\'' +
", flag=" + flag +
'}';
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(userId);
dataOutput.writeUTF(goodsId);
dataOutput.writeInt(flag);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.userId = dataInput.readUTF();
this.goodsId = dataInput.readUTF();
this.flag = dataInput.readInt();
}
// hashCode/equals ignore the flag so that the purchase record and the recommendation
// record for the same (user, goods) pair land in the same partition even if the job
// is run with more than one reduce task
@Override
public int hashCode() {
return userId.hashCode() * 31 + goodsId.hashCode();
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof UserAndGoods)) return false;
UserAndGoods other = (UserAndGoods) obj;
return userId.equals(other.userId) && goodsId.equals(other.goodsId);
}
}
The MapReduce job class:
package com.briup.mr.seven;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
public class DuplicateDataForResult {
public static class DuplicateDataForResultFirstMapper extends Mapper<LongWritable, Text, UserAndGoods, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] line = value.toString().split("\t");
context.write(new UserAndGoods(line[0], line[1], 1), value);
}
}
public static class DuplicateDataForResultSecondMapper extends Mapper<Text, DoubleWritable, UserAndGoods, Text> {
@Override
protected void map(Text key, DoubleWritable value, Context context) throws IOException, InterruptedException {
String[] line = key.toString().split(",");
context.write(new UserAndGoods(line[0], line[1], 0), new Text(key.toString() + "\t" + value.get()));
}
}
public static class DuplicateDataForResultReducer extends Reducer<UserAndGoods, Text, Text, NullWritable> {
@Override
protected void reduce(UserAndGoods key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Iterator<Text> iter = values.iterator();
Text res = iter.next();
// if this (user, goods) key has only one value, it came solely from the Step 6
// recommendations, i.e. the user has not bought the item yet, so keep it;
// if a second value exists, the user already bought the item and the pair is dropped
if (!iter.hasNext()) {
context.write(res, NullWritable.get());
}
}
}
}
Result data:
10001 20004 2
10001 20003 1
10002 20002 2
10002 20007 2
10002 20001 2
10002 20005 2
10003 20006 3
10003 20005 3
10003 20001 3
10003 20004 1
10004 20007 5
10004 20004 1
10004 20003 1
10005 20006 2
10005 20002 2
... ...
Step 8: save the recommendation results into the MySQL database
Data source: the output of Step 7
package com.briup.mr.eight;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class DataInDB {
public static class DataInDBMapper extends Mapper<Text, NullWritable, RecommendResultBean, NullWritable> {
@Override
protected void map(Text key, NullWritable value, Context context) throws IOException, InterruptedException {
String[] line = key.toString().split("\t");
RecommendResultBean outK = new RecommendResultBean();
outK.setNums(Double.parseDouble(line[1]));
String[] split = line[0].split(",");
outK.setUid(Integer.parseInt(split[0]));
outK.setGid(Integer.parseInt(split[1]));
context.write(outK, NullWritable.get());
}
}
// the mapper emits NullWritable values, so the reducer's input value type must be NullWritable as well
public static class DataInDBReducer extends Reducer<RecommendResultBean, NullWritable, RecommendResultBean, NullWritable> {
@Override
protected void reduce(RecommendResultBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
// the key already carries uid, gid and nums; DBOutputFormat writes it into the recommend table
context.write(key, NullWritable.get());
}
}
}
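The RecommendResultBean class used above is not listed in the original post. A minimal sketch of what it might look like, assuming it only needs to carry uid, gid and nums, serve as a map output key (hence WritableComparable), and be written by the new-API DBOutputFormat into the recommend(uid, gid, nums) table (hence DBWritable):
package com.briup.mr.eight;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
// NOTE: reconstructed sketch -- the original RecommendResultBean is not included in the post
public class RecommendResultBean implements WritableComparable<RecommendResultBean>, DBWritable {
private int uid;
private int gid;
private double nums;
public int getUid() { return uid; }
public void setUid(int uid) { this.uid = uid; }
public int getGid() { return gid; }
public void setGid(int gid) { this.gid = gid; }
public double getNums() { return nums; }
public void setNums(double nums) { this.nums = nums; }
@Override
public int compareTo(RecommendResultBean o) {
// order by user id, then by goods id
int n = Integer.compare(uid, o.uid);
return n != 0 ? n : Integer.compare(gid, o.gid);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(uid);
out.writeInt(gid);
out.writeDouble(nums);
}
@Override
public void readFields(DataInput in) throws IOException {
uid = in.readInt();
gid = in.readInt();
nums = in.readDouble();
}
// DBWritable: column order must match DBOutputFormat.setOutput(job, "recommend", "uid", "gid", "nums")
@Override
public void write(PreparedStatement ps) throws SQLException {
ps.setInt(1, uid);
ps.setInt(2, gid);
ps.setDouble(3, nums);
}
@Override
public void readFields(ResultSet rs) throws SQLException {
uid = rs.getInt("uid");
gid = rs.getInt("gid");
nums = rs.getDouble("nums");
}
}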
Result: the recommendation scores are now stored in the recommend table in MySQL (screenshot of the table omitted).
Step 9: build the job workflow and submit it to the cluster to run
package com.briup.mr;
import com.briup.mr.eight.DataInDB;
import com.briup.mr.eight.RecommendResultBean;
import com.briup.mr.five.GoodsBean;
import com.briup.mr.five.GoodsGroup;
import com.briup.mr.five.GoodsPartitioner;
import com.briup.mr.five.MultiplyGoodsMatrixAndUserVector;
import com.briup.mr.four.UserBuyGoodsVector;
import com.briup.mr.one.UserBuyGoodsList;
import com.briup.mr.seven.DuplicateDataForResult;
import com.briup.mr.seven.UserAndGoods;
import com.briup.mr.six.MakeSumForMultiplication;
import com.briup.mr.three.GoodsCooccurrenceMatrix;
import com.briup.mr.two.GoodsCooccurrenceList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FinalJob extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
Configuration conf = getConf();
Path input = new Path("/user/zhudz/goods/input.txt");
Path one_output = new Path("/user/zhudz/goods/out11");
Path two_output = new Path("/user/zhudz/goods/out22");
Path three_output = new Path("/user/zhudz/goods/out33");
Path four_output = new Path("/user/zhudz/goods/out44");
Path five_output = new Path("/user/zhudz/goods/out55");
Path six_output = new Path("/user/zhudz/goods/out66");
Path seven_output = new Path("/user/zhudz/goods/out77");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(one_output)) {
fs.delete(one_output, true);
}
if (fs.exists(two_output)) {
fs.delete(two_output, true);
}
if (fs.exists(three_output)) {
fs.delete(three_output, true);
}
if (fs.exists(four_output)) {
fs.delete(four_output, true);
}
if (fs.exists(five_output)) {
fs.delete(five_output, true);
}
if (fs.exists(six_output)) {
fs.delete(six_output, true);
}
if (fs.exists(seven_output)) {
fs.delete(seven_output, true);
}
Job job = Job.getInstance(conf);
job.setJarByClass(this.getClass());
job.setJobName("Step 1: compute the list of goods purchased by each user");
job.setMapperClass(UserBuyGoodsList.UserBuyGoodsListMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(UserBuyGoodsList.UserBuyGoodsListReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
TextInputFormat.addInputPath(job, input);
SequenceFileOutputFormat.setOutputPath(job, one_output);
Job job1 = Job.getInstance(conf);
job1.setJarByClass(this.getClass());
job1.setJobName("Step 2: compute item co-occurrence pairs");
job1.setMapperClass(GoodsCooccurrenceList.GoodsCooccurrenceListMapper.class);
job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(NullWritable.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(NullWritable.class);
job1.setInputFormatClass(SequenceFileInputFormat.class);
job1.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileInputFormat.addInputPath(job1, one_output);
SequenceFileOutputFormat.setOutputPath(job1, two_output);
Job job2 = Job.getInstance(conf);
job2.setJarByClass(this.getClass());
job2.setJobName("Step 3: count item co-occurrences (co-occurrence matrix)");
job2.setMapperClass(GoodsCooccurrenceMatrix.GoodsCooccurrenceMatrixMapper.class);
job2.setMapOutputKeyClass(Text.class);
job2.setMapOutputValueClass(Text.class);
job2.setReducerClass(GoodsCooccurrenceMatrix.GoodsCooccurrenceMatrixReducer.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
job2.setInputFormatClass(SequenceFileInputFormat.class);
job2.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileInputFormat.addInputPath(job2, two_output);
SequenceFileOutputFormat.setOutputPath(job2, three_output);
Job job3 = Job.getInstance(conf);
job3.setJarByClass(this.getClass());
job3.setJobName("Step 4: compute the user purchase vectors");
job3.setMapperClass(UserBuyGoodsVector.UserBuyGoodsVectorMapper.class);
job3.setMapOutputKeyClass(Text.class);
job3.setMapOutputValueClass(Text.class);
job3.setReducerClass(UserBuyGoodsVector.UserBuyGoodsVectorReducer.class);
job3.setOutputKeyClass(Text.class);
job3.setOutputValueClass(Text.class);
job3.setInputFormatClass(TextInputFormat.class);
job3.setOutputFormatClass(SequenceFileOutputFormat.class);
TextInputFormat.addInputPath(job3, input);
SequenceFileOutputFormat.setOutputPath(job3, four_output);
Job job4 = Job.getInstance(conf);
job4.setJarByClass(this.getClass());
job4.setJobName("Step 5: multiply the co-occurrence matrix by the user purchase vectors");
MultipleInputs.addInputPath(job4,
three_output,
SequenceFileInputFormat.class, MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorFirstMapper.class);
MultipleInputs.addInputPath(job4,
four_output,
SequenceFileInputFormat.class, MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorSecondMapper.class);
job4.setMapOutputKeyClass(GoodsBean.class);
job4.setMapOutputValueClass(Text.class);
job4.setPartitionerClass(GoodsPartitioner.class);
job4.setGroupingComparatorClass(GoodsGroup.class);
job4.setReducerClass(MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorReducer.class);
job4.setOutputKeyClass(Text.class);
job4.setOutputValueClass(DoubleWritable.class);
job4.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(job4, five_output);
Job job5 = Job.getInstance(conf);
job5.setJarByClass(this.getClass());
job5.setJobName("Step 6: sum the partial results from Step 5");
job5.setMapperClass(MakeSumForMultiplication.MakeSumForMultiplicationMapper.class);
job5.setMapOutputKeyClass(Text.class);
job5.setMapOutputValueClass(DoubleWritable.class);
job5.setReducerClass(MakeSumForMultiplication.MakeSumForMultiplicationReducer.class);
job5.setOutputKeyClass(Text.class);
job5.setOutputValueClass(DoubleWritable.class);
job5.setInputFormatClass(SequenceFileInputFormat.class);
job5.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileInputFormat.addInputPath(job5, five_output);
SequenceFileOutputFormat.setOutputPath(job5, six_output);
Job job6 = Job.getInstance(conf);
job6.setJarByClass(this.getClass());
job6.setJobName("Step 7: remove already-purchased items from the recommendations");
MultipleInputs.addInputPath(job6,
input,
TextInputFormat.class,
DuplicateDataForResult.DuplicateDataForResultFirstMapper.class);
MultipleInputs.addInputPath(job6,
six_output,
SequenceFileInputFormat.class,
DuplicateDataForResult.DuplicateDataForResultSecondMapper.class);
job6.setMapOutputKeyClass(UserAndGoods.class);
job6.setMapOutputValueClass(Text.class);
job6.setReducerClass(DuplicateDataForResult.DuplicateDataForResultReducer.class);
job6.setOutputKeyClass(Text.class);
job6.setOutputValueClass(NullWritable.class);
job6.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(job6, seven_output);
Job job7 = Job.getInstance(conf);
job7.setJarByClass(this.getClass());
job7.setJobName("Step 8: save the recommendation results into MySQL");
DBConfiguration.configureDB(job7.getConfiguration(), "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.10.131/briup", "root", "root");
DBOutputFormat.setOutput(job7, "recommend", "uid", "gid", "nums");
job7.setMapperClass(DataInDB.DataInDBMapper.class);
job7.setMapOutputKeyClass(RecommendResultBean.class);
job7.setMapOutputValueClass(NullWritable.class);
job7.setReducerClass(DataInDB.DataInDBReducer.class);
job7.setOutputKeyClass(RecommendResultBean.class);
job7.setOutputValueClass(NullWritable.class);
job7.setInputFormatClass(SequenceFileInputFormat.class);
SequenceFileInputFormat.addInputPath(job7, seven_output);
job7.setOutputFormatClass(DBOutputFormat.class);
ControlledJob contro_job = new ControlledJob(conf);
contro_job.setJob(job);
ControlledJob contro_job1 = new ControlledJob(conf);
contro_job1.setJob(job1);
contro_job1.addDependingJob(contro_job);
ControlledJob contro_job2 = new ControlledJob(conf);
contro_job2.setJob(job2);
contro_job2.addDependingJob(contro_job1);
ControlledJob contro_job3 = new ControlledJob(conf);
contro_job3.setJob(job3);
ControlledJob contro_job4 = new ControlledJob(conf);
contro_job4.setJob(job4);
contro_job4.addDependingJob(contro_job2);
contro_job4.addDependingJob(contro_job3);
ControlledJob contro_job5 = new ControlledJob(conf);
contro_job5.setJob(job5);
contro_job5.addDependingJob(contro_job4);
ControlledJob contro_job6 = new ControlledJob(conf);
contro_job6.setJob(job6);
contro_job6.addDependingJob(contro_job5);
ControlledJob contro_job7 = new ControlledJob(conf);
contro_job7.setJob(job7);
contro_job7.addDependingJob(contro_job6);
JobControl jobs = new JobControl("goods_recommends");
jobs.addJob(contro_job);
jobs.addJob(contro_job1);
jobs.addJob(contro_job2);
jobs.addJob(contro_job3);
jobs.addJob(contro_job4);
jobs.addJob(contro_job5);
jobs.addJob(contro_job6);
jobs.addJob(contro_job7);
// run the JobControl in a background thread and wait until the whole chain finishes
Thread t = new Thread(jobs);
t.start();
while (!jobs.allFinished()) {
for (ControlledJob c : jobs.getRunningJobList()) {
c.getJob().monitorAndPrintJob();
}
Thread.sleep(500);
}
// stop the JobControl thread so the process can exit cleanly
jobs.stop();
return 0;
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new FinalJob(), args));
}
}
Project step 4: write shell scripts on the cluster and schedule them to run periodically
1. Write the script that migrates the data from MySQL into HDFS:
sudo vi mysqlToHDFS.sh
# after saving, make the script executable
sudo chmod +x mysqlToHDFS.sh
sqoop import --connect jdbc:mysql://localhost:3306/briup --username root --password root --delete-target-dir --target-dir /user/hdfs/recommend --query 'select s.user_id,d.product_id,d.num from s_order s,order_line d where s.id=d.order_id and $CONDITIONS' --m 1
2. Write the script that submits this project's job to the cluster:
sudo vi recomendMR.sh
# after saving, make the script executable
sudo chmod +x recomendMR.sh
yarn jar /home/hdfs/GoodsRecommend-1.0-SNAPSHOT.jar com.briup.mr.FinalJob
3. Set up the cron schedule:
crontab -e
# 07:00 every Tuesday: import the latest data from MySQL into HDFS
0 7 * * 2 sh ~/bin/mysqlToHDFS.sh
# 07:30 every Tuesday: run the recommendation job chain
30 7 * * 2 sh ~/bin/recomendMR.sh
Project source code: https://gitee.com/zhu-dezhong/recomendGoods