/*
 * the-algorithm/tweetypie/server/src/main/scala/com/twitter/tweetypie/handler/PostTweet.scala
 *
 * 396 lines
 * 16 KiB
 * Scala
 */

package com.twitter.tweetypie
package handler
import com.twitter.context.thriftscala.FeatureContext
import com.twitter.tweetypie.backends.LimiterService
import com.twitter.tweetypie.core._
import com.twitter.tweetypie.serverutil.ExceptionCounter
import com.twitter.tweetypie.store.InsertTweet
import com.twitter.tweetypie.thriftscala._
import com.twitter.tweetypie.util.TweetCreationLock.{Key => TweetCreationLockKey}
object PostTweet {
/** A tweet-creation handler: a `FutureArrow` from a request of type `R` to a `PostTweetResult`. */
type Type[R] = FutureArrow[R, PostTweetResult]
/**
 * Type-class giving the filters and handlers in this object uniform access to
 * the parts of a tweet creation request, whatever the concrete request type `R`.
 */
trait RequestView[R] {

  /** Whether the request is dark. A dark request yields a dark `InsertTweet.Event`. */
  def isDark(req: R): Boolean

  /** Source tweet id carried by the request, if any. */
  def sourceTweetId(req: R): Option[TweetId]

  /** Write-path hydration options supplied with the request, if any. */
  def options(req: R): Option[WritePathHydrationOptions]

  /** User id taken from the request; used for rate limiting and when re-reading a duplicate. */
  def userId(req: R): UserId

  /** Optional uniqueness id supplied with the request. */
  def uniquenessId(req: R): Option[Long]

  /** Whether the request explicitly asks for a success response when it is a duplicate. */
  def returnSuccessOnDuplicate(req: R): Boolean

  /**
   * Whether a detected duplicate should be answered with the already-existing
   * tweet: true when the request asks for success on duplicate or carries a
   * uniqueness id.
   */
  def returnDuplicateTweet(req: R): Boolean =
    returnSuccessOnDuplicate(req) || uniquenessId(req).isDefined

  /** Key under which the tweet creation lock is taken for this request. */
  def lockKey(req: R): TweetCreationLockKey

  /** Geo information supplied with the request, if any. */
  def geo(req: R): Option[TweetCreateGeo]

  /** Feature context supplied with the request; copied onto the insert event. */
  def featureContext(req: R): Option[FeatureContext]

  /** Additional context supplied with the request; copied onto the insert event. */
  def additionalContext(req: R): Option[collection.Map[TweetCreateContextKey, String]]

  /** Transient context supplied with the request; copied onto the insert event. */
  def transientContext(req: R): Option[TransientCreateContext]

  /** Additional tweet fields supplied with the request, if any. */
  def additionalFields(req: R): Option[Tweet]

  /** The create state reported for a duplicate that is not answered with the existing tweet. */
  def duplicateState: TweetCreateState

  /** Name used to scope the stats recorded for this kind of request. */
  def scope: String

  /** Whether the request is nullcast; passed to the tweet creation lock. */
  def isNullcast(req: R): Boolean

  /** Creatives container id supplied with the request, if any. */
  def creativesContainerId(req: R): Option[CreativesContainerId]

