问题描述
目前,我们已经使用重启随机游走算法实现了在大型社交网络中的用户关联度计算,详情见:【推荐平台】基于Spark的大型社交网络重启随机游走算法实现 。
但是,仅仅计算用户间的关联度还不够。例如,对于搜索场景而言,用户不仅需要搜索联系人,还可能会搜索 Channel
,File
,Meeting
等等。同时,对于 AI 助手而言,在回答用户提出的问题时,也需要去获取与这个用户相关的上下文。这些上下文包括但不限于用户参加过的 Meeting
,用户发过的 Message
,用户分享的 File
等等。因此,如何把用户社交网络的概念,扩展为包含 User
,Channel
,Meeting
,File
等多个实体的异构网络,并使用重启随机游走算法计算用户与实体间的关联度,是亟需解决的问题。
换句话说,我们需要针对每个用户构建如下的数据结构:
{
"userId": "yzdfct5ltoums2xs9vf6bg",
"accountId": "S6Xz47u2SxKYzXy85qKLDg",
"relevantUsers": [
{
"userId": "agyxmy6qtmebgwfznokz-a",
"accountId": "S6Xz47u2SxKYzXy85qKLDg",
"score": 0.2154
},
{
"userId": "gisb1z8qrmgxi1v11m9eta",
"accountId": "E3OwQ6tSTQu1JdJx2unNtw",
"score": 0.1603
},
......
],
"relevantChannels": [
{
"channelId": "[email protected]",
"accountId": "S6Xz47u2SxKYzXy85qKLDg",
"score": 0.1135
},
{
"channelId": "[email protected]",
"accountId": "S6Xz47u2SxKYzXy85qKLDg",
"score": 0.0997
},
......
],
"relevantMeetings": [
{
"meetingId": "D0nhua0ou/eCoBIC4Szohw==",
"accountId": "S6Xz47u2SxKYzXy85qKLDg",
"score": 0.1045
},
{
"meetingId": "jzHhSxjGS6aNFYox6grIPQ==",
"accountId": "S6Xz47u2SxKYzXy85qKLDg",
"score": 0.0911
},
......
],
......
}
FieldName | Type | Desc |
userId | string | current user id |
accountId | string | current user’s account id |
relevantUsers | list | Top150 users associated with current user |
relevantChannels | list | Top150 channels associated with current user |
relevantMeetings | list | Top150 meetings associated with current user |
relevantXXX | list | Top150 XXXs associated with current user |
技术上的挑战
数据指数爆炸
下图是以 User1
为中心的用户社交网络图:
当我们向其中加入 Channel
,Meeting
,File
等实体后,图变得拥挤了起来:
显然,当我们加入的实体类型越多,图就会变得越拥挤。事实上,图上任意两个顶点间的路径数量,与图中的顶点类型数量呈指数相关。这意味着,当我们在这样的异构图中执行随机游走算法,为了在有限的时间内计算出最终结果,我们就不能向其中增加过多的顶点类型。举个例子,当图中只有 User
这一种顶点时,算法可能需要 2 小时执行;当我们向其中加入 Meeting
后,执行时间就变为了 4 小时。目前,我们有 Meeting
,Channel
,File
,Email
,Calendar
,Whiteboard
,Room
等实体类型,在不增加计算资源的情况下,可能需要数天到数十天的时间才能执行一次。这在成本上是不可接受的。
在工程层面上,基于我们之前使用重启的随机游走算法在大型社交网络中的实践方案,我们也无法实现这样拥挤的图的随机游走算法。原因是在每次Spark迭代中,我们需要对每个顶点的边进行截断,以防止潜在的数据倾斜问题的发生。经过实验,将每个顶点的出边和入边分别限制在约 150 个时,效果最好(过低会影响准确度,过高则会产生数据倾斜)。当我们只有User这一种顶点类型时,看起来没什么问题——因为大多数人在工作中不会同时维护超过 100 个同事关系。但是,当我们继续扩展顶点类型,150 个限制就显得过低了。想象下,30 天内,用户可能分别在数十个 Channel
里发过言,开过数十个 Meeting
,预约了数十个 Calendar
等等。所有这些加起来,很容易超过 150 的限制。
综上所述,我们需要找到一种工程上的方式,使得在这种拥挤的图中进行随机游走成为可能。
技术方案
边权重的可传递性
随机游走的每次迭代可以看作是分值传播的过程。在上图中,我们归一化每个顶点的出边权重之和等于 1 。以左边的图为例,假设起点为 User1
,起始分数为 1 。第一次迭代后,顶点 M
和 C
的分数分别为 0.4 和 0.6。第二次迭代后,User2
,User3
,User4
的分数分别为 0.12 ,0.64 ,0.24 。换句话说,在第 T 次迭代后,顶点 UserX
上的分数为图中所有 User1
到 UserX
路径长度小于等于 T 的分数之和。其中路径的分数等于这条路径经过的所有边权重的乘积。
当只考虑 User
与 User
之间的关系时,左图和右图是等价的。即我们分别在左图和右图上进行随机游走算法,最终 User
和 User
之间的结果是相同的。显然,右图比左图简单很多,所花费的计算时长也小得多。
需要注意的是,我们对重启的概念作出修改:即仅在 User
节点重启,对于 Meeting
和 Channel
节点,重启的概率始终为 0 。这是可以接受的,因为重启的目的是使得随机游走的范围聚焦在种子节点附近。显然,修改后的重启方式也可以满足这一需求。
重启随机游走的收敛性
当随机游走的次数趋于无穷时,图中每个顶点的分数是收敛的。这也可以从分值传播的角度来解读:第 T 次和第 T+1 次迭代中,相同顶点的分数的区别来源于长度为 T+1 的路径所贡献的分数。由于图中每条边的权重总是小于 1 的,因此当 T 趋近于无穷大时,增量趋近于 0 。
假设当 T 趋近为无穷时,上图为 T 时刻 UserSeed
与 User1-4
之间的分数,但 UserSeed
与 M
和 C
顶点的分数未知。那么我们如何获得该时刻他们的分数呢?很简单,我们使用上图的拓扑再进行一次迭代即可。此时 T+1 时刻的分数就等于 T 时刻。
解决方案
在之前的用户网络关联度计算中,我们在准备阶段搜集用户在各个维度(例如 Meeting
,Channel
等等)的指标,并将这些指标映射为分数,再合并两个用户间的所有指标,最终为每个用户的所有出边进行归一化。实际上,上述的这些准备工作等价于在“边权重的可传递性”这一节中所画的示意图。
那么,后续我们需要做的就很简单了。正如“重启随机游走的收敛性”这一节所说,我们只需要还原整个异构图,并且再进行一次迭代就可以了。
到这里,我们成功的解决了技术上的挑战,即:
- 我们简化了计算,在用户图的基础上,仅需要附加一次迭代,就可以实现异构图的重启随机游走算法。而一次迭代的成本相当低廉,仅仅是在 Spark 上做一次 Join 操作即可。
- 对于截断的问题,我们可以将其分割为多个子问题进行解决。我们依次计算
User
与Meeting
,User
与Channel
,再将最终的结果进行合并。具体的 SQL 如下所示:
# [SQL1] turncat USER2ENTITY table
select
srcUserId,
dstEntityId,
dstEntityType,
weight
from (
select
srcUserId,
dstEntityId,
dstEntityType,
weight,
row_number() over(partition by dstEntityType, srcUserId order by weight desc) as rank
from
{TABLE_USER2ENTITY_EDGES}
)
where
rank <= 150
# [SQL2] turncat USER2USER RWR RESULT table
select
srcUserId,
dstUserId,
score
from (
select
srcUserId,
dstUserId,
score,
row_number() over(partition by dstUserId order by score desc) as rank
from
{TABLE_USER_RWR_RESULT}
)
where
rank <= 150
# [SQL3] results
select
entity.dstEntityType as entityType
user.srcUserId as srcUserId,
entity.dstEntityId as dstEntityId,
sum(user.score * entity.weight) as score
from
({SQL2}) as user
inner join ({SQL1}) as entity
on user.dstUserId = entity.srcUserId
group by
entity.dstEntityType,
user.srcUserId,
entity.dstEntityId
having
sum(user.score * entity.weight) >= 0.0001