353 lines
13 KiB
Scala
353 lines
13 KiB
Scala
package com.twitter.tweetypie
|
|
package handler
|
|
|
|
import com.twitter.flockdb.client._
|
|
import com.twitter.snowflake.id.SnowflakeId
|
|
import com.twitter.stitch.Stitch
|
|
import com.twitter.tweetypie.additionalfields.AdditionalFields.setAdditionalFields
|
|
import com.twitter.tweetypie.core._
|
|
import com.twitter.tweetypie.repository._
|
|
import com.twitter.tweetypie.thriftscala._
|
|
import com.twitter.tweetypie.thriftscala.entities.EntityExtractor
|
|
import com.twitter.tweetypie.tweettext.Truncator
|
|
import com.twitter.tweetypie.util.CommunityUtil
|
|
import com.twitter.tweetypie.util.EditControlUtil
|
|
|
|
case class SourceTweetRequest(
|
|
tweetId: TweetId,
|
|
user: User,
|
|
hydrateOptions: WritePathHydrationOptions)
|
|
|
|
object RetweetBuilder {
|
|
import TweetBuilder._
|
|
import UpstreamFailure._
|
|
|
|
type Type = FutureArrow[RetweetRequest, TweetBuilderResult]
|
|
|
|
val SGSTestRole = "socialgraph"
|
|
|
|
val log: Logger = Logger(getClass)
|
|
|
|
/**
|
|
* Retweets text gets RT and username prepended
|
|
*/
|
|
def composeRetweetText(text: String, sourceUser: User): String =
|
|
composeRetweetText(text, sourceUser.profile.get.screenName)
|
|
|
|
/**
|
|
* Retweets text gets RT and username prepended
|
|
*/
|
|
def composeRetweetText(text: String, screenName: String): String =
|
|
Truncator.truncateForRetweet("RT @" + screenName + ": " + text)
|
|
|
|
// We do not want to allow community tweets to be retweeted.
|
|
def validateNotCommunityTweet(sourceTweet: Tweet): Future[Unit] =
|
|
if (CommunityUtil.hasCommunity(sourceTweet.communities)) {
|
|
Future.exception(TweetCreateFailure.State(TweetCreateState.CommunityRetweetNotAllowed))
|
|
} else {
|
|
Future.Unit
|
|
}
|
|
|
|
// We do not want to allow Trusted Friends tweets to be retweeted.
|
|
def validateNotTrustedFriendsTweet(sourceTweet: Tweet): Future[Unit] =
|
|
sourceTweet.trustedFriendsControl match {
|
|
case Some(trustedFriendsControl) =>
|
|
Future.exception(TweetCreateFailure.State(TweetCreateState.TrustedFriendsRetweetNotAllowed))
|
|
case None =>
|
|
Future.Unit
|
|
}
|
|
|
|
// We do not want to allow retweet of a stale version of a tweet in an edit chain.
|
|
def validateStaleTweet(sourceTweet: Tweet): Future[Unit] = {
|
|
if (!EditControlUtil.isLatestEdit(sourceTweet.editControl, sourceTweet.id).getOrElse(true)) {
|
|
Future.exception(TweetCreateFailure.State(TweetCreateState.StaleTweetRetweetNotAllowed))
|
|
} else {
|
|
// the source tweet does not have any edit control or the source tweet is the latest tweet
|
|
Future.Unit
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Builds the RetweetBuilder
|
|
*/
|
|
def apply(
|
|
validateRequest: RetweetRequest => Future[Unit],
|
|
tweetIdGenerator: TweetIdGenerator,
|
|
tweetRepo: TweetRepository.Type,
|
|
userRepo: UserRepository.Type,
|
|
tflock: TFlockClient,
|
|
deviceSourceRepo: DeviceSourceRepository.Type,
|
|
validateUpdateRateLimit: RateLimitChecker.Validate,
|
|
spamChecker: Spam.Checker[RetweetSpamRequest] = Spam.DoNotCheckSpam,
|
|
updateUserCounts: (User, Tweet) => Future[User],
|
|
superFollowRelationsRepo: StratoSuperFollowRelationsRepository.Type,
|
|
unretweetEdits: TweetDeletePathHandler.UnretweetEdits,
|
|
setEditWindowToSixtyMinutes: Gate[Unit]
|
|
): RetweetBuilder.Type = {
|
|
val entityExtactor = EntityExtractor.mutationAll.endo
|
|
|
|
val sourceTweetRepo: SourceTweetRequest => Stitch[Tweet] =
|
|
req => {
|
|
tweetRepo(
|
|
req.tweetId,
|
|
WritePathQueryOptions.retweetSourceTweet(req.user, req.hydrateOptions)
|
|
).rescue {
|
|
case _: FilteredState => Stitch.NotFound
|
|
}
|
|
.rescue {
|
|
convertRepoExceptions(TweetCreateState.SourceTweetNotFound, TweetLookupFailure(_))
|
|
}
|
|
}
|
|
|
|
val getUser = userLookup(userRepo)
|
|
val getSourceUser = sourceUserLookup(userRepo)
|
|
val getDeviceSource = deviceSourceLookup(deviceSourceRepo)
|
|
|
|
/**
|
|
* We exempt SGS test users from the check to get them through Block v2 testing.
|
|
*/
|
|
def isSGSTestRole(user: User): Boolean =
|
|
user.roles.exists { roles => roles.roles.contains(SGSTestRole) }
|
|
|
|
def validateCanRetweet(
|
|
user: User,
|
|
sourceUser: User,
|
|
sourceTweet: Tweet,
|
|
request: RetweetRequest
|
|
): Future[Unit] =
|
|
Future
|
|
.join(
|
|
validateNotCommunityTweet(sourceTweet),
|
|
validateNotTrustedFriendsTweet(sourceTweet),
|
|
validateSourceUserRetweetable(user, sourceUser),
|
|
validateStaleTweet(sourceTweet),
|
|
Future.when(!request.dark) {
|
|
if (request.returnSuccessOnDuplicate)
|
|
failWithRetweetIdIfAlreadyRetweeted(user, sourceTweet)
|
|
else
|
|
validateNotAlreadyRetweeted(user, sourceTweet)
|
|
}
|
|
)
|
|
.unit
|
|
|
|
def validateSourceUserRetweetable(user: User, sourceUser: User): Future[Unit] =
|
|
if (sourceUser.profile.isEmpty)
|
|
Future.exception(UserProfileEmptyException)
|
|
else if (sourceUser.safety.isEmpty)
|
|
Future.exception(UserSafetyEmptyException)
|
|
else if (sourceUser.view.isEmpty)
|
|
Future.exception(UserViewEmptyException)
|
|
else if (user.id != sourceUser.id && sourceUser.safety.get.isProtected)
|
|
Future.exception(TweetCreateFailure.State(TweetCreateState.CannotRetweetProtectedTweet))
|
|
else if (sourceUser.safety.get.deactivated)
|
|
Future.exception(TweetCreateFailure.State(TweetCreateState.CannotRetweetDeactivatedUser))
|
|
else if (sourceUser.safety.get.suspended)
|
|
Future.exception(TweetCreateFailure.State(TweetCreateState.CannotRetweetSuspendedUser))
|
|
else if (sourceUser.view.get.blockedBy && !isSGSTestRole(user))
|
|
Future.exception(TweetCreateFailure.State(TweetCreateState.CannotRetweetBlockingUser))
|
|
else if (sourceUser.profile.get.screenName.isEmpty)
|
|
Future.exception(
|
|
TweetCreateFailure.State(TweetCreateState.CannotRetweetUserWithoutScreenName)
|
|
)
|
|
else
|
|
Future.Unit
|
|
|
|
def tflockGraphContains(
|
|
graph: StatusGraph,
|
|
fromId: Long,
|
|
toId: Long,
|
|
dir: Direction
|
|
): Future[Boolean] =
|
|
tflock.contains(graph, fromId, toId, dir).rescue {
|
|
case ex: OverCapacity => Future.exception(ex)
|
|
case ex => Future.exception(TFlockLookupFailure(ex))
|
|
}
|
|
|
|
def getRetweetIdFromTflock(sourceTweetId: TweetId, userId: UserId): Future[Option[Long]] =
|
|
tflock
|
|
.selectAll(
|
|
Select(
|
|
sourceId = sourceTweetId,
|
|
graph = RetweetsGraph,
|
|
direction = Forward
|
|
).intersect(
|
|
Select(
|
|
sourceId = userId,
|
|
graph = UserTimelineGraph,
|
|
direction = Forward
|
|
)
|
|
)
|
|
)
|
|
.map(_.headOption)
|
|
|
|
def validateNotAlreadyRetweeted(user: User, sourceTweet: Tweet): Future[Unit] =
|
|
// use the perspective object from TLS if available, otherwise, check with tflock
|
|
(sourceTweet.perspective match {
|
|
case Some(perspective) =>
|
|
Future.value(perspective.retweeted)
|
|
case None =>
|
|
// we have to query the RetweetSourceGraph in the Reverse order because
|
|
// it is only defined in that direction, instead of bi-directionally
|
|
tflockGraphContains(RetweetSourceGraph, user.id, sourceTweet.id, Reverse)
|
|
}).flatMap {
|
|
case true =>
|
|
Future.exception(TweetCreateFailure.State(TweetCreateState.AlreadyRetweeted))
|
|
case false => Future.Unit
|
|
}
|
|
|
|
def failWithRetweetIdIfAlreadyRetweeted(user: User, sourceTweet: Tweet): Future[Unit] =
|
|
// use the perspective object from TLS if available, otherwise, check with tflock
|
|
(sourceTweet.perspective.flatMap(_.retweetId) match {
|
|
case Some(tweetId) => Future.value(Some(tweetId))
|
|
case None =>
|
|
getRetweetIdFromTflock(sourceTweet.id, user.id)
|
|
}).flatMap {
|
|
case None => Future.Unit
|
|
case Some(tweetId) =>
|
|
Future.exception(TweetCreateFailure.AlreadyRetweeted(tweetId))
|
|
}
|
|
|
|
def validateContributor(contributorIdOpt: Option[UserId]): Future[Unit] =
|
|
if (contributorIdOpt.isDefined)
|
|
Future.exception(TweetCreateFailure.State(TweetCreateState.ContributorNotSupported))
|
|
else
|
|
Future.Unit
|
|
|
|
case class RetweetSource(sourceTweet: Tweet, parentUserId: UserId)
|
|
|
|
/**
|
|
* Recursively follows a retweet chain to the root source tweet. Also returns user id from the
|
|
* first walked tweet as the 'parentUserId'.
|
|
* In practice, the depth of the chain should never be greater than 2 because
|
|
* share.sourceStatusId should always reference the root (unlike share.parentStatusId).
|
|
*/
|
|
def findRetweetSource(
|
|
tweetId: TweetId,
|
|
forUser: User,
|
|
hydrateOptions: WritePathHydrationOptions
|
|
): Future[RetweetSource] =
|
|
Stitch
|
|
.run(sourceTweetRepo(SourceTweetRequest(tweetId, forUser, hydrateOptions)))
|
|
.flatMap { tweet =>
|
|
getShare(tweet) match {
|
|
case None => Future.value(RetweetSource(tweet, getUserId(tweet)))
|
|
case Some(share) =>
|
|
findRetweetSource(share.sourceStatusId, forUser, hydrateOptions)
|
|
.map(_.copy(parentUserId = getUserId(tweet)))
|
|
}
|
|
}
|
|
|
|
FutureArrow { request =>
|
|
for {
|
|
() <- validateRequest(request)
|
|
userFuture = Stitch.run(getUser(request.userId))
|
|
tweetIdFuture = tweetIdGenerator()
|
|
devsrcFuture = Stitch.run(getDeviceSource(request.createdVia))
|
|
user <- userFuture
|
|
tweetId <- tweetIdFuture
|
|
devsrc <- devsrcFuture
|
|
rtSource <- findRetweetSource(
|
|
request.sourceStatusId,
|
|
user,
|
|
request.hydrationOptions.getOrElse(WritePathHydrationOptions(simpleQuotedTweet = true))
|
|
)
|
|
sourceTweet = rtSource.sourceTweet
|
|
sourceUser <- Stitch.run(getSourceUser(getUserId(sourceTweet), request.userId))
|
|
|
|
// We want to confirm that a user is actually allowed to
|
|
// retweet an Exclusive Tweet (only available to super followers)
|
|
() <- StratoSuperFollowRelationsRepository.Validate(
|
|
sourceTweet.exclusiveTweetControl,
|
|
user.id,
|
|
superFollowRelationsRepo)
|
|
|
|
() <- validateUser(user)
|
|
() <- validateUpdateRateLimit((user.id, request.dark))
|
|
() <- validateContributor(request.contributorUserId)
|
|
() <- validateCanRetweet(user, sourceUser, sourceTweet, request)
|
|
() <- unretweetEdits(sourceTweet.editControl, sourceTweet.id, user.id)
|
|
|
|
spamRequest = RetweetSpamRequest(
|
|
retweetId = tweetId,
|
|
sourceUserId = getUserId(sourceTweet),
|
|
sourceTweetId = sourceTweet.id,
|
|
sourceTweetText = getText(sourceTweet),
|
|
sourceUserName = sourceUser.profile.map(_.screenName),
|
|
safetyMetaData = request.safetyMetaData
|
|
)
|
|
|
|
spamResult <- spamChecker(spamRequest)
|
|
|
|
safety = user.safety.get
|
|
|
|
share = Share(
|
|
sourceStatusId = sourceTweet.id,
|
|
sourceUserId = sourceUser.id,
|
|
parentStatusId = request.sourceStatusId
|
|
)
|
|
|
|
retweetText = composeRetweetText(getText(sourceTweet), sourceUser)
|
|
createdAt = SnowflakeId(tweetId).time
|
|
|
|
coreData = TweetCoreData(
|
|
userId = request.userId,
|
|
text = retweetText,
|
|
createdAtSecs = createdAt.inSeconds,
|
|
createdVia = devsrc.internalName,
|
|
share = Some(share),
|
|
hasTakedown = safety.hasTakedown,
|
|
trackingId = request.trackingId,
|
|
nsfwUser = safety.nsfwUser,
|
|
nsfwAdmin = safety.nsfwAdmin,
|
|
narrowcast = request.narrowcast,
|
|
nullcast = request.nullcast
|
|
)
|
|
|
|
retweet = Tweet(
|
|
id = tweetId,
|
|
coreData = Some(coreData),
|
|
contributor = getContributor(request.userId),
|
|
editControl = Some(
|
|
EditControl.Initial(
|
|
EditControlUtil
|
|
.makeEditControlInitial(
|
|
tweetId = tweetId,
|
|
createdAt = createdAt,
|
|
setEditWindowToSixtyMinutes = setEditWindowToSixtyMinutes
|
|
)
|
|
.initial
|
|
.copy(isEditEligible = Some(false))
|
|
)
|
|
),
|
|
)
|
|
|
|
retweetWithEntities = entityExtactor(retweet)
|
|
retweetWithAdditionalFields = setAdditionalFields(
|
|
retweetWithEntities,
|
|
request.additionalFields
|
|
)
|
|
// update the perspective and counts fields of the source tweet to reflect the effects
|
|
// of the user performing a retweet, even though those effects haven't happened yet.
|
|
updatedSourceTweet = sourceTweet.copy(
|
|
perspective = sourceTweet.perspective.map {
|
|
_.copy(retweeted = true, retweetId = Some(retweet.id))
|
|
},
|
|
counts = sourceTweet.counts.map { c => c.copy(retweetCount = c.retweetCount.map(_ + 1)) }
|
|
)
|
|
|
|
user <- updateUserCounts(user, retweetWithAdditionalFields)
|
|
} yield {
|
|
TweetBuilderResult(
|
|
tweet = retweetWithAdditionalFields,
|
|
user = user,
|
|
createdAt = createdAt,
|
|
sourceTweet = Some(updatedSourceTweet),
|
|
sourceUser = Some(sourceUser),
|
|
parentUserId = Some(rtSource.parentUserId),
|
|
isSilentFail = spamResult == Spam.SilentFail
|
|
)
|
|
}
|
|
}
|
|
}
|
|
}
|