
SVD-Based Collaborative Filtering

A few notes on understanding and reimplementing SVD-based collaborative filtering.

Program Entry Point

  • set up the logger object;
  • load and split the dataset;
  • create the SVD model object and initialize its parameters;
  • train the SVD model;
  • predict ratings and evaluate the model.
import os
from logger import Logger
from movielens import RatingData
from svd import SVD
import winsound  # Windows-only: beep on completion
import pickle
import time

#%% SVD collaborative filtering
# Get the logger object
timeMarker = time.strftime('%m%d%H%M', time.localtime())
logPath = os.path.join('logs', 'svd' + timeMarker + '.log')
logger = Logger.getLogger(logPath)

# Load the dataset
filepath = os.path.join('ml-1m', 'ratings.dat')
#dataset = RatingData.getRatingDict(logger, filepath, 0.7, False)
#dataset = RatingData.getRatingDict(logger, filepath, 0.9, True)
dataset = RatingData.getSmallRatingData(logger, filepath, 0.7, 0.3, False)
#dataset = RatingData.getSmallRatingData(logger, filepath, 0.7, 0.3, True)
trainset = dataset[RatingData.TRAIN_SET_KEY]
testset = dataset[RatingData.TEST_SET_KEY]

# Train the SVD model
topicNum = 8
svdModel = SVD(logger, trainset, topicNum, lr=0.001, gamma=0.3)
svdModel.train(100, topN=10)

# Save the model
modelpath = os.path.join('logs', 'svd' + timeMarker + '.pkl')
with open(modelpath, 'wb') as modelfile:
    svdModel.logger, logger = None, svdModel.logger  # detach the logger so its thread lock does not block pickling
    pickle.dump(svdModel, modelfile)
svdModel.logger = logger  # restore the logger afterwards
# Load the model back
#del svdModel
#with open(modelpath, 'rb') as modelfile:
#    svdModel = pickle.load(modelfile)
#    svdModel.logger = logger

# SVD prediction
svdModel.evaluate(testset, topN=10)
winsound.Beep(500, 500)

Modules

For the logging module, see my earlier Item-based CF practice post.

Data Loading Module: movielens

Its explanation can likewise be found in the Item-based CF practice post.

import random as rand
rand.seed(0)

class RatingData(object):
    ''' Handler for the movielens ratings.dat dataset. '''

    TRAIN_SET_KEY = 'trainset'
    TEST_SET_KEY = 'testset'

    @staticmethod
    def __loadFile(filepath, logger):
        ''' Load the data file and return a generator.
        Args:
            filepath: path to the data file
            logger: logging object
        Returns:
            A generator yielding the file's lines one at a time, in order.
        '''
        with open(filepath, 'r', encoding='UTF-8') as fp:
            for i, line in enumerate(fp):
                yield line.strip('\r\n')
                if i % 100000 == 0:
                    logger.info('Loading data file {filepath} (row {rowNum})'.format(filepath=filepath, rowNum=i))
        logger.info('Loaded data file {filepath} successfully'.format(filepath=filepath))

    @staticmethod
    def getRatingDict(logger, filepath, proportion=0.7, transport=False):
        ''' Load the rating data as dicts and split it into a training set and a test set.
        Args:
            logger: logging object
            filepath: path to the data file
            proportion: fraction of the data used for training; the rest becomes the test set
            transport: whether to transpose the dataset; False gives user-item, True gives item-user
        Returns:
            A dict containing the trainset and the testset.
        '''
        trainset = {}
        trainsetSize = 0
        testset = {}
        testsetSize = 0

        for line in RatingData.__loadFile(filepath, logger):
            # Record format: UserID::MovieID::Rating::Timestamp
            if transport:
                dimA = 'movies'
                item, user, rating, _ = line.split('::')  # transposed: item-user
            else:
                dimA = 'users'
                user, item, rating, _ = line.split('::')  # not transposed: user-item
            user = int(user)
            item = int(item)
            rating = float(rating)
            # Split the dataset by proportion
            if rand.random() < proportion:
                trainset.setdefault(user, {})
                trainset[user][item] = rating
                trainsetSize += 1
            else:
                testset.setdefault(user, {})
                testset[user][item] = rating
                testsetSize += 1

        logger.info('Split into training and test sets successfully!')
        logger.info('Training set {dimA} count: {item}'.format(dimA=dimA, item=len(trainset)))
        logger.info('Training set size: {size}'.format(size=trainsetSize))
        logger.info('Test set {dimA} count: {item}'.format(dimA=dimA, item=len(testset)))
        logger.info('Test set size: {size}'.format(size=testsetSize))

        return {RatingData.TRAIN_SET_KEY: trainset, RatingData.TEST_SET_KEY: testset}

    @staticmethod
    def getSmallRatingData(logger, filepath, proportion=0.7, ratio=0.3, transport=False):
        ''' Load the rating data, shrink it, and split it into training and test sets.
        Args:
            logger: logging object
            filepath: path to the data file
            proportion: fraction of the data used for training; the rest becomes the test set
            ratio: fraction of the original dataset to keep
            transport: whether to transpose the dataset; False gives user-item, True gives item-user
        Returns:
            A dict containing the trainset and the testset.
        '''
        # Load the full dataset
        dataset = RatingData.getRatingDict(logger, filepath, proportion, transport)
        trainset = dataset[RatingData.TRAIN_SET_KEY]
        testset = dataset[RatingData.TEST_SET_KEY]
        # Collect the user set and shrink it
        userset = set(trainset.keys())
        smallUserset = set()
        for user in userset:
            if rand.random() < ratio:
                smallUserset.add(user)
        # Collect the item set and shrink it; `ratio` is rescaled so that roughly
        # ratio * |original itemset| items survive when sampling from the items
        # reachable through the sampled users
        itemset = set()
        for user in userset:
            itemset |= set(trainset[user].keys())
        ratio *= len(itemset)
        itemset.clear()
        for user in smallUserset:
            itemset |= set(trainset[user].keys())
        ratio /= len(itemset)
        smallItemset = set()
        for item in itemset:
            if rand.random() < ratio:
                smallItemset.add(item)
        # Restrict the training set to the shrunken user and item sets
        smallTrainset = {}
        for user in smallUserset:
            smallTrainset[user] = {}
            for item in trainset[user].keys():
                if item not in smallItemset:
                    continue
                smallTrainset[user][item] = trainset[user][item]
        # Restrict the test set to the shrunken user and item sets
        smallTestset = {}
        for user in smallUserset:
            smallTestset[user] = {}
            if testset.get(user) is None:
                continue
            for item in testset[user].keys():
                if item not in smallItemset:
                    continue
                smallTestset[user][item] = testset[user][item]

        if transport:
            dimA = 'movies'
            dimB = 'users'
        else:
            dimA = 'users'
            dimB = 'movies'
        logger.info('Original dataset [{dimA}*{dimB}] size: [{lenA}*{lenB}]'.format(dimA=dimA,
                    dimB=dimB, lenA=len(userset), lenB=len(itemset)))
        logger.info('Shrunken dataset [{dimA}*{dimB}] size: [{lenA}*{lenB}]'.format(dimA=dimA,
                    dimB=dimB, lenA=len(smallUserset), lenB=len(smallItemset)))

        return {RatingData.TRAIN_SET_KEY: smallTrainset, RatingData.TEST_SET_KEY: smallTestset}

SVD Module: svd

The model parameters are initialized first: the user and item rating biases are set to 0, and the user and item topic vectors are randomly initialized. The model is then trained on the training set and finally evaluated.
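To restate the formulas from the docstrings below in standard notation, the rating model and the regularized squared-error objective minimized per training sample are:

$$\hat{r}_{ui} = \mu + b_u + b_i + q_i^\top p_u$$

$$\min\ \frac{1}{2}\big(r_{ui} - \hat{r}_{ui}\big)^2 + \frac{\gamma}{2}\big(b_u^2 + b_i^2 + \lVert p_u\rVert^2 + \lVert q_i\rVert^2\big)$$

where $\mu$ is the global mean rating (mean), $b_u$ and $b_i$ are the user and item biases (biasUser, biasItem), and $p_u$, $q_i$ are the topicNum-dimensional topic vectors (pUser, qItem).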

import numpy as np

class SVD(object):
    ''' SVD-based collaborative filtering recommendation algorithm. '''

    def __init__(self, logger, trainset, topicNum, lr, gamma):
        ''' Constructor.
        Args:
            logger: logging object
            trainset: training set
            topicNum: number of topics (number of singular values)
            lr: learning rate
            gamma: regularization weight
        '''
        self.logger = logger
        self.trainset = trainset
        self.topicNum = topicNum
        self.lr = lr
        self.gamma = gamma

        # rate = mean + biasUser + biasItem + pUser * qItem
        self.mean = 0       # mean of all observed ratings
        self.biasUser = {}  # user rating biases
        self.biasItem = {}  # item rating biases
        self.pUser = {}     # user preference vectors over topicNum topics
        self.qItem = {}     # item distribution vectors over topicNum topics

        self.trainMap = {}  # index -> (user, item) mapping, used to address training samples

        self.initModel()
        self.logger.info('SVD model built!')

    def initModel(self):
        ''' Initialize the model parameters: biasUser, biasItem, pUser, qItem. '''
        # Initialize the parameter arrays and values
        itemset = set()
        sumRate = 0
        lenRate = 0
        for user, itemRates in self.trainset.items():
            itemset |= set(itemRates.keys())
            sumRate += sum(itemRates.values())
            lenRate += len(itemRates)
            self.biasUser[user] = 0
            self.pUser[user] = np.random.rand(self.topicNum) / 10
        self.mean = sumRate / lenRate
        self.logger.info('SVD init, mean rating: {mu}'.format(mu=self.mean))
        self.logger.info('SVD init, user biases set to zero')
        self.logger.info('SVD init, user topic matrix randomly initialized')

        for item in itemset:
            self.biasItem[item] = 0
            self.qItem[item] = np.random.rand(self.topicNum) / 10
        self.logger.info('SVD init, item biases set to zero')
        self.logger.info('SVD init, item topic matrix randomly initialized')

        i = 0
        for user, itemRates in self.trainset.items():
            for item in itemRates.keys():
                self.trainMap[i] = (user, item)
                i += 1
        self.logger.info('SVD model initialized!')

    def train(self, iteration, topN):
        ''' Train the SVD model over multiple iterations.
        min power(rated - rate, 2)/2 + gamma * regularization/2
        rate = mean + biasUser + biasItem + qItem * pUser
        regularization = power(biasUser) + power(biasItem) + power(pUser) + power(qItem)
        So on each update, with lr = learning rate:
        delta(biasUser) = lr * ((rate - rated) + gamma * biasUser)
        delta(biasItem) = lr * ((rate - rated) + gamma * biasItem)
        delta(pUser) = lr * ((rate - rated) * qItem + gamma * pUser)
        delta(qItem) = lr * ((rate - rated) * pUser + gamma * qItem)
        Args:
            iteration: number of iterations
            topN: top N value used during evaluation
        '''
        self.logger.info('Training started...')
        sampleSize = len(self.trainMap)    # total number of training samples
        sampleSeq = np.arange(sampleSize)  # queue of training sample indices
        # Iterative training
        self.logger.info('Error before training:')
        self.evaluate(self.trainset, topN)
        for i in range(iteration):
            np.random.shuffle(sampleSeq)  # shuffle the training sample indices
            # Take sample indices and train sample by sample
            for j in sampleSeq:
                user, item = self.trainMap[j]
                rate = self.predict(user, item)  # predicted rating
                rated = self.trainset[user][item]

                # Compute the prediction error and update the parameters
                rateDiff = rate - rated
                self.biasUser[user] -= (self.lr * (rateDiff + self.gamma * self.biasUser[user]))
                self.biasItem[item] -= (self.lr * (rateDiff + self.gamma * self.biasItem[item]))
                self.pUser[user] -= (self.lr * (rateDiff * self.qItem[item] +
                                                self.gamma * self.pUser[user]))
                self.qItem[item] -= (self.lr * (rateDiff * self.pUser[user] +
                                                self.gamma * self.qItem[item]))

            if i % 10 == 0:
                self.logger.info('Training progress [{i}/{iteration}], training error:'.format(i=i,
                                 iteration=iteration))
                self.evaluate(self.trainset, topN)
                self.lr *= 0.5  # halve the learning rate every 10 iterations

        self.logger.info('Error after training:')
        self.evaluate(self.trainset, topN)
        self.logger.info('Training finished!')

    def predict(self, user, item):
        ''' Predict a rating with the SVD model.
        rate = mean + biasUser + biasItem + qItem * pUser
        Args:
            user: user whose rating is to be predicted
            item: item whose rating is to be predicted
        Returns:
            rate: predicted rating of user for item
        '''
        rate = (self.mean + self.biasUser[user] + self.biasItem[item] +
                np.dot(self.qItem[item], self.pUser[user]))
        return rate

    def evaluate(self, dataset, topN):
        ''' Evaluate the SVD model.
        RMSE = sqrt(mean(power(testRate - predictRate, 2)))
        MAE = mean(abs(testRate - predictRate))
        precision = len(I(testRate) & I(topN)) / #topN
        recall = len(I(testRate) & I(topN)) / len(I(testRate))
        recovery (coverage) = len(set(topN[for all users])) / len(set(testset[for all users]))
        Args:
            dataset: evaluation dataset
            topN: top N value
        '''
        # Collect the item set of the training set
        itemset = set()
        for _, itemRate in self.trainset.items():
            itemset |= itemRate.keys()

        rmse = 0
        mae = 0
        count = 0
        hit = 0
        totalPredictNum = 0
        predictItemset = set()
        testItemset = set()
        for user, itemRates in dataset.items():
            # If the user is not in the training set, no prediction is possible
            if self.biasUser.get(user) is None:
                continue

            testItemset |= set(itemRates.keys())
            # Predict a rating for every item for this user
            predictRate = {}
            for item in itemset:
                predictRate[item] = self.predict(user, item)
            # Sort by rating in descending order, take the top N, convert to a dict
            rateTopN = dict(sorted(predictRate.items(), key=lambda x: x[1], reverse=True)[0:topN])
            if np.random.rand() > 0.999:  # occasionally log a sample recommendation
                self.logger.info('Top-N recommendations for user {user} [{topN}/{total}]: {rateTopN}'.format(user=user,
                                 topN=topN, total=len(itemset), rateTopN=rateTopN))
            predictItemset |= set(rateTopN.keys())
            totalPredictNum += topN
            # Score against the items actually rated in the test set
            for item, rate in itemRates.items():
                if self.biasItem.get(item) is None:
                    continue
                rateDiff = rate - predictRate[item]
                rmse += np.power(rateDiff, 2)
                mae += np.abs(rateDiff)
                count += 1
                if item in rateTopN.keys():
                    hit += 1
        rmse = np.sqrt(rmse / count)
        mae /= count
        precision = hit / totalPredictNum
        recall = hit / count
        recovery = len(predictItemset) / len(testItemset)
        self.logger.info('Evaluation RMSE: {rmse}'.format(rmse=rmse))
        self.logger.info('Evaluation MAE: {mae}'.format(mae=mae))
        self.logger.info('Evaluation precision: {precision}'.format(precision=precision))
        self.logger.info('Evaluation recall: {recall}'.format(recall=recall))
        self.logger.info('Evaluation coverage: {recovery}'.format(recovery=recovery))

Points worth thinking about:

  1. Training involves tuning several hyperparameters: the number of iterations, the learning rate, and the regularization weight; the learning rate additionally involves a decay rule as the iterations proceed (see the sketch after this list).
  2. The choice of evaluation metrics.
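On the learning-rate point, here is a minimal sketch of a step-decay schedule in the spirit of the self.lr *= 0.5 line in SVD.train (which halves the rate every 10 iterations); the step_decay function and its factor/period parameters are illustrative names of my own, not part of the original code:

def step_decay(initial_lr, iteration, factor=0.5, period=10):
    ''' Learning rate after `iteration` steps of step decay:
    the rate is multiplied by `factor` once every `period` iterations. '''
    return initial_lr * factor ** (iteration // period)

# With the entry script's initial lr=0.001, the schedule looks like:
for i in (0, 10, 20, 30):
    print('iteration {i}: lr = {lr}'.format(i=i, lr=step_decay(0.001, i)))

How aggressively to decay is itself a hyperparameter: decay too fast and the parameters freeze before converging; too slowly and the updates keep oscillating around the minimum.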

Summary

The implementation itself is fairly straightforward, but evaluation revealed something interesting: as the training iterations increase, RMSE goes down, yet the precision of the top-N recommendations goes down with it. Yes, the predictions become more accurate, but the top-N recommendation quality gets worse.

Greg Linden already reflected on this in a blog post from March 2009 (Linden was one of the main contributors to Amazon's early recommender system): when judging a recommendation algorithm, a low RMSE does not imply good top-N performance (Recall@n / Precision@n). On a 5-star scale, predicting 3 stars for an item the user would rate 4 is not the same as predicting 2 for an item the user would rate 3, even though both contribute the same error: users do not care about the bad items; the only thing that matters is the things they like and rate highly. Moreover, in the online world, user ratings are influenced by all kinds of factors (such as video playback smoothness or item completeness), and what users trust more is an algorithm that can give reasons for its recommendations (Amazon has gone further in this direction). Diversity, novelty, and explainability of recommendations all benefit an algorithm as well. So RMSE cannot be relied on alone. For further reading, see the paper "Performance of recommender algorithms on top-n recommendation tasks".

In the real world, a movie may have a poor reputation yet many people still want to watch it (to see what the fuss is about, or to add diversity to their viewing history?). So don't panic over evaluation metrics. Understanding the algorithm's principles, clarifying the business metrics, and being clear about the purpose of the recommendations are of real practical importance.

Personally, what I value more is the follow-up analysis, visualization, and deeper exploration of the algorithm, along with understanding the operational data and what it feeds back about the algorithms the current system uses.

Thanks for supporting original content!