  /** Mentioned user ids from the request's note tweet options, if any. */
  def noteTweetMentionedUserIds(req: R): Option[Seq[Long]]
}
/**
 * The `RequestView` instance for `PostTweetRequest`. Most members simply read
 * the corresponding request field.
 */
implicit object PostTweetRequestView extends RequestView[PostTweetRequest] {
  // Stats scope and the state reported for duplicates.
  def scope: String = "tweet"
  def duplicateState: TweetCreateState.Duplicate.type = TweetCreateState.Duplicate

  // Identity and de-duplication.
  def userId(req: PostTweetRequest): UserId = req.userId
  def uniquenessId(req: PostTweetRequest): Option[Long] = req.uniquenessId
  // This view never opts in to success-on-duplicate by itself; `returnDuplicateTweet`
  // can still be true when a uniqueness id is present.
  def returnSuccessOnDuplicate(req: PostTweetRequest): Boolean = false
  def lockKey(req: PostTweetRequest): TweetCreationLockKey =
    TweetCreationLockKey.byRequest(req)
  // This view always reports no source tweet.
  def sourceTweetId(req: PostTweetRequest): None.type = None

  // Flags.
  def isDark(req: PostTweetRequest): Boolean = req.dark
  def isNullcast(req: PostTweetRequest): Boolean = req.nullcast

  // Optional request payload.
  def options(req: PostTweetRequest): Option[WritePathHydrationOptions] =
    req.hydrationOptions
  def geo(req: PostTweetRequest): Option[TweetCreateGeo] = req.geo
  def featureContext(req: PostTweetRequest): Option[FeatureContext] = req.featureContext
  def additionalContext(
    req: PostTweetRequest
  ): Option[collection.Map[TweetCreateContextKey, String]] =
    req.additionalContext
  def transientContext(req: PostTweetRequest): Option[TransientCreateContext] =
    req.transientContext
  def additionalFields(req: PostTweetRequest): Option[Tweet] = req.additionalFields
  def creativesContainerId(req: PostTweetRequest): Option[CreativesContainerId] =
    req.underlyingCreativesContainerId

  // `None` when the request has no note tweet options, otherwise whatever
  // those options carry (which may itself be `None`).
  def noteTweetMentionedUserIds(req: PostTweetRequest): Option[Seq[Long]] =
    req.noteTweetOptions.flatMap(_.mentionedUserIds)
}
/**
 * The `RequestView` instance for `RetweetRequest`. Fields that this view does
 * not read from the request are reported as `None`.
 */
implicit object RetweetRequestView extends RequestView[RetweetRequest] {
  // Stats scope and the state reported for duplicates.
  def scope: String = "retweet"
  def duplicateState: TweetCreateState.AlreadyRetweeted.type =
    TweetCreateState.AlreadyRetweeted

  // Identity and de-duplication.
  def userId(req: RetweetRequest): UserId = req.userId
  def uniquenessId(req: RetweetRequest): Option[Long] = req.uniquenessId
  def returnSuccessOnDuplicate(req: RetweetRequest): Boolean = req.returnSuccessOnDuplicate

  // Lock on the uniqueness id when the request has one, otherwise on the
  // (user, source tweet) pair.
  def lockKey(req: RetweetRequest): TweetCreationLockKey =
    req.uniquenessId.fold[TweetCreationLockKey](
      TweetCreationLockKey.bySourceTweetId(req.userId, req.sourceStatusId)
    )(id => TweetCreationLockKey.byUniquenessId(req.userId, id))

  // NOTE(review): always `None` even though the request carries `sourceStatusId`
  // (used by `lockKey` above) -- confirm this is intended.
  def sourceTweetId(req: RetweetRequest): None.type = None

  // Flags.
  def isDark(req: RetweetRequest): Boolean = req.dark
  def isNullcast(req: RetweetRequest): Boolean = req.nullcast

  // Optional request payload.
  def options(req: RetweetRequest): Option[WritePathHydrationOptions] = req.hydrationOptions
  def featureContext(req: RetweetRequest): Option[FeatureContext] = req.featureContext
  def additionalFields(req: RetweetRequest): Option[Tweet] = req.additionalFields

  // Always reported as absent by this view.
  def geo(req: RetweetRequest): None.type = None
  def additionalContext(req: RetweetRequest): None.type = None
  def transientContext(req: RetweetRequest): None.type = None
  def creativesContainerId(req: RetweetRequest): Option[CreativesContainerId] = None
  def noteTweetMentionedUserIds(req: RetweetRequest): Option[Seq[Long]] = None
}
/**
 * A `Filter` decorates a `FutureArrow` whose result type `Res` is fixed and
 * whose request type has a `RequestView` type-class instance.
 */
trait Filter[Res] { self =>

  /** The arrow shape this filter works on: `Req` in, `Res` out. */
  type T[Req] = FutureArrow[Req, Res]

  /**
   * Wraps `base` with additional behavior and returns the decorated arrow.
   */
  def apply[Req: RequestView](base: T[Req]): T[Req]

