0%

基于物品的协同过滤

基于物品的协同过滤理解和复现的一点记录。

程序入口

  • 日志对象;
  • 读取数据/分割数据集;
  • 创建推荐算法对象/初始化参数;
  • 计算相似度矩阵;
  • 预测评分并推荐;
  • 评价指标计算。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import os
from logger import Logger
from movielens import RatingData
from cf import CF
from similarity import Similarity

''' 基于kNN-CF推荐算法的流程 '''
# 日志打印对象
logPath = os.path.join('logs', 'kNNCF.log')
logger = Logger.getLogger(logPath)

# 读取数据
ratingfile = os.path.join('ml-1m', 'ratings.dat')
proportion = 0.7 # 训练集占总数据集比例,其余为测试集
# 基于item
itemRatingDict = RatingData.getItemRatingDict(logger, ratingfile, proportion)
itemTrainset = itemRatingDict[RatingData.TRAIN_SET_KEY]
itemTestset = itemRatingDict[RatingData.TEST_SET_KEY]
# 计算相似度矩阵,并推荐
cfAlgo = CF(logger, itemTrainset, itemTestset, neighborK=20, topN=10)
cfAlgo.calcSimMat(Similarity.rateDiffDivCoRated)
cfAlgo.recommand()
# 评价
cfAlgo.evaluate()

# 播放完成音乐
#from playsound import playsound
#playsound('C:/Users/Public/Music/Sample Music/Sleep Away.mp3') # 无法设置播放时间
import winsound
winsound.Beep(500, 500) # 不好听

各个模块

日志模块 logger.py

打印到控制台,同时也可以记录到 logPath 路径下的文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import logging
class Logger(object):
''' 日志记录类,记录到目录路径文件内,同时也打印到console上 '''
__logger = {}

@staticmethod
def __initLogger(logPath, logLevel):
''' 创建日志对象,并放入到__logger字典。key为logPath,value为logger对象
Args:
logPath:日志文件路径
logLevel:日志打印水平
'''
# 设置日志记录等级、格式、文件
logger = logging.getLogger(logPath)
logger.setLevel(logLevel)
# Formatter - 格式
formatter = logging.Formatter(r'%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')

# Console - 控制台就打印debug
consoleHandler = logging.StreamHandler()
consoleHandler.setLevel(logLevel)
consoleHandler.setFormatter(formatter)
# 文件 - 文件就记录error
fileHandler = logging.FileHandler(logPath, mode='w', encoding='UTF-8')
fileHandler.setLevel(logLevel)
fileHandler.setFormatter(formatter)

# 添加句柄
logger.addHandler(consoleHandler)
logger.addHandler(fileHandler)
logger.info('创建日志文件{logfile}成功!'.format(logfile=logPath))
Logger.__logger[logPath] = logger

@staticmethod
def getLogger(logPath, logLevel=logging.DEBUG):
''' 从__logger字典获取日志对象,key为logPath。若logger对象不存在,则新建
Args:
logPath:日志文件路径
logLevel:日志打印水平,默认打印debug等级及以上
Returns:
logger日志打印对象
'''
if Logger.__logger.get(logPath) == None:
Logger.__initLogger(logPath, logLevel)
return Logger.__logger.get(logPath)

Question:如果出现日志打印多次的情况(logger.info('1')但是打印了两次1),是因为程序异常停止,导致自己创建的logger对象会被回收,但是logging.getLogger创建的对象依然存在,若此时再次创建重名的logging.getLogger对象,则会重复添加打印通道的句柄,导致多处打印(因为logging.getLogger(name)是对name的单例模式)。

解决:出现异常了,给换个名字创建日志对象,或者,quit()重启console就行,或者,运行下面语句:

1
2
3
import logging
from imp import reload
reload(logging)

数据加载模块 movielens.py

用字典变量存储,类似ratesDict[item][user] = rate。然后按设定比例、随机种子,分割训练集和测试集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import random as rand
rand.seed(0)
class RatingData(object):
''' movielens的rating.dat的数据集处理类 '''

TRAIN_SET_KEY = 'trainset'
TEST_SET_KEY = 'testset'

@staticmethod
def __loadFile(filepath, logger):
''' 加载数据文件,返回生成器。
Args:
filepath:数据文件的路径
logger:日志记录对象
Returns:
数据文件中,每次按序返回一行数据的迭代器
'''
with open(filepath, 'r', encoding='UTF-8') as fp:
for i, line in enumerate(fp):
yield line.strip('\r\n')
if i % 100000 == 0:
logger.info('加载数据文件生成器 {filepath}({rowNum})'.format(filepath=filepath, rowNum=i))
logger.info('加载数据文件 {filepath} 成功'.format(filepath=filepath))

