0%

Personal_Rank

召回算法Personal Rank原理及自编程实现。

算法原理

背景及物理意义

  • 用户行为很容易表示为图
  • 图推荐在个性化推荐领域效果显著

二分图

二分图又称作二部图,是图论种的一种特殊模型。设G=(V,E)是一个无向图,如果顶点V可分割为两个互不相交的子集(A,B),并且图种的每条边(i,j)所关联的两个顶点i和j分别输入这两个不同的顶点集(i in A, j in B),则称图G为一个二分图。

物理意义

对user A来说,item c 和 item e 哪个更值得推荐?

image-20210517195834929

  • 两个顶点之间连通路径数
  • 两个顶点之间连通路径长度
  • 两个顶点之间连通路径经过顶点的出度

example分析

  • 分别有几条路径连通? A-c: 2条 (A-a-B-c, A-d-D-c) A-e: 1条 (A-b-C-e)
  • 连通路径的长度分别是多少?A-c: 3,3 A-e: 3
  • 连通路径经过顶点的出度分别是多少? A-c: (3,2,2,2; 3,2,2,2) A-e: (3,2,2,1)
  • 因此 item c 更值得推荐

算法公式解析

算法抽象

对用户A进行个性化推荐,从用户A结点开始在用户物品二分图random walk,以alpha的概率[一般0.6-0.8]从A的出边种等概率选择一条游走过去,到达该顶点后(举例顶点a),有alpha的概率继续从顶点a的出边种等概率选择一条继续游走到下一结点,或者(1-alpha)的概率回到起点A,多次迭代,直到各顶点对于用户A的重要度收敛。

算法公式

image-20210517201928649

上面算法在时间复杂度上有个明显的缺陷,每次为每个用户推荐时,都需要在整个用户物品二分图上进行迭代,直到整个图上每个节点收敛。这一过程时间复杂度非常高,不仅无法提高实时推荐,甚至离线生产推荐结果也很耗时。

为了解决时间复杂度过高问题,我们可以从矩阵角度出发,personalrank经过多次的迭代游走,使得各节点的重要度趋于稳定,实际上我们根据状态转移矩阵,经过一次矩阵运算就可以直接得到系统的稳态。

矩阵化公式

image-20210517202618445

算法实现

二部图的构建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import os
def get_graph_from_data(input_file):
"""
:param input_file: user item rating file
:return: a dict: {UserA:{itemb:1, itemc:1}, itemb:{UserA: 1}}
"""

if not os.path.exists(input_file):
return {}
graph = {}
linenum=0
score_thr = 4.0
fp = open(input_file, encoding="utf-8")
for line in fp:
if linenum==0:
linenum+=1
continue
item = line.strip().split(",")
if len(item)<3:
continue
userid, itemid, rating = item[0], "item_"+item[1], float(item[2])
if rating < score_thr:
continue
if userid not in graph:
graph[userid] = {}
graph[userid][itemid] = 1
if itemid not in graph:
graph[itemid] = {}
graph[itemid][userid] = 1
fp.close()
return graph

非矩阵化实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#-*-coding:utf8-*-
from tqdm import tqdm
import operator
import sys
sys.path.append("../util")
import util.read as read
def personal_rank(graph, root, alpha, iter_num, recom_num = 10):
"""
非矩阵化实现 personal rank 算法
:param graph: user item graph
:param root: the fixed user for who to recom
:param alpha: the prob to go to random walk
:param iter_num: iteration num
:param recom_num: recom item num
:return:
a dict: key itemid, value pr
"""
rank = {point: 0 for point in graph}
rank[root] = 1
recom_result = {}
for iter_index in tqdm(range(iter_num)):
tmp_rank = {point: 0 for point in graph}
for out_point, out_dict in graph.items():
for inner_point, value in graph[out_point].items():
tmp_rank[inner_point] += round(alpha*rank[out_point]/len(out_dict),4)
if inner_point == root:
tmp_rank[inner_point] += round(1-alpha,4)
if tmp_rank == rank:
break
rank = tmp_rank
right_num = 0
for co in sorted(rank.items(), key = operator.itemgetter(1),reverse=True):
point, pr_score = co[0], co[1]
if len(point.split('_'))<2:
continue
if point in graph[root]:
continue
recom_result[point] = pr_score
right_num +=1
if right_num > recom_num:
break
return recom_result

