166 lines
6.3 KiB
Scala
166 lines
6.3 KiB
Scala
package com.twitter.recos.user_tweet_entity_graph
|
|
|
|
import com.twitter.finagle.stats.StatsReceiver
|
|
import com.twitter.graphjet.algorithms.{
|
|
RecommendationInfo,
|
|
RecommendationType => JavaRecommendationType
|
|
}
|
|
import com.twitter.graphjet.algorithms.socialproof.{
|
|
NodeMetadataSocialProofResult => EntitySocialProofJavaResult,
|
|
SocialProofResult => SocialProofJavaResult
|
|
}
|
|
import com.twitter.recos.decider.UserTweetEntityGraphDecider
|
|
import com.twitter.recos.util.Stats
|
|
import com.twitter.recos.util.Stats._
|
|
import com.twitter.recos.recos_common.thriftscala.{SocialProofType => SocialProofThriftType}
|
|
import com.twitter.recos.user_tweet_entity_graph.thriftscala.{
|
|
HashtagRecommendation,
|
|
TweetRecommendation,
|
|
UrlRecommendation,
|
|
UserTweetEntityRecommendationUnion,
|
|
RecommendationSocialProofRequest => SocialProofThriftRequest,
|
|
RecommendationSocialProofResponse => SocialProofThriftResponse,
|
|
RecommendationType => ThriftRecommendationType
|
|
}
|
|
import com.twitter.servo.request.RequestHandler
|
|
import com.twitter.util.{Future, Try}
|
|
import scala.collection.JavaConverters._
|
|
|
|
class SocialProofHandler(
|
|
tweetSocialProofRunner: TweetSocialProofRunner,
|
|
entitySocialProofRunner: EntitySocialProofRunner,
|
|
decider: UserTweetEntityGraphDecider,
|
|
statsReceiver: StatsReceiver)
|
|
extends RequestHandler[SocialProofThriftRequest, SocialProofThriftResponse] {
|
|
private val stats = statsReceiver.scope(this.getClass.getSimpleName)
|
|
|
|
private def getThriftSocialProof(
|
|
entitySocialProof: EntitySocialProofJavaResult
|
|
): Map[SocialProofThriftType, Map[Long, Seq[Long]]] = {
|
|
val socialProofAttempt = Try(entitySocialProof.getSocialProof)
|
|
.onFailure { e =>
|
|
stats.counter(e.getClass.getSimpleName).incr()
|
|
}
|
|
|
|
socialProofAttempt.toOption match {
|
|
case Some(socialProof) if socialProof.isEmpty =>
|
|
stats.counter(Stats.EmptyResult).incr()
|
|
Map.empty[SocialProofThriftType, Map[Long, Seq[Long]]]
|
|
case Some(socialProof) if !socialProof.isEmpty =>
|
|
socialProof.asScala.map {
|
|
case (socialProofType, socialProofUserToTweetsMap) =>
|
|
val userToTweetsSocialProof = socialProofUserToTweetsMap.asScala.map {
|
|
case (socialProofUser, connectingTweets) =>
|
|
(socialProofUser.toLong, connectingTweets.asScala.map(Long2long).toSeq)
|
|
}.toMap
|
|
(SocialProofThriftType(socialProofType.toInt), userToTweetsSocialProof)
|
|
}.toMap
|
|
case _ =>
|
|
Map.empty[SocialProofThriftType, Map[Long, Seq[Long]]]
|
|
}
|
|
}
|
|
|
|
private def getThriftSocialProof(
|
|
tweetSocialProof: SocialProofJavaResult
|
|
): Map[SocialProofThriftType, Seq[Long]] = {
|
|
val socialProofAttempt = Try(tweetSocialProof.getSocialProof)
|
|
.onFailure { e =>
|
|
stats.counter(e.getClass.getSimpleName).incr()
|
|
}
|
|
|
|
socialProofAttempt.toOption match {
|
|
case Some(socialProof) if socialProof.isEmpty =>
|
|
stats.counter(Stats.EmptyResult).incr()
|
|
Map.empty[SocialProofThriftType, Seq[Long]]
|
|
case Some(socialProof) if !socialProof.isEmpty =>
|
|
socialProof.asScala.map {
|
|
case (socialProofType, connectingUsers) =>
|
|
(
|
|
SocialProofThriftType(socialProofType.toInt),
|
|
connectingUsers.asScala.map { Long2long }.toSeq)
|
|
}.toMap
|
|
case _ =>
|
|
Map.empty[SocialProofThriftType, Seq[Long]]
|
|
}
|
|
}
|
|
|
|
private def getEntitySocialProof(
|
|
request: SocialProofThriftRequest
|
|
): Future[Seq[UserTweetEntityRecommendationUnion]] = {
|
|
val socialProofsFuture = entitySocialProofRunner(request)
|
|
|
|
socialProofsFuture.map { socialProofs: Seq[RecommendationInfo] =>
|
|
stats.counter(Stats.Served).incr(socialProofs.size)
|
|
socialProofs.flatMap { entitySocialProof: RecommendationInfo =>
|
|
val entitySocialProofJavaResult =
|
|
entitySocialProof.asInstanceOf[EntitySocialProofJavaResult]
|
|
if (entitySocialProofJavaResult.getRecommendationType == JavaRecommendationType.URL) {
|
|
Some(
|
|
UserTweetEntityRecommendationUnion.UrlRec(
|
|
UrlRecommendation(
|
|
entitySocialProofJavaResult.getNodeMetadataId,
|
|
entitySocialProofJavaResult.getWeight,
|
|
getThriftSocialProof(entitySocialProofJavaResult)
|
|
)
|
|
)
|
|
)
|
|
} else if (entitySocialProofJavaResult.getRecommendationType == JavaRecommendationType.HASHTAG) {
|
|
Some(
|
|
UserTweetEntityRecommendationUnion.HashtagRec(
|
|
HashtagRecommendation(
|
|
entitySocialProofJavaResult.getNodeMetadataId,
|
|
entitySocialProofJavaResult.getWeight,
|
|
getThriftSocialProof(entitySocialProofJavaResult)
|
|
)
|
|
)
|
|
)
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private def getTweetSocialProof(
|
|
request: SocialProofThriftRequest
|
|
): Future[Seq[UserTweetEntityRecommendationUnion]] = {
|
|
val socialProofsFuture = tweetSocialProofRunner(request)
|
|
|
|
socialProofsFuture.map { socialProofs: Seq[RecommendationInfo] =>
|
|
stats.counter(Stats.Served).incr(socialProofs.size)
|
|
socialProofs.flatMap { tweetSocialProof: RecommendationInfo =>
|
|
val tweetSocialProofJavaResult = tweetSocialProof.asInstanceOf[SocialProofJavaResult]
|
|
Some(
|
|
UserTweetEntityRecommendationUnion.TweetRec(
|
|
TweetRecommendation(
|
|
tweetSocialProofJavaResult.getNode,
|
|
tweetSocialProofJavaResult.getWeight,
|
|
getThriftSocialProof(tweetSocialProofJavaResult)
|
|
)
|
|
)
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
def apply(request: SocialProofThriftRequest): Future[SocialProofThriftResponse] = {
|
|
trackFutureBlockStats(stats) {
|
|
val recommendationsWithSocialProofFut = Future
|
|
.collect {
|
|
request.recommendationIdsForSocialProof.keys.map {
|
|
case ThriftRecommendationType.Tweet if decider.tweetSocialProof =>
|
|
getTweetSocialProof(request)
|
|
case (ThriftRecommendationType.Url | ThriftRecommendationType.Hashtag)
|
|
if decider.entitySocialProof =>
|
|
getEntitySocialProof(request)
|
|
case _ =>
|
|
Future.Nil
|
|
}.toSeq
|
|
}.map(_.flatten)
|
|
recommendationsWithSocialProofFut.map { recommendationsWithSocialProof =>
|
|
SocialProofThriftResponse(recommendationsWithSocialProof)
|
|
}
|
|
}
|
|
}
|
|
}
|