对基于物品的协同过滤 理解和复现的一点记录。
程序入口
日志对象;
读取数据/分割数据集;
创建推荐算法对象/初始化参数;
计算相似度矩阵;
预测评分并推荐;
评价指标计算。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import osfrom logger import Loggerfrom movielens import RatingDatafrom cf import CFfrom similarity import Similarity''' 基于kNN-CF推荐算法的流程 ''' logPath = os.path.join('logs' , 'kNNCF.log' ) logger = Logger.getLogger(logPath) ratingfile = os.path.join('ml-1m' , 'ratings.dat' ) proportion = 0.7 itemRatingDict = RatingData.getItemRatingDict(logger, ratingfile, proportion) itemTrainset = itemRatingDict[RatingData.TRAIN_SET_KEY] itemTestset = itemRatingDict[RatingData.TEST_SET_KEY] cfAlgo = CF(logger, itemTrainset, itemTestset, neighborK=20 , topN=10 ) cfAlgo.calcSimMat(Similarity.rateDiffDivCoRated) cfAlgo.recommand() cfAlgo.evaluate() import winsoundwinsound.Beep(500 , 500 )
各个模块 日志模块 logger.py 打印到控制台,同时也可以记录到 logPath
路径下的文件中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import loggingclass Logger (object) : ''' 日志记录类,记录到目录路径文件内,同时也打印到console上 ''' __logger = {} @staticmethod def __initLogger (logPath, logLevel) : ''' 创建日志对象,并放入到__logger字典。key为logPath,value为logger对象 Args: logPath:日志文件路径 logLevel:日志打印水平 ''' logger = logging.getLogger(logPath) logger.setLevel(logLevel) formatter = logging.Formatter(r'%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s' ) consoleHandler = logging.StreamHandler() consoleHandler.setLevel(logLevel) consoleHandler.setFormatter(formatter) fileHandler = logging.FileHandler(logPath, mode='w' , encoding='UTF-8' ) fileHandler.setLevel(logLevel) fileHandler.setFormatter(formatter) logger.addHandler(consoleHandler) logger.addHandler(fileHandler) logger.info('创建日志文件{logfile}成功!' .format(logfile=logPath)) Logger.__logger[logPath] = logger @staticmethod def getLogger (logPath, logLevel=logging.DEBUG) : ''' 从__logger字典获取日志对象,key为logPath。若logger对象不存在,则新建 Args: logPath:日志文件路径 logLevel:日志打印水平,默认打印debug等级及以上 Returns: logger日志打印对象 ''' if Logger.__logger.get(logPath) == None : Logger.__initLogger(logPath, logLevel) return Logger.__logger.get(logPath)
Question :如果出现日志打印多次的情况(logger.info('1')
但是打印了两次1),是因为程序异常停止,导致自己创建的logger对象
会被回收,但是logging.getLogger
创建的对象依然存在,若此时再次创建重名的logging.getLogger对象
,则会重复添加打印通道的句柄,导致多处打印(因为logging.getLogger(name)
是对name
的单例模式)。
解决 :出现异常了,给换个名字创建日志对象,或者,quit()
重启console
就行,或者,运行下面语句:
1 2 3 import loggingfrom imp import reloadreload(logging)
数据加载模块 movielens.py 用字典变量存储,类似ratesDict[item][user] = rate
。然后按设定比例、随机种子,分割训练集和测试集。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 import random as randrand.seed(0 ) class RatingData (object) : ''' movielens的rating.dat的数据集处理类 ''' TRAIN_SET_KEY = 'trainset' TEST_SET_KEY = 'testset' @staticmethod def __loadFile (filepath, logger) : ''' 加载数据文件,返回生成器。 Args: filepath:数据文件的路径 logger:日志记录对象 Returns: 数据文件中,每次按序返回一行数据的迭代器 ''' with open(filepath, 'r' , encoding='UTF-8' ) as fp: for i, line in enumerate(fp): yield line.strip('\r\n' ) if i % 100000 == 0 : logger.info('加载数据文件生成器 {filepath}({rowNum})' .format(filepath=filepath, rowNum=i)) logger.info('加载数据文件 {filepath} 成功' .format(filepath=filepath)) @staticmethod def getItemRatingDict (logger, filepath, proportion) : ''' 加载评分数据字典对象,划分为训练集和测试集 Args: logger:日志记录对象 filepath:数据文件的路径 proportion:数据集训练集的划分比例,剩余的作为测试集 Returns: 包含trainset和testset的字典对象 ''' trainset = {} trainsetSize = 0 testset = {} testsetSize = 0 for line in RatingData.__loadFile(filepath, logger): user, item, rating, _ = line.split('::' ) user = int(user) item = int(item) rating = float(rating) if rand.random() < proportion: trainset.setdefault(item, {}) trainset[item][user] = rating trainsetSize += 1 else : testset.setdefault(item, {}) testset[item][user] = rating testsetSize += 1 logger.info('划分训练集和测试集成功!' ) logger.info('训练集物品数量 {item}' .format(item = len(trainset))) logger.info('训练集大小 {size}' .format(size=trainsetSize)) logger.info('测试集物品数量 {item}' .format(item = len(testset))) logger.info('测试集大小 {size}' .format(size=testsetSize)) return {RatingData.TRAIN_SET_KEY:trainset, RatingData.TEST_SET_KEY:testset}
一个良好的实践就是:总是先用较小的数据集 测试功能、性能;针对到movielens 1m
数据集,原来大概是3000+ item * 6000+ user
,相似度计算就大概O(3000 * 3000 * 6000)
时间复杂度;把数据集缩小为 1000item * 2000 user
,时间复杂度可是蹭蹭蹭的降下来了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 @staticmethod def getSmallRatingData (logger, filepath, proportion=0.7 , ratio=0.3 , transport=False) : ''' 加载评分数据字典对象,划分为训练集和测试集 Args: logger:日志记录对象 filepath:数据文件的路径 proportion:数据集训练集的划分比例,剩余的作为测试集 ratio:缩小为原数据集的比例数 transport:是否将数据集转置,未转置是user-item,转置则是item-user Returns: 包含trainset和testset的字典对象 ''' dataset = RatingData.getRatingDict(logger, filepath, proportion, transport) trainset = dataset[RatingData.TRAIN_SET_KEY] testset = dataset[RatingData.TEST_SET_KEY] userset = set(trainset.keys()) itemset = set() for user in userset: itemset |= set(trainset[user].keys()) smallUserset = set() for user in userset: if rand.random() < ratio: smallUserset.add(user) smallItemset = set() for item in itemset: if rand.random() < ratio: smallItemset.add(item) smallTrainset = {} for user in smallUserset: smallTrainset[user] = {} for item in trainset[user].keys(): if item not in smallItemset: continue smallTrainset[user][item] = trainset[user][item] smallTestset = {} for user in smallUserset: smallTestset[user] = {} for item in testset[user].keys(): if item not in smallItemset: continue smallTestset[user][item] = testset[user][item] if transport: dimA = '电影' dimB = '用户' else : dimA = '用户' dimB = '电影' logger.info('原数据集[{dimA}*{dimB}]大小:[{lenA}*{lenB}]' .format(dimA=dimA, dimB=dimB, lenA=len(userset), lenB=len(itemset))) logger.info('收缩数据集[{dimA}*{dimB}]大小:[{lenA}*{lenB}]' .format(dimA=dimA, dimB=dimB, lenA=len(smallUserset), lenB=len(smallItemset))) return {RatingData.TRAIN_SET_KEY:smallTrainset, RatingData.TEST_SET_KEY:smallTestset}
相似度函数模块 similarity.py 提供相似度计算的函数,这里写了3个相似度度量函数:共现、LiRa、自己推的 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 import numpy as npimport warningsclass Similarity (object) : ''' 相似度计算工具类 ''' @staticmethod def coOccurSim (a, b) : ''' 向量a,b的共现相似度,有点类似于关联分析 sim = count(a and b) / sqrt(count(a) * count(b)) ''' setA = set(a.keys()) setB = set(b.keys()) users = setA & setB coOccur = len(users) if coOccur == 0 : return 0 occurA = len(setA) occurB = len(setB) sim = coOccur / np.sqrt(occurA * occurB) return sim @staticmethod def liRa (a, b) : ''' LiRa评分 sim = log_10(p1/p2) p1 = cum(power(1/2, dist+1)) p2 = cum(p(coOccur)) ''' users = set(a.keys()) & set(b.keys()) if len(users) == 0 : return 0 p1 = 1 p2 = 1 sim = 0 for user in users: if np.abs(a[user] - b[user]) == 4 : p1 = np.power(1 /2 , np.abs(a[user] - b[user])) else : p1 = np.power(1 /2 , np.abs(a[user] - b[user]) + 1 ) if np.abs(a[user] - b[user]) == 0 : p2 = 1 /5 else : p2 = 2 * (5 - np.abs(a[user] - b[user])) / 25 sim += np.log10(p1/p2) return sim def rateDiffDivCoRated (a, b) : ''' 相似性 ~ 评分差异性/共同评分项概率 sim ''' setA = set(a.keys()) setB = set(b.keys()) users = setA & setB coOccur = len(users) if coOccur <= 1 : return 0 lenA = len(setA) lenB = len(setB) sim = 0 warnings.filterwarnings('error' ) for user in users: rateDiff = np.abs(a[user] - b[user]) sim += (1 - rateDiff/3 ) sim /= np.sqrt(lenA * lenB) return sim
Question . 为什么不用 pearson
或者 cosine
等等其他的?
事实上,我试过了,效果没共现好(LiRa<共现<推导的)。
由于稀疏性,导致某些物品的co-rated用户
非常少,那此时计算出来的similarity
的可信度就值得推敲了,这是一些相似度计算方法,必须面对的一个问题。一种方式是设定个co-rated阈值
控制similarity
可信度:
共现则不存在这样的问题,因为它本身相似度就与co-rated数
成正比(2010 Google在youtube视频推荐算法用的相似度度量The YouTube video recommendation system
)。LiRa
是我找到的2017年的针对这个问题的论文提出的方法,实测效果没共现好。不过想法倒是挺好:在共现的想法上,加入概率和对评分差异的考虑。
然后我就沿着这个思路,推导一波,简化一波,得出了rateDiffDivCoRated函数
的公式,效果好像和共现的差不多(或者好一点),不过有个超参需要设置。这是个值得思考的点。
其中,是co-rated
物体的下标,统计共同评分项的得分,然后除以 各自已评分项的积的根 做标准化。
基于物品的协同过滤 cf.py 包含4个函数:
初始化;
相似度矩阵计算;
推荐(kNN相似item选取 + 预测评分计算 + Top-N item选取);
评估(准确率、召回率、覆盖率)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 from operator import itemgetterimport randomclass CF (object) : ''' TopN推荐 - 协同过滤(Collaborative Filtering) ''' def __init__ (self, logger, trainset, testset, neighborK=20 , topN=10 ) : self.logger = logger self.trainset = trainset self.testset = testset self.neighborK = neighborK self.topN = topN self.simArray = {} self.predictRates = {} self.precision = 0 self.recall = 0 self.coverage = 0 self.logger.info('相似近邻物品数量 = {kNNValue}' .format(kNNValue=self.neighborK)) self.logger.info('推荐top N物品数量 = {topNValue}' .format(topNValue=self.topN)) def calcSimMat (self, simFunc) : ''' 计算物品相似度矩阵 ''' self.logger.info('计算物品相似度矩阵...' ) totalItemNum = len(self.trainset.keys()) itemNum = 1 for itemA, userRatesA in self.trainset.items(): for itemB, userARatesB in self.trainset.items(): if itemA == itemB: continue self.simArray.setdefault(itemA, {}) if self.simArray.get(itemB) != None and self.simArray[itemB].get(itemA) != None : self.simArray[itemA][itemB] = self.simArray[itemB][itemA] continue self.simArray[itemA][itemB] = simFunc(userRatesA, userARatesB) itemNum += 1 if itemNum % 1000 == 0 : self.logger.info('计算相似度矩阵进度:{rowNum}/{totalNum}' .format(rowNum=itemNum, totalNum=totalItemNum)) self.logger.info('计算物品相似度矩阵成功!' ) def recommand (self) : ''' 根据近邻数K,推荐Top N ''' for item, itemSim in self.simArray.items(): self.simArray[item] = sorted(itemSim.items(), key=itemgetter(1 ), reverse=True )[:self.neighborK] if random.random() > 0.99 : self.logger.info('example:物品[{item}]的相似近邻K物品为:{items}' .format(item=item, items=self.simArray[item])) users = set() for _, userRates in self.trainset.items(): users |= set(userRates.keys()) self.predictRates = {} for user in users: self.predictRates.setdefault(user, {}) for item, itemSims in self.simArray.items(): if self.trainset[item].get(user) != None : continue sumSim = 0 sumPredictRate = 0 coRatedNum = 0 for itemInSim, sim in itemSims: if self.trainset[itemInSim].get(user) == None : continue sumPredictRate += (self.trainset[itemInSim][user] * sim) sumSim += sim coRatedNum += 1 if sumSim != 0 : self.predictRates[user][item] = sumPredictRate for user, itemRates in self.predictRates.items(): if len(itemRates) <= self.topN: self.predictRates[user] = sorted(itemRates.items(), key=itemgetter(1 ), reverse=True ) else : self.predictRates[user] = sorted(itemRates.items(), key=itemgetter(1 ), reverse=True )[:self.topN] if random.random() > 0.99 : self.logger.info('example:用户[{u}]的top N物品为:{i}' .format(u=user, i=self.predictRates[user])) def evaluate (self) : predictItemNum = 0 predictItemRates = {} for user, itemRates in self.predictRates.items(): predictItemNum += len(itemRates) for item, rate in itemRates: predictItemRates.setdefault(item, {}) predictItemRates[item][user] = rate self.logger.info('总预测物品数为:{num}' .format(num=predictItemNum)) predictItemTypeNum = len(predictItemRates.keys()) itemTypeNum = len(self.testset.keys()) self.coverage = predictItemTypeNum/itemTypeNum self.logger.info('总预测物品种类/总物品种类为:{cov}={pred}/{total}' .format(cov=self.coverage, pred=predictItemTypeNum, total=itemTypeNum)) hit = 0 rateNum = 0 for item, userRates in self.testset.items(): rateNum += len(userRates.keys()) for user, rate in userRates.items(): if predictItemRates.get(item) == None : continue if predictItemRates[item].get(user) != None : hit += 1 self.precision = hit/predictItemNum self.recall = hit/rateNum self.logger.info('预测准确度 {precision}' .format(precision=self.precision)) self.logger.info('预测召回率 {recall}' .format(recall=self.recall)) self.logger.info('预测覆盖率 {coverage}' .format(coverage=self.coverage)) totalRateNum = itemTypeNum * len(self.predictRates) ratedNum = 0 for item, userRates in self.trainset.items(): ratedNum += len(userRates) unrateNum = totalRateNum - ratedNum randomPrecision = self.topN * rateNum/unrateNum self.logger.info('随机预测误差 {randomPrecision}' .format(randomPrecision=randomPrecision))
Question :预测评分
是的,又是一个值得思考的点。用已评分物品来预测未评分物品,需要考量:该用户已评分物品数据多不?与未评分物品的相似性够不?足够支撑预测不?如何计算预测评分(用相似度加权平均?那明明低相似度的物品不应该比高相似度的物品更能预测评分,为什么还同样权重加权平均?)
我这里针对任一用户,待预测物品基于其已评分物品集合进行加权累加,粗暴的利用用户偏好进行推荐(用户偏好的某一类别物品,那么该类别内的物品的物品相似度高、已评分物品数多,则预测分数高),容易造成推荐类别单一。
故是否可以先对物品进行分类,然后对每类推荐物品数设置一个阈值或罚项,以权衡多样性和用户偏好?
总结 实战的话:
先用小数据集,快速调试算法功能、性能,甚至初步调参;
在前期测试阶段,对于公式计算的模块,使用warnings.filterwarnings('error')
,将warning
也可作为exception
进行捕捉,这样可以打印公式计算过程中warning
的信息。后期解决问题了,便可以移除。
理论的话,需要思考:
co-rated数量
对相似度计算的影响;
已评分数和相似度大小对评分预测的影响。