@staticmethod
def getItemRatingDict(logger, filepath, proportion):
''' 加载评分数据字典对象,划分为训练集和测试集
Args:
logger:日志记录对象
filepath:数据文件的路径
proportion:数据集训练集的划分比例,剩余的作为测试集
Returns:
包含trainset和testset的字典对象
'''
trainset = {}
trainsetSize = 0
testset = {}
testsetSize = 0

for line in RatingData.__loadFile(filepath, logger):
user, item, rating, _ = line.split('::') # 数据格式是:用户::电影::评分::时间戳
user = int(user)
item = int(item)
rating = float(rating)
# 通过比例划分数据集
if rand.random() < proportion:
trainset.setdefault(item, {})
trainset[item][user] = rating
trainsetSize += 1
else:
testset.setdefault(item, {})
testset[item][user] = rating
testsetSize += 1

logger.info('划分训练集和测试集成功!')
logger.info('训练集物品数量 {item}'.format(item = len(trainset)))
logger.info('训练集大小 {size}'.format(size=trainsetSize))
logger.info('测试集物品数量 {item}'.format(item = len(testset)))
logger.info('测试集大小 {size}'.format(size=testsetSize))

return {RatingData.TRAIN_SET_KEY:trainset, RatingData.TEST_SET_KEY:testset}

一个良好的实践就是:总是先用较小的数据集测试功能、性能;针对到movielens 1m数据集,原来大概是3000+ item * 6000+ user,相似度计算就大概O(3000 * 3000 * 6000)时间复杂度;把数据集缩小为 1000item * 2000 user,时间复杂度可是蹭蹭蹭的降下来了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@staticmethod
def getSmallRatingData(logger, filepath, proportion=0.7, ratio=0.3, transport=False):
''' 加载评分数据字典对象,划分为训练集和测试集
Args:
logger:日志记录对象
filepath:数据文件的路径
proportion:数据集训练集的划分比例,剩余的作为测试集
ratio:缩小为原数据集的比例数
transport:是否将数据集转置,未转置是user-item,转置则是item-user
Returns:
包含trainset和testset的字典对象
'''
# 获取原数据集
dataset = RatingData.getRatingDict(logger, filepath, proportion, transport)
trainset = dataset[RatingData.TRAIN_SET_KEY]
testset = dataset[RatingData.TEST_SET_KEY]
# 获取用户集、物品集
userset = set(trainset.keys())
itemset = set()
for user in userset:
itemset |= set(trainset[user].keys())
# 缩小用户集、物品集
smallUserset = set()
for user in userset:
if rand.random() < ratio:
smallUserset.add(user)
smallItemset = set()
for item in itemset:
if rand.random() < ratio:
smallItemset.add(item)
# 将训练集数据,限制在缩小的用户、物品集内
smallTrainset = {}
for user in smallUserset:
smallTrainset[user] = {}
for item in trainset[user].keys():
if item not in smallItemset:
continue
smallTrainset[user][item] = trainset[user][item]
# 将测试集数据,限制在缩小的用户、物品集内
smallTestset = {}
for user in smallUserset:
smallTestset[user] = {}
for item in testset[user].keys():
if item not in smallItemset:
continue
smallTestset[user][item] = testset[user][item]

if transport:
dimA = '电影'
dimB = '用户'
else:
dimA = '用户'
dimB = '电影'
logger.info('原数据集[{dimA}*{dimB}]大小:[{lenA}*{lenB}]'.format(dimA=dimA,
dimB=dimB, lenA=len(userset), lenB=len(itemset)))
logger.info('收缩数据集[{dimA}*{dimB}]大小:[{lenA}*{lenB}]'.format(dimA=dimA,
dimB=dimB, lenA=len(smallUserset), lenB=len(smallItemset)))

return {RatingData.TRAIN_SET_KEY:smallTrainset, RatingData.TEST_SET_KEY:smallTestset}

相似度函数模块 similarity.py

提供相似度计算的函数,这里写了3个相似度度量函数:共现、LiRa、自己推的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import numpy as np
import warnings

class Similarity(object):
''' 相似度计算工具类 '''

@staticmethod
def coOccurSim(a, b):
''' 向量a,b的共现相似度,有点类似于关联分析
sim = count(a and b) / sqrt(count(a) * count(b))
'''
# 取评分用户的交集
setA = set(a.keys())
setB = set(b.keys())
users = setA & setB
coOccur = len(users)
# 若物品没有评分用户交集,则说明没有相似性
if coOccur == 0:
return 0

# 统计评分共现项
occurA = len(setA)
occurB = len(setB)
# 计算共现相似度
sim = coOccur / np.sqrt(occurA * occurB)
return sim