  /**
   * Composes two filters. The resulting filter first applies this filter to
   * the base arrow and then passes the wrapped arrow to `next`, so `next`
   * ends up as the outermost decoration.
   */
  def andThen(next: Filter[Res]): Filter[Res] =
    new Filter[Res] {
      def apply[Req: RequestView](base: T[Req]): T[Req] = {
        val wrappedBySelf = self(base)
        next(wrappedBySelf)
      }
    }
}
/**
* This filter attempts to prevent some race-condition related duplicate tweet creations,
* via use of a `TweetCreationLock`. When a duplicate is detected, this filter can synthesize
* a successful `PostTweetResult` if applicable, or return the appropriate coded response.
*/
object DuplicateHandler {
/**
* Builds the filter.
*
* @param tweetCreationLock lock taken around the wrapped arrow, keyed by the request's `lockKey`
* @param getTweets read handler used to load the already-existing tweet once a duplicate is reported
* @param stats receiver for the per-scope "found" / "not_found" counters
*/
def apply(
tweetCreationLock: TweetCreationLock,
getTweets: GetTweetsHandler.Type,
stats: StatsReceiver
): Filter[PostTweetResult] =
new Filter[PostTweetResult] {
def apply[R: RequestView](base: T[R]): T[R] = {
val view = implicitly[RequestView[R]]
// Counters are created once per decorated arrow, named under the view's scope.
val notFoundCount = stats.counter(view.scope, "not_found")
val foundCounter = stats.counter(view.scope, "found")
// `FutureArrow.rec` hands the arrow being defined back in as `self`, which
// is used below to retry the whole request after releasing the lock.
FutureArrow.rec[R, PostTweetResult] { self => req =>
val duplicateKey = view.lockKey(req)
// attempts to find the duplicate tweet.
//
// if `returnDupTweet` is true and we find the tweet, then we return a
// successful `PostTweetResult` with that tweet. if we don't find the
// tweet, we fail with an `InternalServerError`.
//
// if `returnDupTweet` is false and we find the tweet, then we return
// the appropriate duplicate state. if we don't find the tweet, then
// we unlock the duplicate key and try again.
def duplicate(tweetId: TweetId, returnDupTweet: Boolean) =
findDuplicate(tweetId, req).flatMap {
case Some(postTweetResult) =>
foundCounter.incr()
if (returnDupTweet) Future.value(postTweetResult)
else Future.value(PostTweetResult(state = view.duplicateState))
case None =>
notFoundCount.incr()
if (returnDupTweet) {
// If we failed to load the tweet, but we know that it
// should exist, then return an InternalServerError, so that
// the client treats it as a failed tweet creation req.
Future.exception(
InternalServerError("Failed to load duplicate existing tweet: " + tweetId)
)
} else {
// Assume the lock is stale if we can't load the tweet. It's
// possible that the lock is not stale, but the tweet is not
// yet available, which requires that it not be present in
// cache and not yet available from the backend. This means
// that the failure mode is to allow tweeting if we can't
// determine the state, but it should be rare that we can't
// determine it.
tweetCreationLock.unlock(duplicateKey).before(self(req))
}
}
// Run the wrapped arrow under the creation lock and translate the
// duplicate-related failures below; any other failure is left untouched.
tweetCreationLock(duplicateKey, view.isDark(req), view.isNullcast(req)) {
base(req)
}.rescue {
// NOTE(review): this always answers with `TweetCreateState.Duplicate`,
// not `view.duplicateState` -- confirm that is intended for retweets.
case TweetCreationInProgress =>
Future.value(PostTweetResult(state = TweetCreateState.Duplicate))
// if tweetCreationLock detected a duplicate, look up the duplicate
// and return the appropriate result
case DuplicateTweetCreation(tweetId) =>
duplicate(tweetId, view.returnDuplicateTweet(req))
// it's possible that tweetCreationLock didn't find a duplicate for a
// retweet attempt, but `RetweetBuilder` did.
case TweetCreateFailure.AlreadyRetweeted(tweetId) if view.returnDuplicateTweet(req) =>
duplicate(tweetId, true)
}
}
}
/**
* Reads the tweet with id `tweetId` through `getTweets` on behalf of the
* requesting user and, if it is found, converts the read result into a
* successful `PostTweetResult`. Yields `None` when the single returned
* result is not in the `Found` state.
*/
private def findDuplicate[R: RequestView](
tweetId: TweetId,
req: R
): Future[Option[PostTweetResult]] = {
val view = implicitly[RequestView[R]]
val readRequest =
GetTweetsRequest(
tweetIds = Seq(tweetId),
// Assume that the defaults are OK for all of the hydration
// options except the ones that are explicitly set in the
// req.
options = Some(
GetTweetOptions(
forUserId = Some(view.userId(req)),
includePerspectivals = true,
includeCards = view.options(req).exists(_.includeCards),
cardsPlatformKey = view.options(req).flatMap(_.cardsPlatformKey)
)
)
)
// NOTE(review): only a single-element response is matched; a response with
// any other number of results is not handled here and raises a `MatchError`.
// Presumably `getTweets` returns exactly one result per requested id -- confirm.
getTweets(readRequest).map {
case Seq(result) =>
if (result.tweetState == StatusState.Found) {
// If the tweet was successfully found, then convert the
// read result into a successful write result.
Some(
PostTweetResult(
TweetCreateState.Ok,
result.tweet,
// if the retweet is really old, the retweet perspective might no longer
// be available, but we want to maintain the invariant that the `postRetweet`
// endpoint always returns a source tweet with the correct perspective.
result.sourceTweet.map { srcTweet =>
TweetLenses.perspective
.update(_.map(_.copy(retweeted = true, retweetId = Some(tweetId))))
.apply(srcTweet)
},
result.quotedTweet
)
)
} else {
None
}
}
}
}
}
/**
 * A `Filter` around a tweet builder that applies rate limiting to failing
 * requests: the limit is validated before the builder runs, builder failures
 * are counted against the failure limit, and silent failures are counted
 * against the success limit.
 */
object RateLimitFailures {

