【推荐平台】重启随机游走算法在以用户为中心的大型异构网络中的应用

问题描述

目前,我们已经使用重启随机游走算法实现了在大型社交网络中的用户关联度计算,详情见:【推荐平台】基于Spark的大型社交网络重启随机游走算法实现

但是,仅仅计算用户间的关联度还不够。例如,对于搜索场景而言,用户不仅需要搜索联系人,还可能会搜索 ChannelFileMeeting 等等。同时,对于 AI 助手而言,在回答用户提出的问题时,也需要去获取与这个用户相关的上下文。这些上下文包括但不限于用户参加过的 Meeting ,用户发过的 Message ,用户分享的 File 等等。因此,如何把用户社交网络的概念,扩展为包含 UserChannelMeetingFile 等多个实体的异构网络,并使用重启随机游走算法计算用户与实体间的关联度,是亟需解决的问题。

换句话说,我们需要针对每个用户构建如下的数据结构:

{
  "userId": "yzdfct5ltoums2xs9vf6bg",
  "accountId": "S6Xz47u2SxKYzXy85qKLDg",
  "relevantUsers": [
    {
      "userId": "agyxmy6qtmebgwfznokz-a",
      "accountId": "S6Xz47u2SxKYzXy85qKLDg",
      "score": 0.2154
    },
    {
      "userId": "gisb1z8qrmgxi1v11m9eta",
      "accountId": "E3OwQ6tSTQu1JdJx2unNtw",
      "score": 0.1603
    },
    ......
  ],
  "relevantChannels": [
    {
      "channelId": "[email protected]",
      "accountId": "S6Xz47u2SxKYzXy85qKLDg",
      "score": 0.1135
    },
    {
      "channelId": "[email protected]",
      "accountId": "S6Xz47u2SxKYzXy85qKLDg",
      "score": 0.0997
    },
    ......
  ],
  "relevantMeetings": [
    {
      "meetingId": "D0nhua0ou/eCoBIC4Szohw==",
      "accountId": "S6Xz47u2SxKYzXy85qKLDg",
      "score": 0.1045
    },
    {
      "meetingId": "jzHhSxjGS6aNFYox6grIPQ==",
      "accountId": "S6Xz47u2SxKYzXy85qKLDg",
      "score": 0.0911
    },
    ......
  ],
  ......
}
FieldNameTypeDesc
userIdstringcurrent user id
accountIdstringcurrent user’s account id
relevantUserslistTop150 users associated with current user
relevantChannelslistTop150 channels associated with current user
relevantMeetingslistTop150 meetings associated with current user
relevantXXXlistTop150 XXXs associated with current user
数据结构

技术上的挑战

数据指数爆炸

下图是以 User1 为中心的用户社交网络图:

以 User1 为中心的用户社交网络图

当我们向其中加入 ChannelMeetingFile 等实体后,图变得拥挤了起来:

以 User1 为中心的异构网络图

显然,当我们加入的实体类型越多,图就会变得越拥挤。事实上,图上任意两个顶点间的路径数量,与图中的顶点类型数量呈指数相关。这意味着,当我们在这样的异构图中执行随机游走算法,为了在有限的时间内计算出最终结果,我们就不能向其中增加过多的顶点类型。举个例子,当图中只有 User 这一种顶点时,算法可能需要 2 小时执行;当我们向其中加入 Meeting 后,执行时间就变为了 4 小时。目前,我们有 MeetingChannelFileEmailCalendarWhiteboardRoom 等实体类型,在不增加计算资源的情况下,可能需要数天到数十天的时间才能执行一次。这在成本上是不可接受的。

在工程层面上,基于我们之前使用重启的随机游走算法在大型社交网络中的实践方案,我们也无法实现这样拥挤的图的随机游走算法。原因是在每次Spark迭代中,我们需要对每个顶点的边进行截断,以防止潜在的数据倾斜问题的发生。经过实验,将每个顶点的出边和入边分别限制在约 150 个时,效果最好(过低会影响准确度,过高则会产生数据倾斜)。当我们只有User这一种顶点类型时,看起来没什么问题——因为大多数人在工作中不会同时维护超过 100 个同事关系。但是,当我们继续扩展顶点类型,150 个限制就显得过低了。想象下,30 天内,用户可能分别在数十个 Channel 里发过言,开过数十个 Meeting ,预约了数十个 Calendar 等等。所有这些加起来,很容易超过 150 的限制。