def get_one_user_recom():
"""
give one fix_user recom result
"""
user = "1"
alpha = 0.8
graph = read.get_graph_from_data("../data/ratings.csv")
iter_num = 100
recom_result = personal_rank(graph,user,alpha,iter_num)
"""
# 将推荐结果直接存盘或写入到KV中
item_info = read.get_item_info("../data/movies.csv")
for itemid in graph[user]:
pure_itemid = itemid.split("_")[1]
print(item_info[pure_itemid])
print("result---------")
for itemid in recom_result:
pure_itemid = itemid.split("_")[1]
print(item_info[pure_itemid])
print(recom_result[itemid])
"""

if __name__ == '__main__':
get_one_user_recom()


矩阵化实现

  • 稀疏矩阵M使用 scipy.sparse.coo_matrix 存储
  • coo_matrix 可调用 to_csr(), to_csc(), to_dense() 把它转换成CSR或稠密矩阵
  • 构建完成后不允许再插入或删除元素,不能直接进行科学计算和切片操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from scipy.sparse import coo_matrix
import numpy as np

def graph_to_m(graph):
"""
:param graph: user item graph
:return: a coo_mat M, a list: total user item point, a dict: map all the point to row index
"""
vertex = list(graph.keys())
address_dict = {}
total_len = len(vertex)
for index in range(len(vertex)):
address_dict[vertex[index]] = index
row = []
col = []
data = []
for element_i in graph:
weight = round(1 / len(graph[element_i]), 3)
row_index = address_dict[element_i]
for element_j in graph[element_i]:
col_index = address_dict[element_j]
row.append(row_index)
col.append(col_index)
data.append(weight)
row = np.array(row)
col = np.array(col)
data = np.array(data)
m = coo_matrix((data, (row, col)), shape=(total_len, total_len))
return m, vertex, address_dict

def mat_all_point(m_mat, vertex, alpha):
"""
get E-alpha*m_mat.T
:param m_mat:
:param vertex: total item and user point
:param alpha: the prob for random walking
:return: a sparse mat
"""
total_len = len(vertex)
row = []
col = []
data = []
for index in range(total_len):
row.append(index)
col.append(index)
data.append(1)
row = np.array(row)
col = np.array(col)
data = np.array(data)
eye_t = coo_matrix((data, (row, col)), shape=(total_len, total_len))
# print(eye_t.todense())
# sys.exit()
return eye_t.tocsr() - alpha * m_mat.tocsr().transpose()
  • 利用 scipy.sparse.linalg.gmres 解线性方程组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def personal_rank_matrix(graph, root, alpha, recom_num=10):
"""
:param graph: user item graph
:param root: the fix user to recom
:param alpha: the prob to random walk
:param recom_num: recom item num
:return: a dict, key itemid, value pr score
"""
m, vertex, address_dict = mat_util.graph_to_m(graph)
if root not in address_dict:
return {}
score_dict = {}
recom_dict = {}

mat_all = mat_util.mat_all_point(m, vertex, alpha)
index = address_dict[root]
initial_list = [[0] for i in range(len(vertex))]
initial_list[index] = [1]
r_zero = np.array(initial_list)

res = gmres(mat_all, r_zero, tol=1e-8)[0]

for i in range(len(res)):
point = vertex[i]
if len(point.strip().split('_')) < 2:
continue
if point in graph[root]:
continue
score_dict[point] = round(res[i], 3)
for co in sorted(score_dict.items(), key=operator.itemgetter(1), reverse=True)[:recom_num]:
point, score = co[0], co[1]
recom_dict[point] = score
return recom_dict