  /**
   * @param validateLimit checked with `(userId, dark)` before the wrapped arrow runs
   * @param incrementSuccess called when the builder result is a silent failure
   * @param incrementFailure called when the wrapped arrow fails for a non-dark request
   */
  def apply(
    validateLimit: RateLimitChecker.Validate,
    incrementSuccess: LimiterService.IncrementByOne,
    incrementFailure: LimiterService.IncrementByOne
  ): Filter[TweetBuilderResult] =
    new Filter[TweetBuilderResult] {
      def apply[R: RequestView](base: T[R]): T[R] = {
        val requestView = implicitly[RequestView[R]]

        FutureArrow[R, TweetBuilderResult] { req =>
          val userId = requestView.userId(req)
          val dark = requestView.isDark(req)
          val contributorUserId: Option[UserId] = getContributor(userId).map(_.userId)

          // Runs the wrapped arrow and counts its failures. The failure hook is
          // attached here, not on the whole chain, so a rejection by
          // `validateLimit` is never counted as a failure itself; otherwise the
          // user could get stuck in a loop where tweet creation is never
          // attempted. Dark creations are not counted either: there is no way
          // to perform a dark tweet creation through the API, so it is most
          // likely some kind of test traffic like tap-compare.
          def runCountingFailures(): Future[TweetBuilderResult] =
            base(req).onFailure { _ =>
              if (!dark) incrementFailure(userId, contributorUserId)
            }

          validateLimit((userId, dark))
            .before(runCountingFailures())
            .onSuccess { built =>
              // A silent failure is counted as if the tweet had been fully
              // created, because it should appear that way to the user whose
              // creation silently failed.
              if (built.isSilentFail) incrementSuccess(userId, contributorUserId)
            }
        }
      }
    }
}
/**
 * A `Filter` for counting non-`TweetCreateFailure` failures.
 */
object CountFailures {

  /**
   * @param stats receiver under which the exception counter is scoped
   * @param scopeSuffix appended to the request view's scope to form the stats scope name
   */
  def apply[Res](stats: StatsReceiver, scopeSuffix: String = "_builder"): Filter[Res] =
    new Filter[Res] {
      def apply[R: RequestView](base: T[R]): T[R] = {
        val scopeName = implicitly[RequestView[R]].scope + scopeSuffix
        val countException = ExceptionCounter(stats.scope(scopeName))

        base.onFailure {
          case (_, thrown) =>
            thrown match {
              // `TweetCreateFailure`s are deliberately left uncounted.
              case _: TweetCreateFailure => ()
              case unexpected => countException(unexpected)
            }
        }
      }
    }
}
/**
 * A `Filter` that logs every failure of the wrapped arrow, together with the
 * request that caused it, to a dedicated logger. The failure itself is passed
 * through unchanged.
 */
object LogFailures extends Filter[PostTweetResult] {
  private[this] val failureLog = Logger("com.twitter.tweetypie.FailedTweetCreations")

