// Source: the-algorithm/tweetypie/server/src/main/scala/com/twitter/tweetypie/hydrator/TweetHydration.scala
//
// 849 lines
// 34 KiB
// Scala

package com.twitter.tweetypie
package hydrator
import com.twitter.expandodo.thriftscala.Card
import com.twitter.expandodo.thriftscala.Card2
import com.twitter.servo.cache.Cached
import com.twitter.servo.cache.CachedValueStatus
import com.twitter.servo.cache.LockingCache
import com.twitter.stitch.Stitch
import com.twitter.tweetypie.core._
import com.twitter.tweetypie.media.thriftscala.MediaRef
import com.twitter.tweetypie.repository.PastedMedia
import com.twitter.tweetypie.repository.TweetQuery
import com.twitter.tweetypie.repository.TweetRepoCachePicker
import com.twitter.tweetypie.repository.TweetResultRepository
import com.twitter.tweetypie.thriftscala._
import com.twitter.tweetypie.util.Takedowns
import com.twitter.util.Return
import com.twitter.util.Throw
object TweetHydration {
/**
* Wires up a set of hydrators that include those whose results are cached on the tweet,
* and some whose results are not cached but depend upon the results of the former.
*
* The individual hydrators passed in are composed into one [[TweetDataValueHydrator]]:
* `hydrateFeatureSwitchResults` runs first, then the chain of cacheable and dependent
* hydrators runs in parallel with the independent uncacheable hydrators, and the remaining
* hydrators (reported-tweet visibility, url scrubbing, source-tweet copying, tweet
* visibility, engagement scrubbing, escherbird annotations, note-tweet suffix and legacy
* formatting) run afterwards in sequence.
*
* @param hydratorStats receiver used to create the `scrub_cached_tweet` counter
* @return a single hydrator that applies all of the supplied hydrators in the order
*         wired up in the body of this method
*/
def apply(
hydratorStats: StatsReceiver,
hydrateFeatureSwitchResults: TweetDataValueHydrator,
hydrateMentions: MentionEntitiesHydrator.Type,
hydrateLanguage: LanguageHydrator.Type,
hydrateUrls: UrlEntitiesHydrator.Type,
hydrateQuotedTweetRef: QuotedTweetRefHydrator.Type,
hydrateQuotedTweetRefUrls: QuotedTweetRefUrlsHydrator.Type,
hydrateMediaCacheable: MediaEntitiesHydrator.Cacheable.Type,
hydrateReplyScreenName: ReplyScreenNameHydrator.Type,
hydrateConvoId: ConversationIdHydrator.Type,
hydratePerspective: PerspectiveHydrator.Type,
hydrateEditPerspective: EditPerspectiveHydrator.Type,
hydrateConversationMuted: ConversationMutedHydrator.Type,
hydrateContributor: ContributorHydrator.Type,
hydrateTakedowns: TakedownHydrator.Type,
hydrateDirectedAt: DirectedAtHydrator.Type,
hydrateGeoScrub: GeoScrubHydrator.Type,
hydrateCacheableRepairs: TweetDataValueHydrator,
hydrateMediaUncacheable: MediaEntitiesHydrator.Uncacheable.Type,
hydratePostCacheRepairs: TweetDataValueHydrator,
hydrateTweetLegacyFormat: TweetDataValueHydrator,
hydrateQuoteTweetVisibility: QuoteTweetVisibilityHydrator.Type,
hydrateQuotedTweet: QuotedTweetHydrator.Type,
hydratePastedMedia: PastedMediaHydrator.Type,
hydrateMediaRefs: MediaRefsHydrator.Type,
hydrateMediaTags: MediaTagsHydrator.Type,
hydrateClassicCards: CardHydrator.Type,
hydrateCard2: Card2Hydrator.Type,
hydrateContributorVisibility: ContributorVisibilityFilter.Type,
hydrateHasMedia: HasMediaHydrator.Type,
hydrateTweetCounts: TweetCountsHydrator.Type,
hydratePreviousTweetCounts: PreviousTweetCountsHydrator.Type,
hydratePlace: PlaceHydrator.Type,
hydrateDeviceSource: DeviceSourceHydrator.Type,
hydrateProfileGeo: ProfileGeoHydrator.Type,
hydrateSourceTweet: SourceTweetHydrator.Type,
hydrateIM1837State: IM1837FilterHydrator.Type,
hydrateIM2884State: IM2884FilterHydrator.Type,
hydrateIM3433State: IM3433FilterHydrator.Type,
hydrateTweetAuthorVisibility: TweetAuthorVisibilityHydrator.Type,
hydrateReportedTweetVisibility: ReportedTweetFilter.Type,
scrubSuperfluousUrlEntities: TweetDataValueHydrator,
copyFromSourceTweet: TweetDataValueHydrator,
hydrateTweetVisibility: TweetVisibilityHydrator.Type,
hydrateEscherbirdAnnotations: EscherbirdAnnotationHydrator.Type,
hydrateScrubEngagements: ScrubEngagementHydrator.Type,
hydrateConversationControl: ConversationControlHydrator.Type,
hydrateEditControl: EditControlHydrator.Type,
hydrateUnmentionData: UnmentionDataHydrator.Type,
hydrateNoteTweetSuffix: TweetDataValueHydrator
): TweetDataValueHydrator = {
// Applies the `ScrubUncacheable` tweet mutation to the tweet inside the TweetData, passing
// the "scrub_cached_tweet" counter to `countMutations`. The hydrator only runs when
// `opts.cause.reading` holds for the tweet's id.
val scrubCachedTweet: TweetDataValueHydrator = {
  val scrubCounter = hydratorStats.counter("scrub_cached_tweet")
  val countedScrub = ScrubUncacheable.tweetMutation.countMutations(scrubCounter)

  ValueHydrator
    .fromMutation[Tweet, TweetQuery.Options](countedScrub)
    .lensed(TweetData.Lenses.tweet)
    .onlyIf { (tweetData, options) => options.cause.reading(tweetData.tweet.id) }
}
// Hydrates the fields whose results are cached on the tweet.
//
// We perform independent hydrations of individual bits of
// data and pack the results into tuples instead of updating
// the tweet for each one in order to avoid making lots of
// copies of the tweet.
val hydratePrimaryCacheableFields: TweetDataValueHydrator =
ValueHydrator[TweetData, TweetQuery.Options] { (td, opts) =>
val ctx = TweetCtx.from(td, opts)
val tweet = td.tweet
// Urls are hydrated first because both the cacheable media hydrator and the quoted
// tweet ref hydrator take the hydrated url entities in their contexts. Those two are
// then combined with Stitch.join.
val urlsMediaQuoteTweet: Stitch[
ValueState[(Seq[UrlEntity], Seq[MediaEntity], Option[QuotedTweet])]
] =
for {
urls <- hydrateUrls(getUrls(tweet), ctx)
(media, quotedTweet) <- Stitch.join(
hydrateMediaCacheable(
getMedia(tweet),
MediaEntityHydrator.Cacheable.Ctx(urls.value, ctx)
),
// The quoted tweet ref is hydrated, then its urls; the resulting value is the one
// with urls and the hydration states of both steps are merged.
for {
qtRef <- hydrateQuotedTweetRef(
tweet.quotedTweet,
QuotedTweetRefHydrator.Ctx(urls.value, ctx)
)
qtRefWithUrls <- hydrateQuotedTweetRefUrls(qtRef.value, ctx)
} yield {
ValueState(qtRefWithUrls.value, qtRef.state ++ qtRefWithUrls.state)
}
)
} yield {
ValueState.join(urls, media, quotedTweet)
}
val conversationId: Stitch[ValueState[Option[ConversationId]]] =
hydrateConvoId(getConversationId(tweet), ctx)
val mentions: Stitch[ValueState[Seq[MentionEntity]]] =
hydrateMentions(getMentions(tweet), ctx)
val replyScreenName: Stitch[ValueState[Option[Reply]]] =
hydrateReplyScreenName(getReply(tweet), ctx)
// Note: the directed-at context is built from the mentions currently on the tweet
// (getMentions(tweet)), not from the result of the `mentions` hydration above.
val directedAt: Stitch[ValueState[Option[DirectedAtUser]]] =
hydrateDirectedAt(
getDirectedAtUser(tweet),
DirectedAtHydrator.Ctx(
mentions = getMentions(tweet),
metadata = tweet.directedAtUserMetadata,
underlyingTweetCtx = ctx
)
)
val language: Stitch[ValueState[Option[Language]]] =
hydrateLanguage(tweet.language, ctx)
val contributor: Stitch[ValueState[Option[Contributor]]] =
hydrateContributor(tweet.contributor, ctx)
// Geo coordinates and place id are passed to the geo-scrub hydrator together as a pair.
val geoScrub: Stitch[ValueState[(Option[GeoCoordinates], Option[PlaceId])]] =
hydrateGeoScrub(
(TweetLenses.geoCoordinates(tweet), TweetLenses.placeId(tweet)),
ctx
)
// Join all of the independent results into a single ValueState holding a tuple.
Stitch
.joinMap(
urlsMediaQuoteTweet,
conversationId,
mentions,
replyScreenName,
directedAt,
language,
contributor,
geoScrub
)(ValueState.join(_, _, _, _, _, _, _, _))
.map { values =>
// An empty joined state means there is nothing to record, so the original
// TweetData is returned as unmodified instead of building a copy.
if (values.state.isEmpty) {
ValueState.unmodified(td)
} else {
// The tuple positions below must line up with the argument order of joinMap above.
values.map {
case (
(urls, media, quotedTweet),
conversationId,
mentions,
reply,
directedAt,
language,
contributor,
coreGeo
) =>
val (coordinates, placeId) = coreGeo
// A single copy of the tweet receives every hydrated field at once.
td.copy(
tweet = tweet.copy(
coreData = tweet.coreData.map(
_.copy(
reply = reply,
conversationId = conversationId,
directedAtUser = directedAt,
coordinates = coordinates,
placeId = placeId
)
),
urls = Some(urls),
media = Some(media),
mentions = Some(mentions),
language = language,
quotedTweet = quotedTweet,
contributor = contributor
)
)
}
}
}
}
// Runs the `ScrubUncacheable.assertNotScrubbed` mutation (with the message below) against
// the tweet inside the TweetData.
val assertNotScrubbed: TweetDataValueHydrator = {
  val tweetAssertion =
    ScrubUncacheable.assertNotScrubbed(
      "output of the cacheable tweet hydrator should not require scrubbing"
    )

  ValueHydrator.fromMutation[TweetData, TweetQuery.Options](
    tweetAssertion.lensed(TweetData.Lenses.tweet)
  )
}
// Hydrates uncacheable fields. All results are joined into one tuple and applied to the
// tweet with a single copy; if the joined hydration state is empty the TweetData is
// returned unmodified.
val hydrateDependentUncacheableFields: TweetDataValueHydrator =
ValueHydrator[TweetData, TweetQuery.Options] { (td, opts) =>
val ctx = TweetCtx.from(td, opts)
val tweet = td.tweet
// The quote tweet visibility result is computed first because its value is part of the
// context given to the quoted tweet hydrator. Both hydration states are joined, but only
// the quoted tweet value (the second tuple element) is kept.
val quotedTweetResult: Stitch[ValueState[Option[QuotedTweetResult]]] =
for {
qtFilterState <- hydrateQuoteTweetVisibility(None, ctx)
quotedTweet <- hydrateQuotedTweet(
td.quotedTweetResult,
QuotedTweetHydrator.Ctx(qtFilterState.value, ctx)
)
} yield {
ValueState.join(qtFilterState, quotedTweet).map(_._2)
}
val pastedMedia: Stitch[ValueState[PastedMedia]] =
hydratePastedMedia(
PastedMediaHydrator.getPastedMedia(tweet),
PastedMediaHydrator.Ctx(getUrls(tweet), ctx)
)
val mediaTags: Stitch[ValueState[Option[TweetMediaTags]]] =
hydrateMediaTags(tweet.mediaTags, ctx)
val classicCards: Stitch[ValueState[Option[Seq[Card]]]] =
hydrateClassicCards(
tweet.cards,
CardHydrator.Ctx(getUrls(tweet), getMedia(tweet), ctx)
)
val card2: Stitch[ValueState[Option[Card2]]] =
hydrateCard2(
tweet.card2,
Card2Hydrator.Ctx(
getUrls(tweet),
getMedia(tweet),
getCardReference(tweet),
ctx,
td.featureSwitchResults
)
)
val contributorVisibility: Stitch[ValueState[Option[Contributor]]] =
hydrateContributorVisibility(tweet.contributor, ctx)
val takedowns: Stitch[ValueState[Option[Takedowns]]] =
hydrateTakedowns(
None, // None because uncacheable hydrator doesn't depend on previous value
TakedownHydrator.Ctx(Takedowns.fromTweet(tweet), ctx)
)
val conversationControl: Stitch[ValueState[Option[ConversationControl]]] =
hydrateConversationControl(
tweet.conversationControl,
ConversationControlHydrator.Ctx(getConversationId(tweet), ctx)
)
// PreviousTweetCounts and Perspective hydration depends on tweet.editControl.edit_control_initial
// having been hydrated in EditControlHydrator; thus we are chaining them together.
//
// Stage 1 joins edit control and perspective; stage 2 joins previous counts (which takes
// the hydrated edit control) and edit perspective (which takes both the hydrated
// perspective and edit control).
val editControlWithDependencies: Stitch[
ValueState[
(
Option[EditControl],
Option[StatusPerspective],
Option[StatusCounts],
Option[TweetPerspective]
)
]
] =
for {
(edit, perspective) <- Stitch.join(
hydrateEditControl(tweet.editControl, ctx),
hydratePerspective(
tweet.perspective,
PerspectiveHydrator.Ctx(td.featureSwitchResults, ctx))
)
(counts, editPerspective) <- Stitch.join(
hydratePreviousTweetCounts(
tweet.previousCounts,
PreviousTweetCountsHydrator.Ctx(edit.value, td.featureSwitchResults, ctx)),
hydrateEditPerspective(
tweet.editPerspective,
EditPerspectiveHydrator
.Ctx(perspective.value, edit.value, td.featureSwitchResults, ctx))
)
} yield {
ValueState.join(edit, perspective, counts, editPerspective)
}
// Join every result into a single ValueState holding a 9-tuple.
Stitch
.joinMap(
quotedTweetResult,
pastedMedia,
mediaTags,
classicCards,
card2,
contributorVisibility,
takedowns,
conversationControl,
editControlWithDependencies
)(ValueState.join(_, _, _, _, _, _, _, _, _))
.map { values =>
if (values.state.isEmpty) {
ValueState.unmodified(td)
} else {
// The tuple positions below must line up with the argument order of joinMap above.
values.map {
case (
quotedTweetResult,
pastedMedia,
ownedMediaTags,
cards,
card2,
contributor,
takedowns,
conversationControl,
(editControl, perspective, previousCounts, editPerspective)
) =>
td.copy(
tweet = tweet.copy(
// Media entities and media tags are taken from the pasted media result; the
// tweet's own media tags are merged in via mergeTweetMediaTags.
media = Some(pastedMedia.mediaEntities),
mediaTags = pastedMedia.mergeTweetMediaTags(ownedMediaTags),
cards = cards,
card2 = card2,
contributor = contributor,
takedownCountryCodes = takedowns.map(_.countryCodes.toSeq),
takedownReasons = takedowns.map(_.reasons.toSeq),
conversationControl = conversationControl,
editControl = editControl,
previousCounts = previousCounts,
perspective = perspective,
editPerspective = editPerspective,
),
quotedTweetResult = quotedTweetResult
)
}
}
}
}
// Hydrates counts, place, device source, profile geo and the source tweet, and evaluates the
// author-visibility and IM* filter hydrators. The result is an EditState rather than a
// ValueState: the produced edit is applied to whatever TweetData it is later given.
val hydrateIndependentUncacheableFields: TweetDataEditHydrator =
EditHydrator[TweetData, TweetQuery.Options] { (td, opts) =>
val ctx = TweetCtx.from(td, opts)
val tweet = td.tweet
// Group together the results of hydrators that don't perform
// filtering, because we don't care about the precedence of
// exceptions from these hydrators, because the exceptions all
// indicate failures, and picking any failure will be
// fine. (All of the other hydrators might throw filtering
// exceptions, so we need to make sure that we give precedence
// to their failures.)
val hydratorsWithoutFiltering =
Stitch.joinMap(
hydrateTweetCounts(tweet.counts, TweetCountsHydrator.Ctx(td.featureSwitchResults, ctx)),
// Note: Place is cached in memcache, it is just not cached on the Tweet.
hydratePlace(tweet.place, ctx),
hydrateDeviceSource(tweet.deviceSource, ctx),
hydrateProfileGeo(tweet.profileGeoEnrichment, ctx)
)(ValueState.join(_, _, _, _))
/**
* Multiple hydrators throw visibility filtering exceptions so specify an order to achieve
* a deterministic hydration result while ensuring that any retweet has a source tweet:
* 1. hydrateSourceTweet throws SourceTweetNotFound, this is a detached-retweet so treat
* the retweet hydration as if it were not found
* 2. hydrateTweetAuthorVisibility
* 3. hydrateSourceTweet (other than SourceTweetNotFound already handled above)
* 4. hydrateIM1837State
* 5. hydrateIM2884State
* 6. hydrateIM3433State
* 7. hydratorsWithoutFiltering miscellaneous exceptions (any visibility filtering
* exceptions should win over failure of a hydrator)
*/
// Each hydrator is lifted to Try so that all five outcomes are available to the match
// below; the order of the `case` clauses implements the precedence listed above.
val sourceTweetAndTweetAuthorResult =
Stitch
.joinMap(
hydrateSourceTweet(td.sourceTweetResult, ctx).liftToTry,
hydrateTweetAuthorVisibility((), ctx).liftToTry,
hydrateIM1837State((), ctx).liftToTry,
hydrateIM2884State((), ctx).liftToTry,
hydrateIM3433State((), ctx).liftToTry
) {
case (Throw(t @ FilteredState.Unavailable.SourceTweetNotFound(_)), _, _, _, _) =>
Throw(t)
case (_, Throw(t), _, _, _) => Throw(t) // TweetAuthorVisibility
case (Throw(t), _, _, _, _) => Throw(t) // SourceTweet
case (_, _, Throw(t), _, _) => Throw(t) // IM1837State
case (_, _, _, Throw(t), _) => Throw(t) // IM2884State
case (_, _, _, _, Throw(t)) => Throw(t) // IM3433State
case (
Return(sourceTweetResultValue),
Return(authorVisibilityValue),
Return(im1837Value),
Return(im2884Value),
Return(im3433Value)
) =>
Return(
ValueState
.join(
sourceTweetResultValue,
authorVisibilityValue,
im1837Value,
im2884Value,
im3433Value
)
)
}.lowerFromTry
// Exceptions from the filtering hydrators (left-hand side) take precedence over
// exceptions from hydratorsWithoutFiltering (right-hand side).
StitchExceptionPrecedence(sourceTweetAndTweetAuthorResult)
.joinWith(hydratorsWithoutFiltering)(ValueState.join(_, _))
.toStitch
.map { values =>
if (values.state.isEmpty) {
EditState.unit[TweetData]
} else {
// The edit reads the tweet from the TweetData it is applied to (`tweetData`), not from
// the `td` captured above.
EditState[TweetData] { tweetData =>
val tweet = tweetData.tweet
values.map {
// Only the source tweet result is used from the first group; the values of the
// visibility/filter hydrators are ignored here.
case (
(sourceTweetResult, _, _, _, _),
(counts, place, deviceSource, profileGeo)
) =>
tweetData.copy(
tweet = tweet.copy(
counts = counts,
place = place,
deviceSource = deviceSource,
profileGeoEnrichment = profileGeo
),
sourceTweetResult = sourceTweetResult
)
}
}
}
}
}
// Hydrates the tweet's unmention data and stores it back on the TweetData. The hydrator
// context is built from the tweet's conversation id and mentions.
val hydrateUnmentionDataToTweetData: TweetDataValueHydrator = {
  val unmentionDataLens = TweetData.Lenses.tweet.andThen(TweetLenses.unmentionData)

  val buildCtx = (tweetData: TweetData, options: TweetQuery.Options) =>
    UnmentionDataHydrator.Ctx(
      getConversationId(tweetData.tweet),
      getMentions(tweetData.tweet),
      TweetCtx.from(tweetData, options)
    )

  TweetHydration.setOnTweetData(unmentionDataLens, buildCtx, hydrateUnmentionData)
}
// The full chain of cacheable hydration, run in sequence. The order matters: the cached
// tweet is scrubbed first, then the primary fields are hydrated, then unmention data (which
// needs the hydrated mentions), then the not-scrubbed assertion, then the cacheable repairs.
val hydrateCacheableFields: TweetDataValueHydrator =
ValueHydrator.inSequence(
scrubCachedTweet,
hydratePrimaryCacheableFields,
// Relies on mentions being hydrated in hydratePrimaryCacheableFields
hydrateUnmentionDataToTweetData,
assertNotScrubbed,
hydrateCacheableRepairs
)
// Uncacheable hydration that builds on the primary cacheable fields: the conversation
// muted hydrator needs the conversation id that comes from those fields, and the
// uncacheable media hydrator needs the cacheable media entities.
val hydrateUncacheableMedia: TweetDataValueHydrator =
  ValueHydrator[TweetData, TweetQuery.Options] { (td, opts) =>
    val tweetCtx = TweetCtx.from(td, opts)
    val current = td.tweet

    val uncacheableMediaCtx =
      MediaEntityHydrator.Uncacheable.Ctx(current.mediaKeys, tweetCtx)

    val mediaResult: Stitch[ValueState[Option[Seq[MediaEntity]]]] =
      hydrateMediaUncacheable.liftOption.apply(current.media, uncacheableMediaCtx)

    val mutedResult: Stitch[ValueState[Option[Boolean]]] =
      hydrateConversationMuted(
        current.conversationMuted,
        ConversationMutedHydrator.Ctx(getConversationId(current), tweetCtx)
      )

    // MediaRefs are hydrated in this phase because they are derived from the media field
    // on the Tweet, and later hydrators can unset that field.
    val mediaRefsResult: Stitch[ValueState[Option[Seq[MediaRef]]]] =
      hydrateMediaRefs(
        current.mediaRefs,
        MediaRefsHydrator.Ctx(
          getMedia(current),
          getMediaKeys(current),
          getUrls(current),
          tweetCtx
        )
      )

    Stitch
      .joinMap(mediaResult, mutedResult, mediaRefsResult)(ValueState.join(_, _, _))
      .map { joined =>
        if (joined.state.isEmpty) {
          // Nothing was recorded by any of the three hydrators.
          ValueState.unmodified(td)
        } else {
          joined.map {
            case (hydratedMedia, hydratedMuted, hydratedMediaRefs) =>
              val updatedTweet =
                current.copy(
                  media = hydratedMedia,
                  conversationMuted = hydratedMuted,
                  mediaRefs = hydratedMediaRefs
                )
              td.copy(tweet = updatedTweet)
          }
        }
      }
  }
// Hydrates `hasMedia` on the tweet; the hydrator's context is the tweet itself.
val hydrateHasMediaToTweetData: TweetDataValueHydrator = {
  val hasMediaLens = TweetData.Lenses.tweet.andThen(TweetLenses.hasMedia)
  val tweetAsCtx = (tweetData: TweetData, _: TweetQuery.Options) => tweetData.tweet

  TweetHydration.setOnTweetData(hasMediaLens, tweetAsCtx, hydrateHasMedia)
}
// Adapts hydrateReportedTweetVisibility into a TweetDataValueHydrator. That hydrator
// either throws a FilteredState.Unavailable or returns Unit, so on success the TweetData
// is passed through unmodified.
val hydrateReportedTweetVisibilityToTweetData: TweetDataValueHydrator =
  ValueHydrator[TweetData, TweetQuery.Options] { (tweetData, options) =>
    val filterCtx =
      ReportedTweetFilter.Ctx(tweetData.tweet.perspective, TweetCtx.from(tweetData, options))

    hydrateReportedTweetVisibility((), filterCtx).map(_ => ValueState.unmodified(tweetData))
  }
// Runs the tweet visibility hydrator and stores its result in the `suppress` field of
// the TweetData.
val hydrateTweetVisibilityToTweetData: TweetDataValueHydrator = {
  val visibilityCtx = (tweetData: TweetData, options: TweetQuery.Options) =>
    TweetVisibilityHydrator.Ctx(tweetData.tweet, TweetCtx.from(tweetData, options))

  TweetHydration.setOnTweetData(
    TweetData.Lenses.suppress,
    visibilityCtx,
    hydrateTweetVisibility
  )
}
// Hydrates escherbird entity annotations and writes them to both the tweet and the tweet
// held in cacheableTweetResult; the hydrator's context is the tweet itself.
val hydrateEscherbirdAnnotationsToTweetAndCachedTweet: TweetDataValueHydrator = {
  val tweetAsCtx = (tweetData: TweetData, _: TweetQuery.Options) => tweetData.tweet

  TweetHydration.setOnTweetAndCachedTweet(
    TweetLenses.escherbirdEntityAnnotations,
    tweetAsCtx,
    hydrateEscherbirdAnnotations
  )
}
// Runs the scrub-engagements hydrator over the tweet counts, giving it the `suppress`
// value of the TweetData as context.
val scrubEngagements: TweetDataValueHydrator = {
  val suppressCtx = (tweetData: TweetData, _: TweetQuery.Options) =>
    ScrubEngagementHydrator.Ctx(tweetData.suppress)

  TweetHydration.setOnTweetData(
    TweetData.Lenses.tweetCounts,
    suppressCtx,
    hydrateScrubEngagements
  )
}
/**
* This is where we wire up all the separate hydrators into a single [[TweetDataValueHydrator]].
*
* Each hydrator here is either a [[TweetDataValueHydrator]] or a [[TweetDataEditHydrator]].
* We use [[EditHydrator]]s for anything that needs to run in parallel ([[ValueHydrator]]s can
* only be run in sequence).
*
* The order of the arguments below is significant; each step's comment names what it relies on.
*/
ValueHydrator.inSequence(
// Hydrate FeatureSwitchResults first, so they can be used by other hydrators if needed
hydrateFeatureSwitchResults,
// Two branches run in parallel: the sequential cacheable/dependent chain (converted to an
// EditHydrator) and the independent uncacheable fields.
EditHydrator
.inParallel(
ValueHydrator
.inSequence(
// The result of running these hydrators is saved as `cacheableTweetResult` and
// written back to cache via `cacheChangesEffect` in `hydrateRepo`
TweetHydration.captureCacheableTweetResult(
hydrateCacheableFields
),
// Uncacheable hydrators that depend only on the cacheable fields
hydrateUncacheableMedia,
// clean-up partially hydrated entities before any of the hydrators that look at
// url and media entities run, so that they never see bad entities.
hydratePostCacheRepairs,
// These hydrators are all dependent on each other and/or the previous hydrators
hydrateDependentUncacheableFields,
// Sets `hasMedia`. Comes after PastedMediaHydrator in order to include pasted
// pics as well as other media & urls.
hydrateHasMediaToTweetData
)
.toEditHydrator,
// These hydrators do not rely on any other hydrators and so can be run in parallel
// with the above hydrators (and with each other)
hydrateIndependentUncacheableFields
)
.toValueHydrator,
// Depends on reported perspectival having been hydrated in PerspectiveHydrator
hydrateReportedTweetVisibilityToTweetData,
// Remove superfluous urls entities when there is a corresponding MediaEntity for the same url
scrubSuperfluousUrlEntities,
// The copyFromSourceTweet hydrator needs to be located after the hydrators that produce the
// fields to copy. It must be located after PartialEntityCleaner (part of postCacheRepairs),
// which removes failed MediaEntities. It also depends on takedownCountryCodes having been
// hydrated in TakedownHydrator.
copyFromSourceTweet,
// depends on AdditionalFieldsHydrator and CopyFromSourceTweet to copy safety labels
hydrateTweetVisibilityToTweetData,
// for IPI'd tweets, we want to disable tweet engagement counts from being returned
// StatusCounts for replyCount, retweetCount.
// scrubEngagements hydrator must come after tweet visibility hydrator.
// tweet visibility hydrator emits the suppressed FilteredState needed for scrubbing.
scrubEngagements,
// this hydrator runs when writing the current tweet
// (it is gated on `opts.cause.writing` for the tweet's id).
// Escherbird comes last in order to consume a tweet that's as close as possible
// to the tweet written to tweet_events
hydrateEscherbirdAnnotationsToTweetAndCachedTweet
.onlyIf((td, opts) => opts.cause.writing(td.tweet.id)),
// Add an ellipsis to the end of the text for a Tweet that has a NoteTweet associated.
// This is so that the Tweet is displayed on the home timeline with an ellipsis, letting
// the User know that there's more to see.
hydrateNoteTweetSuffix,
/**
* Post-cache repair of QT text and entities to support rendering on all clients.
* Moving this to end of the pipeline to avoid/minimize chance of following hydrators
* depending on modified tweet text or entities.
* When we start persisting shortUrl in MH - permalink won't be empty. Therefore,
* we won't run QuotedTweetRefHydrator and just hydrate expanded and display
* using QuotedTweetRefUrlsHydrator. We will use hydrated permalink to repair
* QT text and entities for non-upgraded clients in this step.
*/
hydrateTweetLegacyFormat
)
}
/**
 * Wraps `h` so that the result it produces is also stored in the `cacheableTweetResult`
 * field of the `TweetData` it returns.
 *
 * @param h the hydrator whose output should be captured
 * @return a hydrator with the same output as `h`, plus the captured snapshot
 */
def captureCacheableTweetResult(h: TweetDataValueHydrator): TweetDataValueHydrator =
  ValueHydrator[TweetData, TweetQuery.Options] { (input, options) =>
    h(input, options).map { hydrated =>
      hydrated.map { hydratedData =>
        // The snapshot is the hydrated ValueState whose TweetData additionally has the
        // state's completedHydrations recorded on it via addHydrated. This is used when
        // converting to a CachedTweet.
        val snapshot =
          hydrated.map(data => data.addHydrated(hydrated.state.completedHydrations))
        hydratedData.copy(cacheableTweetResult = Some(snapshot))
      }
    }
  }
/**
 * Builds a `TweetDataValueHydrator` from a ValueHydrator and a Lens on `Tweet`. The
 * resulting hydrator:
 *
 * 1. runs `h` on the value that `l` reads from the main tweet,
 * 2. writes the result back to the main tweet through `l`, and
 * 3. writes the same result to the tweet inside `cacheableTweetResult` through `l`.
 *
 * When `h` reports an empty hydration state, the `TweetData` is returned unmodified.
 *
 * @param l lens from a `Tweet` to the hydrated field
 * @param mkCtx builds the context for `h` from the current `TweetData` and query options
 * @param h the hydrator for the lensed value
 */
def setOnTweetAndCachedTweet[A, C](
  l: Lens[Tweet, A],
  mkCtx: (TweetData, TweetQuery.Options) => C,
  h: ValueHydrator[A, C]
): TweetDataValueHydrator = {
  // TweetData -> tweet -> l
  val onTweet = TweetData.Lenses.tweet.andThen(l)

  // TweetData -> cacheableTweetResult -> tweet -> l
  val onCachedTweet =
    TweetLenses
      .requireSome(TweetData.Lenses.cacheableTweetResult)
      .andThen(TweetResult.Lenses.tweet)
      .andThen(l)

  ValueHydrator[TweetData, TweetQuery.Options] { (td, opts) =>
    val hydration = h.run(onTweet.get(td), mkCtx(td, opts))

    hydration.map { hydrated =>
      if (hydrated.state.isEmpty) {
        ValueState.unmodified(td)
      } else {
        hydrated.map(newValue => Lens.setAll(td, onTweet -> newValue, onCachedTweet -> newValue))
      }
    }
  }
}
/**
 * Builds a `TweetDataValueHydrator` that runs `h` on the value read through `lens` and
 * overwrites that value with the hydrated result. When `h` reports an empty hydration
 * state, the `TweetData` is returned unmodified.
 *
 * @param lens lens from `TweetData` to the hydrated field
 * @param mkCtx builds the context for `h` from the current `TweetData` and query options
 * @param h the hydrator for the lensed value
 */
def setOnTweetData[A, C](
  lens: Lens[TweetData, A],
  mkCtx: (TweetData, TweetQuery.Options) => C,
  h: ValueHydrator[A, C]
): TweetDataValueHydrator =
  ValueHydrator[TweetData, TweetQuery.Options] { (td, opts) =>
    val hydration = h.run(lens.get(td), mkCtx(td, opts))

    hydration.map { hydrated =>
      if (hydrated.state.isEmpty) {
        ValueState.unmodified(td)
      } else {
        hydrated.map(newValue => lens.set(td, newValue))
      }
    }
  }
/**
 * Builds an [[Effect]] over a tweet result that writes the captured `cacheableTweetResult`
 * back to `cache` when it was modified and no cache error was encountered.
 *
 * Exactly one of four outcomes happens per result, each tracked by a counter on `stats`:
 *  - `missing_cacheable_result`: no `cacheableTweetResult` was captured
 *  - `cache_error`: a cache error was encountered on the outer result
 *  - `updated`: the cacheable result was modified and is written with `lockAndSet`
 *  - `unchanged`: none of the above
 *
 * @param cache the cache that modified tweet data is written to
 * @param stats receiver for the four outcome counters
 */
def cacheChanges(
  cache: LockingCache[TweetId, Cached[TweetData]],
  stats: StatsReceiver
): Effect[ValueState[TweetData]] = {
  val updated = stats.counter("updated")
  val unchanged = stats.counter("unchanged")
  val picker = new TweetRepoCachePicker[TweetData](_.cachedAt)
  val cacheErrors = stats.counter("cache_error")
  val missingCacheableResult = stats.counter("missing_cacheable_result")

  Effect[TweetResult] { result =>
    // cacheErrorEncountered is never set on `cacheableTweetResult`, so the flag has to be
    // read from the outer tweet state.
    val cacheErrorEncountered = result.state.cacheErrorEncountered

    result.value.cacheableTweetResult match {
      case None =>
        missingCacheableResult.incr()

      case Some(ValueState(_, _)) if cacheErrorEncountered =>
        cacheErrors.incr()

      case Some(ValueState(cacheable, cacheableState)) if cacheableState.modified =>
        val toCache = cacheable.addHydrated(cacheableState.completedHydrations)
        val now = Time.now
        val entry = Cached(Some(toCache), CachedValueStatus.Found, now, Some(now))
        updated.incr()
        cache.lockAndSet(toCache.tweet.id, LockingCache.PickingHandler(entry, picker))

      case _ =>
        unchanged.incr()
    }
  }
}
/**
 * Wraps `hydrator` so that it only executes when `queryFilter` returns true for
 * `ctx.opts` and `hydrationType` is not already present in `ctx.completedHydrations`.
 *
 * When the wrapped hydrator does execute, its resulting state is extended with
 * `HydrationState.modified(hydrationType)` only if all of the following hold:
 *  - the result has no failed fields,
 *  - every type in `dependsOn` is already in `ctx.completedHydrations`, and
 *  - `hydrationType` is not yet in `ctx.completedHydrations`.
 * Otherwise the result is returned untouched.
 *
 * @param queryFilter decides from the query options whether the hydrator may run
 * @param hydrationType the hydration to mark as completed on success
 * @param dependsOn hydrations that must already be completed for this one to count
 * @param hydrator the hydrator to wrap
 */
def completeOnlyOnce[A, C <: TweetCtx](
  queryFilter: TweetQuery.Options => Boolean = _ => true,
  hydrationType: HydrationType,
  dependsOn: Set[HydrationType] = Set.empty,
  hydrator: ValueHydrator[A, C]
): ValueHydrator[A, C] = {
  val markCompleted = HydrationState.modified(hydrationType)

  val recordingCompletion =
    ValueHydrator[A, C] { (value, ctx) =>
      hydrator(value, ctx).map { result =>
        // A result counts as complete only if hydration succeeded and every dependency is
        // satisfied. On a forced rehydration (type already completed) neither the
        // hydrationType nor the modified flag is added.
        val shouldMarkCompleted =
          result.state.failedFields.isEmpty &&
            dependsOn.forall(ctx.completedHydrations.contains) &&
            !ctx.completedHydrations.contains(hydrationType)

        if (shouldMarkCompleted) result.copy(state = result.state ++ markCompleted)
        else result
      }
    }

  recordingCompletion.onlyIf { (_, ctx) =>
    queryFilter(ctx.opts) && !ctx.completedHydrations.contains(hydrationType)
  }
}
/**
 * Decorates a `TweetResultRepository.Type` with hydration.
 *
 * For each lookup the incoming options are expanded with `optionsExpander`; the expanded
 * options are used for both the `repo` call and the `hydrator` call. The repository state
 * and the hydrator state are merged into one tweet result, which is handed to
 * `cacheChangesEffect` when the original options allow writing to cache. Finally the
 * result is scrubbed according to the original (unexpanded) options.
 *
 * @param hydrator the hydrator applied to each repository result
 * @param cacheChangesEffect effect invoked for possible write-back to cache
 * @param optionsExpander expands the caller's query options before they are used
 * @param repo the underlying repository
 */
def hydrateRepo(
  hydrator: TweetDataValueHydrator,
  cacheChangesEffect: Effect[TweetResult],
  optionsExpander: TweetQueryOptionsExpander.Type
)(
  repo: TweetResultRepository.Type
): TweetResultRepository.Type =
  (tweetId: TweetId, originalOpts: TweetQuery.Options) => {
    val expandedOpts = optionsExpander(originalOpts)

    repo(tweetId, expandedOpts).flatMap { fromRepo =>
      hydrator(fromRepo.value, expandedOpts).map { hydrated =>
        val combined = TweetResult(hydrated.value, fromRepo.state ++ hydrated.state)

        if (originalOpts.cacheControl.writeToCache) {
          cacheChangesEffect(combined)
        }

        UnrequestedFieldScrubber(originalOpts).scrub(combined)
      }
    }
  }
/**
 * A thin wrapper around a Stitch[_] that adds a `joinWith` method with a defined
 * precedence between exceptions from its two sides.
 *
 * Use it when it matters which of two failures is reported (for example visibility
 * filtering versus an ordinary hydration failure).
 *
 * Because this is an [[AnyVal]], using it is no more expensive than inlining the
 * joinWith method.
 */
// exposed for testing
case class StitchExceptionPrecedence[A](toStitch: Stitch[A]) extends AnyVal {

  /**
   * Concurrently evaluates this Stitch and `rhs`, combining their results with `f`.
   *
   * Unlike Stitch.join, an exception from the left-hand side always takes precedence
   * over an exception from the right-hand side. A right-hand failure does not
   * short-circuit evaluation, while a left-hand failure does, so the higher-precedence
   * failure is returned with as little latency as possible. (Lifting *both* sides to Try
   * would force waiting for both computations to finish even when the left-hand side is
   * already known to have failed.)
   */
  def joinWith[B, C](rhs: Stitch[B])(f: (A, B) => C): StitchExceptionPrecedence[C] = {
    // Only the right-hand side is lifted to Try, so its failure is deferred until the
    // left-hand side has produced a value.
    val rhsAsTry = rhs.liftToTry
    val combined =
      Stitch.joinMap(toStitch, rhsAsTry) { (left, rightTry) => rightTry.map(f(left, _)) }

    StitchExceptionPrecedence(combined.lowerFromTry)
  }
}
}