0%

CF

个性化召回算法协同过滤(Collaborative filtering)原理及自编程实现。

算法原理

Item cf

  • 给用户推荐他之前喜欢的物品相似的物品
  • 如何衡量相似
  • 如何衡量喜欢

公式

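物品相似度($U(i)$ 表示对物品 $i$ 有过行为的用户集合):

$$s_{ij} = \frac{|U(i) \cap U(j)|}{\sqrt{|U(i)|\,|U(j)|}}$$

推荐得分($N(u)$ 表示用户 $u$ 行为过的物品集合,$S(j,k)$ 表示与物品 $j$ 最相似的 $k$ 个物品,$r_{ui}$ 表示用户 $u$ 对物品 $i$ 的行为得分):

$$p_{uj} = \sum_{i \in N(u) \cap S(j,k)} s_{ij}\, r_{ui}$$

(原图:image-20210531100954722 image-20210531101023677)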

公式升级1

  • 理论意义:活跃用户应该被降低在相似度公式中的贡献度
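
$$s_{ij} = \frac{\sum_{u \in U(i) \cap U(j)} \frac{1}{\log\left(1 + |N(u)|\right)}}{\sqrt{|U(i)|\,|U(j)|}}$$

其中 $U(i)$ 表示对物品 $i$ 有过行为的用户集合,$N(u)$ 表示用户 $u$ 行为过的物品集合。(原图:image-20210531101455448)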

公式升级2

  • 理论意义:用户在不同时间对item的操作应给予时间衰减惩罚
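
$$s_{ij} = \frac{\sum_{u \in U(i) \cap U(j)} f\left(|t_{ui} - t_{uj}|\right)}{\sqrt{|U(i)|\,|U(j)|}}, \qquad f(\Delta t) = \frac{1}{1 + \alpha\,\Delta t}$$

其中 $U(i)$ 表示对物品 $i$ 有过行为的用户集合,$t_{ui}$ 表示用户 $u$ 对物品 $i$ 产生行为的时间;下文实现中取 $\alpha = 1$,$\Delta t$ 以天为单位。(原图:image-20210531101619543)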

User cf

  • 给用户推荐相似兴趣用户感兴趣的物品
  • 如何评价相似兴趣用户集合
  • 找到集合用户感兴趣的而目标用户没行为过的item

公式

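用户相似度($N(u)$ 表示用户 $u$ 行为过的物品集合):

$$s_{uv} = \frac{|N(u) \cap N(v)|}{\sqrt{|N(u)|\,|N(v)|}}$$

推荐得分($S(u,k)$ 表示与用户 $u$ 最相似的 $k$ 个用户,$U(i)$ 表示对物品 $i$ 有过行为的用户集合,$r_{vi}$ 表示用户 $v$ 对物品 $i$ 的行为得分):

$$p_{ui} = \sum_{v \in S(u,k) \cap U(i)} s_{uv}\, r_{vi}$$

(原图:image-20210531103031062)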

公式升级1

  • 理论意义:降低异常活跃物品对用户相似度的贡献度
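
$$s_{uv} = \frac{\sum_{i \in N(u) \cap N(v)} \frac{1}{\log\left(1 + |U(i)|\right)}}{\sqrt{|N(u)|\,|N(v)|}}$$

其中 $N(u)$ 表示用户 $u$ 行为过的物品集合,$U(i)$ 表示对物品 $i$ 有过行为的用户集合。(原图:image-20210531103506872)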

公式升级2

  • 理论意义:不同用户对同一item行为的时间段不同应给予时间惩罚
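
$$s_{uv} = \frac{\sum_{i \in N(u) \cap N(v)} f\left(|t_{ui} - t_{vi}|\right)}{\sqrt{|N(u)|\,|N(v)|}}, \qquad f(\Delta t) = \frac{1}{1 + \alpha\,\Delta t}$$

其中 $N(u)$ 表示用户 $u$ 行为过的物品集合,$t_{ui}$ 表示用户 $u$ 对物品 $i$ 产生行为的时间;下文实现中取 $\alpha = 1$,$\Delta t$ 以天为单位。(原图:image-20210531104013417)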

Usercf Vs Itemcf

优缺点比较

  • 推荐实时性 (用户发生新的行为,Usercf结果不变,itemcf结果实时变化)
  • 新用户/新物品的推荐
  • 推荐理由的可解释性(usercf 可解释性差,itemcf可解释性强)

适用场景

  • 性能层面考量:user item 的数量
  • 个性化层面考量:(usercf解决新物品及时下发,itemcf个性化更强)

算法实现

  • 工具函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def get_user_click(rating_file):
    """
    Load user click lists and click timestamps from a ratings file.

    The first line of the file is skipped as a header. Every other line is
    split on "," and must provide at least four columns, read in the order
    userid, itemid, rating, timestamp; shorter lines are skipped and any
    columns after the fourth are ignored.

    :param rating_file: input file
    :return: a tuple (user_click, user_click_time).
        user_click is a dict key:userid, value:[itemid1,itemid2,...] that
        only contains rows whose rating is >= 3.0.
        user_click_time is a dict key:"userid_itemid", value:int timestamp
        of the first row seen for that pair, recorded regardless of rating.
        Both dicts are empty when rating_file does not exist.
    """
    if not os.path.exists(rating_file):
        return {}, {}
    user_click = {}
    user_click_time = {}
    # The context manager closes the file even when a row fails to parse.
    with open(rating_file, encoding="utf-8") as fp:
        next(fp, None)  # skip the header line (no-op on an empty file)
        for line in fp:
            fields = line.strip().split(",")
            if len(fields) < 4:
                continue
            # Slice so that rows with extra trailing columns do not break
            # the unpacking.
            userid, itemid, rating, timestamp = fields[:4]
            time_key = userid + "_" + itemid
            if time_key not in user_click_time:
                user_click_time[time_key] = int(timestamp)
            # Low ratings keep their timestamp but do not count as a click.
            if float(rating) < 3.0:
                continue
            user_click.setdefault(userid, []).append(itemid)
    return user_click, user_click_time


def get_item_info(item_file):
    """
    Load item info [title, genres] from an item file.

    The first line of the file is skipped as a header. Every other line is
    split on ","; the first field is the item id, the last field is the
    genres, and everything in between is re-joined with "," to form the
    title (so a title containing commas is kept whole, including any
    surrounding quote characters). Lines with fewer than three fields are
    skipped, and only the first line seen for an item id is kept.

    :param item_file: input file
    :return: a dict: key itemid, value [title, genres]; empty when
        item_file does not exist
    """
    if not os.path.exists(item_file):
        return {}
    item_info = {}
    # The context manager closes the file even when a row fails to parse.
    with open(item_file, encoding="utf-8") as fp:
        next(fp, None)  # skip the header line (no-op on an empty file)
        for line in fp:
            fields = line.strip().split(",")
            if len(fields) < 3:
                continue
            # Star-unpacking covers both the plain 3-field row and rows
            # whose title was split apart by embedded commas.
            itemid, *title_parts, genres = fields
            if itemid not in item_info:
                item_info[itemid] = [",".join(title_parts), genres]
    return item_info
  • itemcf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def base_contribute_score():
    """
    Item cf base similarity contribution score of one user.

    :return: the constant 1, i.e. every co-occurrence counts equally
    """
    return 1

def update_one_contribute_score(user_total_click_num):
    """
    Item cf similarity contribution score of one user, damped by activity.

    The more items a user has clicked, the smaller the returned score.

    :param user_total_click_num: how many items the user has clicked
    :return: 1 / log10(1 + user_total_click_num)
    :raises ZeroDivisionError: when user_total_click_num is 0, because
        log10(1) is 0
    """
    return 1 / math.log10(1 + user_total_click_num)

def update_two_contribute_score(click_time_one, click_time_two):
    """
    Item cf similarity contribution score of one user, decayed by time gap.

    :param click_time_one: time of the user's click on the first item;
        the division by 60*60*24 treats the value as seconds
    :param click_time_two: time of the same user's click on the second item
    :return: 1 / (1 + gap), where gap is the absolute difference of the two
        times divided by the number of seconds in a day
    """
    seconds_per_day = 60 * 60 * 24
    delta_days = abs(click_time_two - click_time_one) / seconds_per_day
    return 1 / (1 + delta_days)

def cal_item_sim(user_click, user_click_time):
    """
    Compute item-to-item similarity from user click lists.

    Every pair of items in one user's click list adds a time-decayed
    contribution (update_two_contribute_score) to both directions of the
    pair. The accumulated value is then divided by the square root of the
    product of the two items' click counts.

    :param user_click: dict key userid,value [itemid1, itemid2,...]
    :param user_click_time: dict key "userid_itemid", value click time;
        a missing key is treated as time 0
    :return: dict, key: itemid_i, value: list of (itemid_j, simscore)
        tuples sorted by simscore in descending order
    """
    co_appear = {}
    item_click_count = {}
    for user, itemlist in user_click.items():
        for index_i, itemid_i in enumerate(itemlist):
            item_click_count[itemid_i] = item_click_count.get(itemid_i, 0) + 1
            click_time_one = user_click_time.get(user + "_" + itemid_i, 0)
            for itemid_j in itemlist[index_i + 1:]:
                click_time_two = user_click_time.get(user + "_" + itemid_j, 0)
                # The contribution is symmetric, so compute it once and add
                # it to both directions.
                score = update_two_contribute_score(click_time_one, click_time_two)
                related_i = co_appear.setdefault(itemid_i, {})
                related_i[itemid_j] = related_i.get(itemid_j, 0) + score
                related_j = co_appear.setdefault(itemid_j, {})
                related_j[itemid_i] = related_j.get(itemid_i, 0) + score

    item_sim_score_sorted = {}
    for itemid_i, relate_item in co_appear.items():
        item_sim_score = {}
        for itemid_j, co_time in relate_item.items():
            # Normalise by the popularity of both items.
            norm = math.sqrt(item_click_count[itemid_i] * item_click_count[itemid_j])
            item_sim_score[itemid_j] = co_time / norm
        item_sim_score_sorted[itemid_i] = sorted(
            item_sim_score.items(), key=operator.itemgetter(1), reverse=True
        )
    return item_sim_score_sorted

def cal_recom_result(sim_info, user_click, recent_click_num=3, topk=5):
    """
    Recommend items to every user by itemcf.

    For each of the first recent_click_num items in a user's click list,
    the topk most similar items are added to that user's result. When the
    same candidate is reached from several clicked items, the score from
    the last clicked item processed overwrites the earlier ones.

    :param sim_info: item sim dict, key itemid, value list of
        (similar_itemid, sim_score) entries, best first
    :param user_click: user click dict, key userid, value [itemid1,...]
    :param recent_click_num: how many leading items of each click list are
        used as seeds (default 3)
    :param topk: how many similar items are taken per seed item (default 5)
    :return: a dict, key:userid, value: dict, value_key itemid,
        value_value recom_score; users without any candidate map to {}
    """
    recom_info = {}
    for user, click_list in user_click.items():
        user_recom = recom_info.setdefault(user, {})
        for itemid in click_list[:recent_click_num]:
            # Seed items without similarity info contribute nothing.
            for sim_entry in sim_info.get(itemid, [])[:topk]:
                user_recom[sim_entry[0]] = sim_entry[1]
    return recom_info
  • usercf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
def base_contribution_score():
    """
    User cf base similarity contribution score of one item.

    :return: the constant 1, i.e. every shared item counts equally
    """
    return 1

def update_one_contribution_score(item_user_click_count):
    """
    User cf contribution score update v1, damped by item popularity.

    The more users have clicked an item, the smaller the returned score.

    :param item_user_click_count: how many user have clicked this item
    :return: contribution score, 1 / log10(1 + item_user_click_count)
    :raises ZeroDivisionError: when item_user_click_count is 0, because
        log10(1) is 0
    """
    return 1 / math.log10(1 + item_user_click_count)

def update_two_contribution_score(click_time_one, click_time_two):
"""
user cf user contribution score update v2
:param click_time_one: different user click time to the same item
:param click_time_two:
:return: contribution score
"""
delta_time = abs(click_time_two - click_time_one)
total_time = 60 * 60 * 24
delta_time = delta_time / total_time
return 1 / (1 + delta_time)


def transfer_user_click(user_click):
    """
    Invert the user click dict into an item-to-users dict.

    :param user_click: dict key userid, value:[itemid1, itemid2,...]
    :return: dict, key itemid value:[userid1,userid2,...]; users appear in
        the order they are encountered, once per occurrence of the item in
        their click list
    """
    item_click_by_user = {}
    for user, item_list in user_click.items():
        for itemid in item_list:
            item_click_by_user.setdefault(itemid, []).append(user)
    return item_click_by_user


def cal_user_sim(item_click_by_user, user_click_time):
    """
    Compute user-to-user similarity from the item-to-users dict.

    Every pair of users that clicked the same item adds a time-decayed
    contribution (update_two_contribution_score) to both directions of the
    pair. The accumulated value is then divided by the square root of the
    product of the two users' click counts.

    :param item_click_by_user: dict, key itemid value:[userid1,userid2]
    :param user_click_time: dict key "userid_itemid", value click time;
        a missing key is treated as time 0
    :return: dict key: userid_i, value: list of (userid_j, sim) tuples
        sorted by sim in descending order
    """
    co_appear = {}
    user_click_count = {}
    for itemid, user_list in item_click_by_user.items():
        for index_i, user_i in enumerate(user_list):
            user_click_count[user_i] = user_click_count.get(user_i, 0) + 1
            click_time_one = user_click_time.get(user_i + "_" + itemid, 0)
            for user_j in user_list[index_i + 1:]:
                click_time_two = user_click_time.get(user_j + "_" + itemid, 0)
                # The contribution is symmetric, so compute it once and add
                # it to both directions.
                score = update_two_contribution_score(click_time_one, click_time_two)
                related_i = co_appear.setdefault(user_i, {})
                related_i[user_j] = related_i.get(user_j, 0) + score
                related_j = co_appear.setdefault(user_j, {})
                related_j[user_i] = related_j.get(user_i, 0) + score

    user_sim_info_sorted = {}
    for user_i, relate_user in co_appear.items():
        user_sim_info = {}
        for user_j, cotime in relate_user.items():
            # Normalise by the activity of both users.
            norm = math.sqrt(user_click_count[user_i] * user_click_count[user_j])
            user_sim_info[user_j] = cotime / norm
        user_sim_info_sorted[user_i] = sorted(
            user_sim_info.items(), key=operator.itemgetter(1), reverse=True
        )
    return user_sim_info_sorted


def cal_recom_result(user_click, user_sim, topk_user=3, item_num=5):
    """
    Recommend items to every user by usercf algo.

    For each user, the first item_num items of each of the topk_user most
    similar users are taken as candidates. Items the user has already
    clicked are skipped. A candidate keeps the score of the first (most
    similar) neighbour that contributed it.

    :param user_click: dict key userid value [itemid1,itemid2,...]
    :param user_sim: dict key userid value:[(userid1,score),(x,x),...];
        a user missing from this dict is treated as having no neighbours
    :param topk_user: how many similar users are consulted (default 3)
    :param item_num: how many leading items are taken from each similar
        user's click list (default 5)
    :return:
        dict, key userid value dict value_key: itemid, value_value:
        recom_score; users without any candidate map to {}
    """
    recom_result = {}
    for user, item_list in user_click.items():
        already_clicked = set(item_list)
        user_recom = recom_result.setdefault(user, {})
        # .get: a user that shares no item with anyone has no entry in
        # user_sim and must not abort the whole run.
        for userid_j, sim_score in user_sim.get(user, [])[:topk_user]:
            if userid_j not in user_click:
                continue
            for itemid_j in user_click[userid_j][:item_num]:
                if itemid_j in already_clicked:
                    continue
                # setdefault keeps the score of the most similar neighbour.
                user_recom.setdefault(itemid_j, sim_score)
    return recom_result