396 lines
16 KiB
Scala
396 lines
16 KiB
Scala
package com.twitter.tweetypie
|
|
package handler
|
|
|
|
import com.twitter.context.thriftscala.FeatureContext
|
|
import com.twitter.tweetypie.backends.LimiterService
|
|
import com.twitter.tweetypie.core._
|
|
import com.twitter.tweetypie.serverutil.ExceptionCounter
|
|
import com.twitter.tweetypie.store.InsertTweet
|
|
import com.twitter.tweetypie.thriftscala._
|
|
import com.twitter.tweetypie.util.TweetCreationLock.{Key => TweetCreationLockKey}
|
|
|
|
object PostTweet {
|
|
type Type[R] = FutureArrow[R, PostTweetResult]
|
|
|
|
/**
|
|
* A type-class to abstract over tweet creation requests.
|
|
*/
|
|
trait RequestView[R] {
|
|
def isDark(req: R): Boolean
|
|
def sourceTweetId(req: R): Option[TweetId]
|
|
def options(req: R): Option[WritePathHydrationOptions]
|
|
def userId(req: R): UserId
|
|
def uniquenessId(req: R): Option[Long]
|
|
def returnSuccessOnDuplicate(req: R): Boolean
|
|
def returnDuplicateTweet(req: R): Boolean =
|
|
returnSuccessOnDuplicate(req) || uniquenessId(req).nonEmpty
|
|
def lockKey(req: R): TweetCreationLockKey
|
|
def geo(req: R): Option[TweetCreateGeo]
|
|
def featureContext(req: R): Option[FeatureContext]
|
|
def additionalContext(req: R): Option[collection.Map[TweetCreateContextKey, String]]
|
|
def transientContext(req: R): Option[TransientCreateContext]
|
|
def additionalFields(req: R): Option[Tweet]
|
|
def duplicateState: TweetCreateState
|
|
def scope: String
|
|
def isNullcast(req: R): Boolean
|
|
def creativesContainerId(req: R): Option[CreativesContainerId]
|
|
def noteTweetMentionedUserIds(req: R): Option[Seq[Long]]
|
|
}
|
|
|
|
/**
|
|
* An implementation of `RequestView` for `PostTweetRequest`.
|
|
*/
|
|
implicit object PostTweetRequestView extends RequestView[PostTweetRequest] {
|
|
def isDark(req: PostTweetRequest): Boolean = req.dark
|
|
def sourceTweetId(req: PostTweetRequest): None.type = None
|
|
def options(req: PostTweetRequest): Option[WritePathHydrationOptions] = req.hydrationOptions
|
|
def userId(req: PostTweetRequest): UserId = req.userId
|
|
def uniquenessId(req: PostTweetRequest): Option[Long] = req.uniquenessId
|
|
def returnSuccessOnDuplicate(req: PostTweetRequest) = false
|
|
def lockKey(req: PostTweetRequest): TweetCreationLockKey = TweetCreationLockKey.byRequest(req)
|
|
def geo(req: PostTweetRequest): Option[TweetCreateGeo] = req.geo
|
|
def featureContext(req: PostTweetRequest): Option[FeatureContext] = req.featureContext
|
|
def additionalContext(
|
|
req: PostTweetRequest
|
|
): Option[collection.Map[TweetCreateContextKey, String]] = req.additionalContext
|
|
def transientContext(req: PostTweetRequest): Option[TransientCreateContext] =
|
|
req.transientContext
|
|
def additionalFields(req: PostTweetRequest): Option[Tweet] = req.additionalFields
|
|
def duplicateState: TweetCreateState.Duplicate.type = TweetCreateState.Duplicate
|
|
def scope = "tweet"
|
|
def isNullcast(req: PostTweetRequest): Boolean = req.nullcast
|
|
def creativesContainerId(req: PostTweetRequest): Option[CreativesContainerId] =
|
|
req.underlyingCreativesContainerId
|
|
def noteTweetMentionedUserIds(req: PostTweetRequest): Option[Seq[Long]] =
|
|
req.noteTweetOptions match {
|
|
case Some(noteTweetOptions) => noteTweetOptions.mentionedUserIds
|
|
case _ => None
|
|
}
|
|
}
|
|
|
|
/**
|
|
* An implementation of `RequestView` for `RetweetRequest`.
|
|
*/
|
|
implicit object RetweetRequestView extends RequestView[RetweetRequest] {
|
|
def isDark(req: RetweetRequest): Boolean = req.dark
|
|
def sourceTweetId(req: RetweetRequest): None.type = None
|
|
def options(req: RetweetRequest): Option[WritePathHydrationOptions] = req.hydrationOptions
|
|
def userId(req: RetweetRequest): UserId = req.userId
|
|
def uniquenessId(req: RetweetRequest): Option[Long] = req.uniquenessId
|
|
def returnSuccessOnDuplicate(req: RetweetRequest): Boolean = req.returnSuccessOnDuplicate
|
|
def lockKey(req: RetweetRequest): TweetCreationLockKey =
|
|
req.uniquenessId match {
|
|
case Some(id) => TweetCreationLockKey.byUniquenessId(req.userId, id)
|
|
case None => TweetCreationLockKey.bySourceTweetId(req.userId, req.sourceStatusId)
|
|
}
|
|
def geo(req: RetweetRequest): None.type = None
|
|
def featureContext(req: RetweetRequest): Option[FeatureContext] = req.featureContext
|
|
def additionalContext(req: RetweetRequest): None.type = None
|
|
def transientContext(req: RetweetRequest): None.type = None
|
|
def additionalFields(req: RetweetRequest): Option[Tweet] = req.additionalFields
|
|
def duplicateState: TweetCreateState.AlreadyRetweeted.type = TweetCreateState.AlreadyRetweeted
|
|
def scope = "retweet"
|
|
def isNullcast(req: RetweetRequest): Boolean = req.nullcast
|
|
def creativesContainerId(req: RetweetRequest): Option[CreativesContainerId] = None
|
|
def noteTweetMentionedUserIds(req: RetweetRequest): Option[Seq[Long]] = None
|
|
}
|
|
|
|
/**
|
|
* A `Filter` is used to decorate a `FutureArrow` that has a known return type
|
|
* and an input type for which there is a `RequestView` type-class instance.
|
|
*/
|
|
trait Filter[Res] { self =>
|
|
type T[Req] = FutureArrow[Req, Res]
|
|
|
|
/**
|
|
* Wraps a base arrow with additional behavior.
|
|
*/
|
|
def apply[Req: RequestView](base: T[Req]): T[Req]
|
|
|
|
/**
|
|
* Composes two filter. The resulting filter itself composes FutureArrows.
|
|
*/
|
|
def andThen(next: Filter[Res]): Filter[Res] =
|
|
new Filter[Res] {
|
|
def apply[Req: RequestView](base: T[Req]): T[Req] =
|
|
next(self(base))
|
|
}
|
|
}
|
|
|
|
/**
|
|
* This filter attempts to prevent some race-condition related duplicate tweet creations,
|
|
* via use of a `TweetCreateLock`. When a duplicate is detected, this filter can synthesize
|
|
* a successful `PostTweetResult` if applicable, or return the appropriate coded response.
|
|
*/
|
|
object DuplicateHandler {
|
|
def apply(
|
|
tweetCreationLock: TweetCreationLock,
|
|
getTweets: GetTweetsHandler.Type,
|
|
stats: StatsReceiver
|
|
): Filter[PostTweetResult] =
|
|
new Filter[PostTweetResult] {
|
|
def apply[R: RequestView](base: T[R]): T[R] = {
|
|
val view = implicitly[RequestView[R]]
|
|
val notFoundCount = stats.counter(view.scope, "not_found")
|
|
val foundCounter = stats.counter(view.scope, "found")
|
|
|
|
FutureArrow.rec[R, PostTweetResult] { self => req =>
|
|
val duplicateKey = view.lockKey(req)
|
|
|
|
// attempts to find the duplicate tweet.
|
|
//
|
|
// if `returnDupTweet` is true and we find the tweet, then we return a
|
|
// successful `PostTweetResult` with that tweet. if we don't find the
|
|
// tweet, we throw an `InternalServerError`.
|
|
//
|
|
// if `returnDupTweet` is false and we find the tweet, then we return
|
|
// the appropriate duplicate state. if we don't find the tweet, then
|
|
// we unlock the duplicate key and try again.
|
|
def duplicate(tweetId: TweetId, returnDupTweet: Boolean) =
|
|
findDuplicate(tweetId, req).flatMap {
|
|
case Some(postTweetResult) =>
|
|
foundCounter.incr()
|
|
if (returnDupTweet) Future.value(postTweetResult)
|
|
else Future.value(PostTweetResult(state = view.duplicateState))
|
|
|
|
case None =>
|
|
notFoundCount.incr()
|
|
if (returnDupTweet) {
|
|
// If we failed to load the tweet, but we know that it
|
|
// should exist, then return an InternalServerError, so that
|
|
// the client treats it as a failed tweet creation req.
|
|
Future.exception(
|
|
InternalServerError("Failed to load duplicate existing tweet: " + tweetId)
|
|
)
|
|
} else {
|
|
// Assume the lock is stale if we can't load the tweet. It's
|
|
// possible that the lock is not stale, but the tweet is not
|
|
// yet available, which requires that it not be present in
|
|
// cache and not yet available from the backend. This means
|
|
// that the failure mode is to allow tweeting if we can't
|
|
// determine the state, but it should be rare that we can't
|
|
// determine it.
|
|
tweetCreationLock.unlock(duplicateKey).before(self(req))
|
|
}
|
|
}
|
|
|
|
tweetCreationLock(duplicateKey, view.isDark(req), view.isNullcast(req)) {
|
|
base(req)
|
|
}.rescue {
|
|
case TweetCreationInProgress =>
|
|
Future.value(PostTweetResult(state = TweetCreateState.Duplicate))
|
|
|
|
// if tweetCreationLock detected a duplicate, look up the duplicate
|
|
// and return the appropriate result
|
|
case DuplicateTweetCreation(tweetId) =>
|
|
duplicate(tweetId, view.returnDuplicateTweet(req))
|
|
|
|
// it's possible that tweetCreationLock didn't find a duplicate for a
|
|
// retweet attempt, but `RetweetBuilder` did.
|
|
case TweetCreateFailure.AlreadyRetweeted(tweetId) if view.returnDuplicateTweet(req) =>
|
|
duplicate(tweetId, true)
|
|
}
|
|
}
|
|
}
|
|
|
|
private def findDuplicate[R: RequestView](
|
|
tweetId: TweetId,
|
|
req: R
|
|
): Future[Option[PostTweetResult]] = {
|
|
val view = implicitly[RequestView[R]]
|
|
val readRequest =
|
|
GetTweetsRequest(
|
|
tweetIds = Seq(tweetId),
|
|
// Assume that the defaults are OK for all of the hydration
|
|
// options except the ones that are explicitly set in the
|
|
// req.
|
|
options = Some(
|
|
GetTweetOptions(
|
|
forUserId = Some(view.userId(req)),
|
|
includePerspectivals = true,
|
|
includeCards = view.options(req).exists(_.includeCards),
|
|
cardsPlatformKey = view.options(req).flatMap(_.cardsPlatformKey)
|
|
)
|
|
)
|
|
)
|
|
|
|
getTweets(readRequest).map {
|
|
case Seq(result) =>
|
|
if (result.tweetState == StatusState.Found) {
|
|
// If the tweet was successfully found, then convert the
|
|
// read result into a successful write result.
|
|
Some(
|
|
PostTweetResult(
|
|
TweetCreateState.Ok,
|
|
result.tweet,
|
|
// if the retweet is really old, the retweet perspective might no longer
|
|
// be available, but we want to maintain the invariant that the `postRetweet`
|
|
// endpoint always returns a source tweet with the correct perspective.
|
|
result.sourceTweet.map { srcTweet =>
|
|
TweetLenses.perspective
|
|
.update(_.map(_.copy(retweeted = true, retweetId = Some(tweetId))))
|
|
.apply(srcTweet)
|
|
},
|
|
result.quotedTweet
|
|
)
|
|
)
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* A `Filter` that applies rate limiting to failing requests.
|
|
*/
|
|
object RateLimitFailures {
|
|
def apply(
|
|
validateLimit: RateLimitChecker.Validate,
|
|
incrementSuccess: LimiterService.IncrementByOne,
|
|
incrementFailure: LimiterService.IncrementByOne
|
|
): Filter[TweetBuilderResult] =
|
|
new Filter[TweetBuilderResult] {
|
|
def apply[R: RequestView](base: T[R]): T[R] = {
|
|
val view = implicitly[RequestView[R]]
|
|
|
|
FutureArrow[R, TweetBuilderResult] { req =>
|
|
val userId = view.userId(req)
|
|
val dark = view.isDark(req)
|
|
val contributorUserId: Option[UserId] = getContributor(userId).map(_.userId)
|
|
|
|
validateLimit((userId, dark))
|
|
.before {
|
|
base(req).onFailure { _ =>
|
|
// We don't increment the failure rate limit if the failure
|
|
// was from the failure rate limit so that the user can't
|
|
// get in a loop where tweet creation is never attempted. We
|
|
// don't increment it if the creation is dark because there
|
|
// is no way to perform a dark tweet creation through the
|
|
// API, so it's most likey some kind of test traffic like
|
|
// tap-compare.
|
|
if (!dark) incrementFailure(userId, contributorUserId)
|
|
}
|
|
}
|
|
.onSuccess { resp =>
|
|
// If we return a silent failure, then we want to
|
|
// increment the rate limit as if the tweet was fully
|
|
// created, because we want it to appear that way to the
|
|
// user whose creation silently failed.
|
|
if (resp.isSilentFail) incrementSuccess(userId, contributorUserId)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* A `Filter` for counting non-`TweetCreateFailure` failures.
|
|
*/
|
|
object CountFailures {
|
|
def apply[Res](stats: StatsReceiver, scopeSuffix: String = "_builder"): Filter[Res] =
|
|
new Filter[Res] {
|
|
def apply[R: RequestView](base: T[R]): T[R] = {
|
|
val view = implicitly[RequestView[R]]
|
|
val exceptionCounter = ExceptionCounter(stats.scope(view.scope + scopeSuffix))
|
|
base.onFailure {
|
|
case (_, _: TweetCreateFailure) =>
|
|
case (_, ex) => exceptionCounter(ex)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* A `Filter` for logging failures.
|
|
*/
|
|
object LogFailures extends Filter[PostTweetResult] {
|
|
private[this] val failedTweetCreationsLogger = Logger(
|
|
"com.twitter.tweetypie.FailedTweetCreations"
|
|
)
|
|
|
|
def apply[R: RequestView](base: T[R]): T[R] =
|
|
FutureArrow[R, PostTweetResult] { req =>
|
|
base(req).onFailure {
|
|
case failure => failedTweetCreationsLogger.info(s"request: $req\nfailure: $failure")
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* A `Filter` for converting a thrown `TweetCreateFailure` into a `PostTweetResult`.
|
|
*/
|
|
object RescueTweetCreateFailure extends Filter[PostTweetResult] {
|
|
def apply[R: RequestView](base: T[R]): T[R] =
|
|
FutureArrow[R, PostTweetResult] { req =>
|
|
base(req).rescue {
|
|
case failure: TweetCreateFailure => Future.value(failure.toPostTweetResult)
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Builds a base handler for `PostTweetRequest` and `RetweetRequest`. The handler
|
|
* calls an underlying tweet builder, creates a `InsertTweet.Event`, hydrates
|
|
* that, passes it to `tweetStore`, and then converts it to a `PostTweetResult`.
|
|
*/
|
|
object Handler {
|
|
def apply[R: RequestView](
|
|
tweetBuilder: FutureArrow[R, TweetBuilderResult],
|
|
hydrateInsertEvent: FutureArrow[InsertTweet.Event, InsertTweet.Event],
|
|
tweetStore: InsertTweet.Store,
|
|
): Type[R] = {
|
|
FutureArrow { req =>
|
|
for {
|
|
bldrRes <- tweetBuilder(req)
|
|
event <- hydrateInsertEvent(toInsertTweetEvent(req, bldrRes))
|
|
_ <- Future.when(!event.dark)(tweetStore.insertTweet(event))
|
|
} yield toPostTweetResult(event)
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Converts a request/`TweetBuilderResult` pair into an `InsertTweet.Event`.
|
|
*/
|
|
def toInsertTweetEvent[R: RequestView](
|
|
req: R,
|
|
bldrRes: TweetBuilderResult
|
|
): InsertTweet.Event = {
|
|
val view = implicitly[RequestView[R]]
|
|
InsertTweet.Event(
|
|
tweet = bldrRes.tweet,
|
|
user = bldrRes.user,
|
|
sourceTweet = bldrRes.sourceTweet,
|
|
sourceUser = bldrRes.sourceUser,
|
|
parentUserId = bldrRes.parentUserId,
|
|
timestamp = bldrRes.createdAt,
|
|
dark = view.isDark(req) || bldrRes.isSilentFail,
|
|
hydrateOptions = view.options(req).getOrElse(WritePathHydrationOptions()),
|
|
featureContext = view.featureContext(req),
|
|
initialTweetUpdateRequest = bldrRes.initialTweetUpdateRequest,
|
|
geoSearchRequestId = for {
|
|
geo <- view.geo(req)
|
|
searchRequestID <- geo.geoSearchRequestId
|
|
} yield {
|
|
GeoSearchRequestId(requestID = searchRequestID.id)
|
|
},
|
|
additionalContext = view.additionalContext(req),
|
|
transientContext = view.transientContext(req),
|
|
noteTweetMentionedUserIds = view.noteTweetMentionedUserIds(req)
|
|
)
|
|
}
|
|
|
|
/**
|
|
* Converts an `InsertTweet.Event` into a successful `PostTweetResult`.
|
|
*/
|
|
def toPostTweetResult(event: InsertTweet.Event): PostTweetResult =
|
|
PostTweetResult(
|
|
TweetCreateState.Ok,
|
|
Some(event.tweet),
|
|
sourceTweet = event.sourceTweet,
|
|
quotedTweet = event.quotedTweet
|
|
)
|
|
}
|
|
}
|