@staticmethod
def liRa(a, b):
''' LiRa评分
sim = log_10(p1/p2)
p1 = cum(power(1/2, dist+1))
p2 = cum(p(coOccur))
'''
# 取评分用户的交集
users = set(a.keys()) & set(b.keys())
# 若物品没有评分用户交集,则说明没有相似性
if len(users) == 0:
return 0

p1 = 1
p2 = 1
sim = 0
for user in users:
if np.abs(a[user] - b[user]) == 4:
p1 = np.power(1/2, np.abs(a[user] - b[user]))
else:
p1 = np.power(1/2, np.abs(a[user] - b[user]) + 1)

if np.abs(a[user] - b[user]) == 0:
p2 = 1/5
else:
p2 = 2 * (5 - np.abs(a[user] - b[user])) / 25

sim += np.log10(p1/p2)

return sim


def rateDiffDivCoRated(a, b):
''' 相似性 ~ 评分差异性/共同评分项概率
sim
'''
# 取评分用户的交集
setA = set(a.keys())
setB = set(b.keys())
users = setA & setB

coOccur = len(users)
# 若物品没有评分用户交集,则说明没有相似性
if coOccur <= 1:
return 0

lenA = len(setA)
lenB = len(setB)
sim = 0
warnings.filterwarnings('error')
for user in users:
rateDiff = np.abs(a[user] - b[user])
sim += (1 - rateDiff/3)

sim /= np.sqrt(lenA * lenB)

return sim

Question. 为什么不用 pearson 或者 cosine 等等其他的?

事实上,我试过了,效果没共现好(LiRa<共现<推导的)。

由于稀疏性,导致某些物品的co-rated用户非常少,那此时计算出来的similarity的可信度就值得推敲了,这是一些相似度计算方法,必须面对的一个问题。一种方式是设定个co-rated阈值 控制similarity可信度:

共现则不存在这样的问题,因为它本身相似度就与co-rated数成正比(2010 Google在youtube视频推荐算法用的相似度度量The YouTube video recommendation system)。LiRa是我找到的2017年的针对这个问题的论文提出的方法,实测效果没共现好。不过想法倒是挺好:在共现的想法上,加入概率和对评分差异的考虑。

然后我就沿着这个思路,推导一波,简化一波,得出了rateDiffDivCoRated函数的公式,效果好像和共现的差不多(或者好一点),不过有个超参需要设置。这是个值得思考的点。

其中,co-rated物体的下标,统计共同评分项的得分,然后除以 各自已评分项的积的根 做标准化。

基于物品的协同过滤 cf.py

包含4个函数:

  • 初始化;
  • 相似度矩阵计算;
  • 推荐(kNN相似item选取 + 预测评分计算 + Top-N item选取);
  • 评估(准确率、召回率、覆盖率)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
from operator import itemgetter
import random

class CF(object):
''' TopN推荐 - 协同过滤(Collaborative Filtering) '''

def __init__(self, logger, trainset, testset, neighborK=20, topN=10):
self.logger = logger # 日志对象
self.trainset = trainset # 训练集
self.testset = testset # 测试集
self.neighborK = neighborK # 近邻数
self.topN = topN # 推荐数
self.simArray = {} # 物品相似度矩阵
self.predictRates = {}
# 用于推荐算法的评价
self.precision = 0
self.recall = 0
self.coverage = 0

self.logger.info('相似近邻物品数量 = {kNNValue}'.format(kNNValue=self.neighborK))
self.logger.info('推荐top N物品数量 = {topNValue}'.format(topNValue=self.topN))

def calcSimMat(self, simFunc):
''' 计算物品相似度矩阵 '''
self.logger.info('计算物品相似度矩阵...')
totalItemNum = len(self.trainset.keys())
itemNum = 1
for itemA, userRatesA in self.trainset.items():
for itemB, userARatesB in self.trainset.items():
if itemA == itemB:
continue
# 若对角相似度已经存在,则直接复制
self.simArray.setdefault(itemA, {})
if self.simArray.get(itemB) != None and self.simArray[itemB].get(itemA) != None:
self.simArray[itemA][itemB] = self.simArray[itemB][itemA]
continue
self.simArray[itemA][itemB] = simFunc(userRatesA, userARatesB)
# 记录迭代次数日志
itemNum += 1
if itemNum % 1000 == 0:
self.logger.info('计算相似度矩阵进度:{rowNum}/{totalNum}'.format(rowNum=itemNum, totalNum=totalItemNum))

self.logger.info('计算物品相似度矩阵成功!')

def recommand(self):
''' 根据近邻数K,推荐Top N '''

# 给物品相似度排序
for item, itemSim in self.simArray.items():
# 产生由前K个相似的物品(item, sim)组成的列表
self.simArray[item] = sorted(itemSim.items(), key=itemgetter(1), reverse=True)[:self.neighborK]
if random.random() > 0.99:
self.logger.info('example:物品[{item}]的相似近邻K物品为:{items}'.format(item=item, items=self.simArray[item]))

