580 lines
23 KiB
Scala
580 lines
23 KiB
Scala
package com.twitter.timelines.prediction.common.aggregates
|
|
|
|
import com.twitter.conversions.DurationOps._
|
|
import com.twitter.ml.api.constant.SharedFeatures.AUTHOR_ID
|
|
import com.twitter.ml.api.constant.SharedFeatures.USER_ID
|
|
import com.twitter.timelines.data_processing.ml_util.aggregation_framework._
|
|
import com.twitter.timelines.data_processing.ml_util.aggregation_framework.metrics._
|
|
import com.twitter.timelines.data_processing.ml_util.transforms.DownsampleTransform
|
|
import com.twitter.timelines.data_processing.ml_util.transforms.RichRemoveAuthorIdZero
|
|
import com.twitter.timelines.data_processing.ml_util.transforms.RichRemoveUserIdZero
|
|
import com.twitter.timelines.prediction.features.common.TimelinesSharedFeatures
|
|
import com.twitter.timelines.prediction.features.engagement_features.EngagementDataRecordFeatures
|
|
import com.twitter.timelines.prediction.features.engagement_features.EngagementDataRecordFeatures.RichUnifyPublicEngagersTransform
|
|
import com.twitter.timelines.prediction.features.list_features.ListFeatures
|
|
import com.twitter.timelines.prediction.features.recap.RecapFeatures
|
|
import com.twitter.timelines.prediction.features.request_context.RequestContextFeatures
|
|
import com.twitter.timelines.prediction.features.semantic_core_features.SemanticCoreFeatures
|
|
import com.twitter.timelines.prediction.transform.filter.FilterInNetworkTransform
|
|
import com.twitter.timelines.prediction.transform.filter.FilterImageTweetTransform
|
|
import com.twitter.timelines.prediction.transform.filter.FilterVideoTweetTransform
|
|
import com.twitter.timelines.prediction.transform.filter.FilterOutImageVideoTweetTransform
|
|
import com.twitter.util.Duration
|
|
|
|
trait TimelinesAggregationConfigDetails extends Serializable {
|
|
|
|
import TimelinesAggregationSources._
|
|
|
|
def outputHdfsPath: String
|
|
|
|
/**
|
|
* Converts the given logical store to a physical store. The reason we do not specify the
|
|
* physical store directly with the [[AggregateGroup]] is because of a cyclic dependency when
|
|
* create physical stores that are DalDataset with PersonalDataType annotations derived from
|
|
* the [[AggregateGroup]].
|
|
*
|
|
*/
|
|
def mkPhysicalStore(store: AggregateStore): AggregateStore
|
|
|
|
def defaultMaxKvSourceFailures: Int = 100
|
|
|
|
val timelinesOfflineAggregateSink = new OfflineStoreCommonConfig {
|
|
override def apply(startDate: String) = OfflineAggregateStoreCommonConfig(
|
|
outputHdfsPathPrefix = outputHdfsPath,
|
|
dummyAppId = "timelines_aggregates_v2_ro",
|
|
dummyDatasetPrefix = "timelines_aggregates_v2_ro",
|
|
startDate = startDate
|
|
)
|
|
}
|
|
|
|
val UserAggregateStore = "user_aggregates"
|
|
val UserAuthorAggregateStore = "user_author_aggregates"
|
|
val UserOriginalAuthorAggregateStore = "user_original_author_aggregates"
|
|
val OriginalAuthorAggregateStore = "original_author_aggregates"
|
|
val UserEngagerAggregateStore = "user_engager_aggregates"
|
|
val UserMentionAggregateStore = "user_mention_aggregates"
|
|
val TwitterWideUserAggregateStore = "twitter_wide_user_aggregates"
|
|
val TwitterWideUserAuthorAggregateStore = "twitter_wide_user_author_aggregates"
|
|
val UserRequestHourAggregateStore = "user_request_hour_aggregates"
|
|
val UserRequestDowAggregateStore = "user_request_dow_aggregates"
|
|
val UserListAggregateStore = "user_list_aggregates"
|
|
val AuthorTopicAggregateStore = "author_topic_aggregates"
|
|
val UserTopicAggregateStore = "user_topic_aggregates"
|
|
val UserInferredTopicAggregateStore = "user_inferred_topic_aggregates"
|
|
val UserMediaUnderstandingAnnotationAggregateStore =
|
|
"user_media_understanding_annotation_aggregates"
|
|
val AuthorCountryCodeAggregateStore = "author_country_code_aggregates"
|
|
val OriginalAuthorCountryCodeAggregateStore = "original_author_country_code_aggregates"
|
|
|
|
/**
|
|
* Step 3: Configure all aggregates to compute.
|
|
* Note that different subsets of aggregates in this list
|
|
* can be launched by different summingbird job instances.
|
|
* Any given job can be responsible for a set of AggregateGroup
|
|
* configs whose outputStores share the same exact startDate.
|
|
* AggregateGroups that do not share the same inputSource,
|
|
* outputStore or startDate MUST be launched using different
|
|
* summingbird jobs and passed in a different --start-time argument
|
|
* See science/scalding/mesos/timelines/prod.yaml for an example
|
|
* of how to configure your own job.
|
|
*/
|
|
val negativeDownsampleTransform =
|
|
DownsampleTransform(
|
|
negativeSamplingRate = 0.03,
|
|
keepLabels = RecapUserFeatureAggregation.LabelsV2)
|
|
val negativeRecTweetDownsampleTransform = DownsampleTransform(
|
|
negativeSamplingRate = 0.03,
|
|
keepLabels = RectweetUserFeatureAggregation.RectweetLabelsForAggregation
|
|
)
|
|
|
|
val userAggregatesV2: AggregateGroup =
|
|
AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_aggregate_v2",
|
|
preTransforms = Seq(RichRemoveUserIdZero), /* Eliminates reducer skew */
|
|
keys = Set(USER_ID),
|
|
features = RecapUserFeatureAggregation.UserFeaturesV2,
|
|
labels = RecapUserFeatureAggregation.LabelsV2,
|
|
metrics = Set(CountMetric, SumMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserAggregateStore,
|
|
startDate = "2016-07-15 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
))
|
|
)
|
|
|
|
val userAuthorAggregatesV2: Set[AggregateGroup] = {
|
|
|
|
/**
|
|
* NOTE: We need to remove records from out-of-network authors from the recap input
|
|
* records (which now include out-of-network records as well after merging recap and
|
|
* rectweet models) that are used to compute user-author aggregates. This is necessary
|
|
* to limit the growth rate of user-author aggregates.
|
|
*/
|
|
val allFeatureAggregates = Set(
|
|
AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_author_aggregate_v2",
|
|
preTransforms = Seq(FilterInNetworkTransform, RichRemoveUserIdZero),
|
|
keys = Set(USER_ID, AUTHOR_ID),
|
|
features = RecapUserFeatureAggregation.UserAuthorFeaturesV2,
|
|
labels = RecapUserFeatureAggregation.LabelsV2,
|
|
metrics = Set(SumMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserAuthorAggregateStore,
|
|
startDate = "2016-07-15 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
))
|
|
)
|
|
)
|
|
|
|
val countAggregates: Set[AggregateGroup] = Set(
|
|
AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_author_aggregate_v2",
|
|
preTransforms = Seq(FilterInNetworkTransform, RichRemoveUserIdZero),
|
|
keys = Set(USER_ID, AUTHOR_ID),
|
|
features = RecapUserFeatureAggregation.UserAuthorFeaturesV2Count,
|
|
labels = RecapUserFeatureAggregation.LabelsV2,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserAuthorAggregateStore,
|
|
startDate = "2016-07-15 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
))
|
|
)
|
|
)
|
|
|
|
allFeatureAggregates ++ countAggregates
|
|
}
|
|
|
|
val userAggregatesV5Continuous: AggregateGroup =
|
|
AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_aggregate_v5.continuous",
|
|
preTransforms = Seq(RichRemoveUserIdZero),
|
|
keys = Set(USER_ID),
|
|
features = RecapUserFeatureAggregation.UserFeaturesV5Continuous,
|
|
labels = RecapUserFeatureAggregation.LabelsV2,
|
|
metrics = Set(CountMetric, SumMetric, SumSqMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserAggregateStore,
|
|
startDate = "2016-07-15 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
))
|
|
)
|
|
|
|
val userAuthorAggregatesV5: AggregateGroup =
|
|
AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_author_aggregate_v5",
|
|
preTransforms = Seq(FilterInNetworkTransform, RichRemoveUserIdZero),
|
|
keys = Set(USER_ID, AUTHOR_ID),
|
|
features = RecapUserFeatureAggregation.UserAuthorFeaturesV5,
|
|
labels = RecapUserFeatureAggregation.LabelsV2,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserAuthorAggregateStore,
|
|
startDate = "2016-07-15 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
))
|
|
)
|
|
|
|
val tweetSourceUserAuthorAggregatesV1: AggregateGroup =
|
|
AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_author_aggregate_tweetsource_v1",
|
|
preTransforms = Seq(FilterInNetworkTransform, RichRemoveUserIdZero),
|
|
keys = Set(USER_ID, AUTHOR_ID),
|
|
features = RecapUserFeatureAggregation.UserAuthorTweetSourceFeaturesV1,
|
|
labels = RecapUserFeatureAggregation.LabelsV2,
|
|
metrics = Set(CountMetric, SumMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserAuthorAggregateStore,
|
|
startDate = "2016-07-15 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
))
|
|
)
|
|
|
|
val userEngagerAggregates = AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_engager_aggregate",
|
|
keys = Set(USER_ID, EngagementDataRecordFeatures.PublicEngagementUserIds),
|
|
features = Set.empty,
|
|
labels = RecapUserFeatureAggregation.LabelsV2,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserEngagerAggregateStore,
|
|
startDate = "2016-09-02 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
)),
|
|
preTransforms = Seq(
|
|
RichRemoveUserIdZero,
|
|
RichUnifyPublicEngagersTransform
|
|
)
|
|
)
|
|
|
|
val userMentionAggregates = AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
preTransforms = Seq(RichRemoveUserIdZero), /* Eliminates reducer skew */
|
|
aggregatePrefix = "user_mention_aggregate",
|
|
keys = Set(USER_ID, RecapFeatures.MENTIONED_SCREEN_NAMES),
|
|
features = Set.empty,
|
|
labels = RecapUserFeatureAggregation.LabelsV2,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserMentionAggregateStore,
|
|
startDate = "2017-03-01 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
)),
|
|
includeAnyLabel = false
|
|
)
|
|
|
|
val twitterWideUserAggregates = AggregateGroup(
|
|
inputSource = timelinesDailyTwitterWideSource,
|
|
preTransforms = Seq(RichRemoveUserIdZero), /* Eliminates reducer skew */
|
|
aggregatePrefix = "twitter_wide_user_aggregate",
|
|
keys = Set(USER_ID),
|
|
features = RecapUserFeatureAggregation.TwitterWideFeatures,
|
|
labels = RecapUserFeatureAggregation.TwitterWideLabels,
|
|
metrics = Set(CountMetric, SumMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = TwitterWideUserAggregateStore,
|
|
startDate = "2016-12-28 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
))
|
|
)
|
|
|
|
val twitterWideUserAuthorAggregates = AggregateGroup(
|
|
inputSource = timelinesDailyTwitterWideSource,
|
|
preTransforms = Seq(RichRemoveUserIdZero), /* Eliminates reducer skew */
|
|
aggregatePrefix = "twitter_wide_user_author_aggregate",
|
|
keys = Set(USER_ID, AUTHOR_ID),
|
|
features = RecapUserFeatureAggregation.TwitterWideFeatures,
|
|
labels = RecapUserFeatureAggregation.TwitterWideLabels,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = TwitterWideUserAuthorAggregateStore,
|
|
startDate = "2016-12-28 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
)),
|
|
includeAnyLabel = false
|
|
)
|
|
|
|
/**
|
|
* User-HourOfDay and User-DayOfWeek aggregations, both for recap and rectweet
|
|
*/
|
|
val userRequestHourAggregates = AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_request_context_aggregate.hour",
|
|
preTransforms = Seq(RichRemoveUserIdZero, negativeDownsampleTransform),
|
|
keys = Set(USER_ID, RequestContextFeatures.TIMESTAMP_GMT_HOUR),
|
|
features = Set.empty,
|
|
labels = RecapUserFeatureAggregation.LabelsV2,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserRequestHourAggregateStore,
|
|
startDate = "2017-08-01 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
))
|
|
)
|
|
|
|
val userRequestDowAggregates = AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_request_context_aggregate.dow",
|
|
preTransforms = Seq(RichRemoveUserIdZero, negativeDownsampleTransform),
|
|
keys = Set(USER_ID, RequestContextFeatures.TIMESTAMP_GMT_DOW),
|
|
features = Set.empty,
|
|
labels = RecapUserFeatureAggregation.LabelsV2,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserRequestDowAggregateStore,
|
|
startDate = "2017-08-01 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
))
|
|
)
|
|
|
|
val authorTopicAggregates = AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "author_topic_aggregate",
|
|
preTransforms = Seq(RichRemoveUserIdZero),
|
|
keys = Set(AUTHOR_ID, TimelinesSharedFeatures.TOPIC_ID),
|
|
features = Set.empty,
|
|
labels = RecapUserFeatureAggregation.LabelsV2,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = AuthorTopicAggregateStore,
|
|
startDate = "2020-05-19 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
))
|
|
)
|
|
|
|
val userTopicAggregates = AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_topic_aggregate",
|
|
preTransforms = Seq(RichRemoveUserIdZero),
|
|
keys = Set(USER_ID, TimelinesSharedFeatures.TOPIC_ID),
|
|
features = Set.empty,
|
|
labels = RecapUserFeatureAggregation.LabelsV2,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserTopicAggregateStore,
|
|
startDate = "2020-05-23 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
))
|
|
)
|
|
|
|
val userTopicAggregatesV2 = AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_topic_aggregate_v2",
|
|
preTransforms = Seq(RichRemoveUserIdZero),
|
|
keys = Set(USER_ID, TimelinesSharedFeatures.TOPIC_ID),
|
|
features = RecapUserFeatureAggregation.UserTopicFeaturesV2Count,
|
|
labels = RecapUserFeatureAggregation.LabelsV2,
|
|
includeAnyFeature = false,
|
|
includeAnyLabel = false,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserTopicAggregateStore,
|
|
startDate = "2020-05-23 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
))
|
|
)
|
|
|
|
val userInferredTopicAggregates = AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_inferred_topic_aggregate",
|
|
preTransforms = Seq(RichRemoveUserIdZero),
|
|
keys = Set(USER_ID, TimelinesSharedFeatures.INFERRED_TOPIC_IDS),
|
|
features = Set.empty,
|
|
labels = RecapUserFeatureAggregation.LabelsV2,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserInferredTopicAggregateStore,
|
|
startDate = "2020-09-09 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
))
|
|
)
|
|
|
|
val userInferredTopicAggregatesV2 = AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_inferred_topic_aggregate_v2",
|
|
preTransforms = Seq(RichRemoveUserIdZero),
|
|
keys = Set(USER_ID, TimelinesSharedFeatures.INFERRED_TOPIC_IDS),
|
|
features = RecapUserFeatureAggregation.UserTopicFeaturesV2Count,
|
|
labels = RecapUserFeatureAggregation.LabelsV2,
|
|
includeAnyFeature = false,
|
|
includeAnyLabel = false,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserInferredTopicAggregateStore,
|
|
startDate = "2020-09-09 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
))
|
|
)
|
|
|
|
val userReciprocalEngagementAggregates = AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_aggregate_v6",
|
|
preTransforms = Seq(RichRemoveUserIdZero),
|
|
keys = Set(USER_ID),
|
|
features = Set.empty,
|
|
labels = RecapUserFeatureAggregation.ReciprocalLabels,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserAggregateStore,
|
|
startDate = "2016-07-15 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
)),
|
|
includeAnyLabel = false
|
|
)
|
|
|
|
val userOriginalAuthorReciprocalEngagementAggregates = AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_original_author_aggregate_v1",
|
|
preTransforms = Seq(RichRemoveUserIdZero, RichRemoveAuthorIdZero),
|
|
keys = Set(USER_ID, TimelinesSharedFeatures.ORIGINAL_AUTHOR_ID),
|
|
features = Set.empty,
|
|
labels = RecapUserFeatureAggregation.ReciprocalLabels,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserOriginalAuthorAggregateStore,
|
|
startDate = "2018-12-26 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
)),
|
|
includeAnyLabel = false
|
|
)
|
|
|
|
val originalAuthorReciprocalEngagementAggregates = AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "original_author_aggregate_v1",
|
|
preTransforms = Seq(RichRemoveUserIdZero, RichRemoveAuthorIdZero),
|
|
keys = Set(TimelinesSharedFeatures.ORIGINAL_AUTHOR_ID),
|
|
features = Set.empty,
|
|
labels = RecapUserFeatureAggregation.ReciprocalLabels,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = OriginalAuthorAggregateStore,
|
|
startDate = "2023-02-25 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
)),
|
|
includeAnyLabel = false
|
|
)
|
|
|
|
val originalAuthorNegativeEngagementAggregates = AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "original_author_aggregate_v2",
|
|
preTransforms = Seq(RichRemoveUserIdZero, RichRemoveAuthorIdZero),
|
|
keys = Set(TimelinesSharedFeatures.ORIGINAL_AUTHOR_ID),
|
|
features = Set.empty,
|
|
labels = RecapUserFeatureAggregation.NegativeEngagementLabels,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = OriginalAuthorAggregateStore,
|
|
startDate = "2023-02-25 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
)),
|
|
includeAnyLabel = false
|
|
)
|
|
|
|
val userListAggregates: AggregateGroup =
|
|
AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_list_aggregate",
|
|
keys = Set(USER_ID, ListFeatures.LIST_ID),
|
|
features = Set.empty,
|
|
labels = RecapUserFeatureAggregation.LabelsV2,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserListAggregateStore,
|
|
startDate = "2020-05-28 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
)),
|
|
preTransforms = Seq(RichRemoveUserIdZero)
|
|
)
|
|
|
|
val userMediaUnderstandingAnnotationAggregates: AggregateGroup = AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_media_annotation_aggregate",
|
|
preTransforms = Seq(RichRemoveUserIdZero),
|
|
keys =
|
|
Set(USER_ID, SemanticCoreFeatures.mediaUnderstandingHighRecallNonSensitiveEntityIdsFeature),
|
|
features = Set.empty,
|
|
labels = RecapUserFeatureAggregation.LabelsV2,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(50.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserMediaUnderstandingAnnotationAggregateStore,
|
|
startDate = "2021-03-20 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink
|
|
))
|
|
)
|
|
|
|
val userAuthorGoodClickAggregates = AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_author_good_click_aggregate",
|
|
preTransforms = Seq(FilterInNetworkTransform, RichRemoveUserIdZero),
|
|
keys = Set(USER_ID, AUTHOR_ID),
|
|
features = RecapUserFeatureAggregation.UserAuthorFeaturesV2,
|
|
labels = RecapUserFeatureAggregation.GoodClickLabels,
|
|
metrics = Set(SumMetric),
|
|
halfLives = Set(14.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserAuthorAggregateStore,
|
|
startDate = "2016-07-15 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
))
|
|
)
|
|
|
|
val userEngagerGoodClickAggregates = AggregateGroup(
|
|
inputSource = timelinesDailyRecapMinimalSource,
|
|
aggregatePrefix = "user_engager_good_click_aggregate",
|
|
keys = Set(USER_ID, EngagementDataRecordFeatures.PublicEngagementUserIds),
|
|
features = Set.empty,
|
|
labels = RecapUserFeatureAggregation.GoodClickLabels,
|
|
metrics = Set(CountMetric),
|
|
halfLives = Set(14.days),
|
|
outputStore = mkPhysicalStore(
|
|
OfflineAggregateDataRecordStore(
|
|
name = UserEngagerAggregateStore,
|
|
startDate = "2016-09-02 00:00",
|
|
commonConfig = timelinesOfflineAggregateSink,
|
|
maxKvSourceFailures = defaultMaxKvSourceFailures
|
|
)),
|
|
preTransforms = Seq(
|
|
RichRemoveUserIdZero,
|
|
RichUnifyPublicEngagersTransform
|
|
)
|
|
)
|
|
|
|
}
|