综上所述,我们需要找到一种工程上的方式,使得在这种拥挤的图中进行随机游走成为可能。

技术方案

边权重的可传递性

随机游走的分值传播

随机游走的每次迭代可以看作是分值传播的过程。在上图中,我们归一化每个顶点的出边权重之和等于 1 。以左边的图为例,假设起点为 User1 ,起始分数为 1 。第一次迭代后,顶点 MC 的分数分别为 0.4 和 0.6。第二次迭代后,User2User3User4 的分数分别为 0.12 ,0.64 ,0.24 。换句话说,在第 T 次迭代后,顶点 UserX 上的分数为图中所有 User1UserX 路径长度小于等于 T 的分数之和。其中路径的分数等于这条路径经过的所有边权重的乘积。

当只考虑 UserUser 之间的关系时,左图和右图是等价的。即我们分别在左图和右图上进行随机游走算法,最终 UserUser 之间的结果是相同的。显然,右图比左图简单很多,所花费的计算时长也小得多。

需要注意的是,我们对重启的概念作出修改:即仅在 User 节点重启,对于 MeetingChannel 节点,重启的概率始终为 0 。这是可以接受的,因为重启的目的是使得随机游走的范围聚焦在种子节点附近。显然,修改后的重启方式也可以满足这一需求。

重启随机游走的收敛性

异构图的随机游走

当随机游走的次数趋于无穷时,图中每个顶点的分数是收敛的。这也可以从分值传播的角度来解读:第 T 次和第 T+1 次迭代中,相同顶点的分数的区别来源于长度为 T+1 的路径所贡献的分数。由于图中每条边的权重总是小于 1 的,因此当 T 趋近于无穷大时,增量趋近于 0 。

假设当 T 趋近为无穷时,上图为 T 时刻 UserSeedUser1-4 之间的分数,但 UserSeedMC 顶点的分数未知。那么我们如何获得该时刻他们的分数呢?很简单,我们使用上图的拓扑再进行一次迭代即可。此时 T+1 时刻的分数就等于 T 时刻。

解决方案

在之前的用户网络关联度计算中,我们在准备阶段搜集用户在各个维度(例如 MeetingChannel 等等)的指标,并将这些指标映射为分数,再合并两个用户间的所有指标,最终为每个用户的所有出边进行归一化。实际上,上述的这些准备工作等价于在“边权重的可传递性”这一节中所画的示意图。

那么,后续我们需要做的就很简单了。正如“重启随机游走的收敛性”这一节所说,我们只需要还原整个异构图,并且再进行一次迭代就可以了。

到这里,我们成功的解决了技术上的挑战,即:

  1. 我们简化了计算,在用户图的基础上,仅需要附加一次迭代,就可以实现异构图的重启随机游走算法。而一次迭代的成本相当低廉,仅仅是在 Spark 上做一次 Join 操作即可。
  2. 对于截断的问题,我们可以将其分割为多个子问题进行解决。我们依次计算 UserMeetingUserChannel ,再将最终的结果进行合并。具体的 SQL 如下所示:
# [SQL1] turncat USER2ENTITY table
select
    srcUserId,
    dstEntityId,
    dstEntityType,
    weight
from (
    select
        srcUserId,
        dstEntityId,
        dstEntityType,
        weight,
        row_number() over(partition by dstEntityType, srcUserId order by weight desc) as rank
    from
        {TABLE_USER2ENTITY_EDGES}
)
where
    rank <= 150

# [SQL2] turncat USER2USER RWR RESULT table
select
    srcUserId,
    dstUserId,
    score
from (
    select
        srcUserId,
        dstUserId,
        score,
        row_number() over(partition by dstUserId order by score desc) as rank
    from
        {TABLE_USER_RWR_RESULT}
)
where
    rank <= 150

# [SQL3] results
select
    entity.dstEntityType as entityType
    user.srcUserId as srcUserId,
    entity.dstEntityId as dstEntityId,
    sum(user.score * entity.weight) as score
from
    ({SQL2}) as user
    inner join ({SQL1}) as entity
    on user.dstUserId = entity.srcUserId
group by
    entity.dstEntityType,
    user.srcUserId,
    entity.dstEntityId
having
    sum(user.score * entity.weight) >= 0.0001
上一篇
下一篇