  def apply[R: RequestView](base: T[R]): T[R] =
    FutureArrow[R, PostTweetResult] { req =>
      val outcome = base(req)
      outcome.onFailure(failure => failureLog.info(s"request: $req\nfailure: $failure"))
    }
}
/**
 * A `Filter` that turns a failed result carrying a `TweetCreateFailure` into a
 * successful `Future` holding the failure's `PostTweetResult`. Any other
 * failure is left as is.
 */
object RescueTweetCreateFailure extends Filter[PostTweetResult] {
  // Only defined for `TweetCreateFailure`; every other throwable falls through.
  private[this] val failureToResult: PartialFunction[Throwable, Future[PostTweetResult]] = {
    case createFailure: TweetCreateFailure =>
      Future.value(createFailure.toPostTweetResult)
  }

  def apply[R: RequestView](base: T[R]): T[R] =
    FutureArrow[R, PostTweetResult](req => base(req).rescue(failureToResult))
}
/**
 * Builds a base handler for `PostTweetRequest` and `RetweetRequest`. The handler
 * calls an underlying tweet builder, creates an `InsertTweet.Event`, hydrates
 * that, passes it to `tweetStore` (unless the event is dark), and then converts
 * it to a `PostTweetResult`.
 */
object Handler {

  /**
   * @param tweetBuilder produces the `TweetBuilderResult` for a request
   * @param hydrateInsertEvent hydrates the event derived from the builder result
   * @param tweetStore store whose `insertTweet` receives every non-dark event
   */
  def apply[R: RequestView](
    tweetBuilder: FutureArrow[R, TweetBuilderResult],
    hydrateInsertEvent: FutureArrow[InsertTweet.Event, InsertTweet.Event],
    tweetStore: InsertTweet.Store
  ): Type[R] =
    FutureArrow[R, PostTweetResult] { req =>
      tweetBuilder(req)
        .flatMap(built => hydrateInsertEvent(toInsertTweetEvent(req, built)))
        .flatMap { event =>
          // Dark events skip the store but still produce a result.
          Future
            .when(!event.dark)(tweetStore.insertTweet(event))
            .map(_ => toPostTweetResult(event))
        }
    }

  /**
   * Converts a request/`TweetBuilderResult` pair into an `InsertTweet.Event`.
   */
  def toInsertTweetEvent[R: RequestView](
    req: R,
    bldrRes: TweetBuilderResult
  ): InsertTweet.Event = {
    val requestView = implicitly[RequestView[R]]

    // A silent failure is treated like a dark request.
    val isDarkEvent = requestView.isDark(req) || bldrRes.isSilentFail
    val hydrationOptions = requestView.options(req).getOrElse(WritePathHydrationOptions())
    // Present only when the request has geo data that carries a search request id.
    val searchRequestId =
      requestView
        .geo(req)
        .flatMap(_.geoSearchRequestId)
        .map(searchId => GeoSearchRequestId(requestID = searchId.id))

    InsertTweet.Event(
      tweet = bldrRes.tweet,
      user = bldrRes.user,
      sourceTweet = bldrRes.sourceTweet,
      sourceUser = bldrRes.sourceUser,
      parentUserId = bldrRes.parentUserId,
      timestamp = bldrRes.createdAt,
      dark = isDarkEvent,
      hydrateOptions = hydrationOptions,
      featureContext = requestView.featureContext(req),
      initialTweetUpdateRequest = bldrRes.initialTweetUpdateRequest,
      geoSearchRequestId = searchRequestId,
      additionalContext = requestView.additionalContext(req),
      transientContext = requestView.transientContext(req),
      noteTweetMentionedUserIds = requestView.noteTweetMentionedUserIds(req)
    )
  }

  /**
   * Converts an `InsertTweet.Event` into a successful `PostTweetResult`.
   */
  def toPostTweetResult(event: InsertTweet.Event): PostTweetResult = {
    val createdTweet = Some(event.tweet)
    PostTweetResult(
      TweetCreateState.Ok,
      createdTweet,
      sourceTweet = event.sourceTweet,
      quotedTweet = event.quotedTweet
    )
  }
}
}