# 取用户集合
users = set()
for _, userRates in self.trainset.items():
users |= set(userRates.keys())

# 对每个用户的每个物品,由最相似的K个物品预测评分
self.predictRates = {}
for user in users:
self.predictRates.setdefault(user, {})
for item, itemSims in self.simArray.items():
# 若用户已对该物品X评分,则不用进行预测
if self.trainset[item].get(user) != None:
continue

sumSim = 0
sumPredictRate = 0
coRatedNum = 0
# 遍历预测物品X的相似物品k
for itemInSim, sim in itemSims:
# 若用户没对相似物品k评分,则该物品k提供不了预测
if self.trainset[itemInSim].get(user) == None:
continue

sumPredictRate += (self.trainset[itemInSim][user] * sim)
sumSim += sim
coRatedNum += 1
# 标准化
if sumSim != 0:
self.predictRates[user][item] = sumPredictRate


# 保留预测评分中的top N
for user, itemRates in self.predictRates.items():
if len(itemRates) <= self.topN:
self.predictRates[user] = sorted(itemRates.items(), key=itemgetter(1), reverse=True)
else:
self.predictRates[user] = sorted(itemRates.items(), key=itemgetter(1), reverse=True)[:self.topN]
if random.random() > 0.99:
self.logger.info('example:用户[{u}]的top N物品为:{i}'.format(u=user, i=self.predictRates[user]))

def evaluate(self):
# 将以user-item的预测评分,转换为item-user评分
predictItemNum = 0
predictItemRates = {}
for user, itemRates in self.predictRates.items():
predictItemNum += len(itemRates)
for item, rate in itemRates:
predictItemRates.setdefault(item, {})
predictItemRates[item][user] = rate

self.logger.info('总预测物品数为:{num}'.format(num=predictItemNum))
# 将测试集与预测集进行比较评价:覆盖率、准确度、召回率、预测误差
predictItemTypeNum = len(predictItemRates.keys())
itemTypeNum = len(self.testset.keys())
self.coverage = predictItemTypeNum/itemTypeNum # 预测覆盖率 = 预测物品种类 / 总物品种类
self.logger.info('总预测物品种类/总物品种类为:{cov}={pred}/{total}'.format(cov=self.coverage,
pred=predictItemTypeNum, total=itemTypeNum))

hit = 0
rateNum = 0
for item, userRates in self.testset.items():
rateNum += len(userRates.keys())
for user, rate in userRates.items():
if predictItemRates.get(item) == None:
continue
if predictItemRates[item].get(user) != None:
hit += 1

self.precision = hit/predictItemNum # 预测准确度 = 预测中 / 总预测物品数
self.recall = hit/rateNum # 预测召回率 = 预测中 / 测试集用户评分数

self.logger.info('预测准确度 {precision}'.format(precision=self.precision))
self.logger.info('预测召回率 {recall}'.format(recall=self.recall))
self.logger.info('预测覆盖率 {coverage}'.format(coverage=self.coverage))

# 随机预测的准确率 = N * 测试集用户评分总数 / (物品种类数 * 用户数 - 已评分过的物品数)
totalRateNum = itemTypeNum * len(self.predictRates)
ratedNum = 0
for item, userRates in self.trainset.items():
ratedNum += len(userRates)
unrateNum = totalRateNum - ratedNum
randomPrecision = self.topN * rateNum/unrateNum
self.logger.info('随机预测误差 {randomPrecision}'.format(randomPrecision=randomPrecision))

Question:预测评分

是的,又是一个值得思考的点。用已评分物品来预测未评分物品,需要考量:该用户已评分物品数据多不?与未评分物品的相似性够不?足够支撑预测不?如何计算预测评分(用相似度加权平均?那明明低相似度的物品不应该比高相似度的物品更能预测评分,为什么还同样权重加权平均?)

我这里针对任一用户,待预测物品基于其已评分物品集合进行加权累加,粗暴的利用用户偏好进行推荐(用户偏好的某一类别物品,那么该类别内的物品的物品相似度高、已评分物品数多,则预测分数高),容易造成推荐类别单一。

故是否可以先对物品进行分类,然后对每类推荐物品数设置一个阈值或罚项,以权衡多样性和用户偏好?

总结

实战的话:

  1. 先用小数据集,快速调试算法功能、性能,甚至初步调参;
  2. 在前期测试阶段,对于公式计算的模块,使用warnings.filterwarnings('error'),将warning也可作为exception进行捕捉,这样可以打印公式计算过程中warning的信息。后期解决问题了,便可以移除。

理论的话,需要思考:

  1. co-rated数量对相似度计算的影响;
  2. 已评分数和相似度大小对评分预测的影响。
感谢对原创的支持~