delete useless code

kenan238 2023-04-05 20:48:06 +03:00 committed by GitHub
parent 223876769e
commit 1e159bee62
12 changed files with 0 additions and 1182 deletions

View File

@@ -1,8 +0,0 @@
target(
name = "earlybird_ranking",
dependencies = [
"timelines/data_processing/ad_hoc/earlybird_ranking/common",
"timelines/data_processing/ad_hoc/earlybird_ranking/model_evaluation",
"timelines/data_processing/ad_hoc/earlybird_ranking/training_data_generation",
],
)

View File

@@ -1,24 +0,0 @@
scala_library(
name = "common",
sources = ["*.scala"],
platform = "java8",
tags = [
"bazel-compatible",
"bazel-compatible:migrated",
],
dependencies = [
"src/java/com/twitter/ml/api:api-base",
"src/java/com/twitter/ml/api/constant",
"src/java/com/twitter/ml/api/transform",
"src/java/com/twitter/search/modeling/tweet_ranking",
"src/scala/com/twitter/ml/api/util",
"src/scala/com/twitter/timelines/prediction/features/common",
"src/scala/com/twitter/timelines/prediction/features/itl",
"src/scala/com/twitter/timelines/prediction/features/real_graph",
"src/scala/com/twitter/timelines/prediction/features/recap",
"src/scala/com/twitter/timelines/prediction/features/request_context",
"src/scala/com/twitter/timelines/prediction/features/time_features",
"src/thrift/com/twitter/ml/api:data-java",
"src/thrift/com/twitter/ml/api:transform-java",
],
)

View File

@@ -1,271 +0,0 @@
package com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.common
import com.twitter.ml.api.DataRecord
import com.twitter.ml.api.Feature
import com.twitter.ml.api.FeatureContext
import com.twitter.ml.api.ITransform
import com.twitter.ml.api.transform.CascadeTransform
import com.twitter.ml.api.transform.TransformFactory
import com.twitter.ml.api.util.SRichDataRecord
import com.twitter.ml.api.constant.SharedFeatures
import com.twitter.search.common.features.SearchResultFeature
import com.twitter.search.common.features.ExternalTweetFeature
import com.twitter.search.common.features.TweetFeature
import com.twitter.timelines.prediction.features.recap.RecapFeatures
import com.twitter.timelines.prediction.features.request_context.RequestContextFeatures
import com.twitter.timelines.prediction.features.time_features.TimeDataRecordFeatures
import com.twitter.timelines.prediction.features.common.TimelinesSharedFeatures
import com.twitter.timelines.prediction.features.real_graph.RealGraphDataRecordFeatures
import scala.collection.JavaConverters._
import java.lang.{Boolean => JBoolean}
case class LabelInfo(name: String, downsampleFraction: Double, importance: Double)
case class LabelInfoWithFeature(info: LabelInfo, feature: Feature[JBoolean])
trait EarlybirdTrainingConfiguration {
protected def labels: Map[String, Feature.Binary]
protected def weights: Map[String, Double] = Map(
"detail_expanded" -> 0.3,
"favorited" -> 1.0,
"open_linked" -> 0.1,
"photo_expanded" -> 0.03,
"profile_clicked" -> 1.0,
"replied" -> 9.0,
"retweeted" -> 1.0,
"video_playback50" -> 0.01
)
// We basically should not downsample any of the precious positive data.
// Importances are currently set to match the full model's weights.
protected def PositiveSamplingRate: Double = 1.0
private def NegativeSamplingRate: Double = PositiveSamplingRate * 0.08
final lazy val LabelInfos: List[LabelInfoWithFeature] = {
assert(labels.keySet == weights.keySet)
labels.keySet.map(makeLabelInfoWithFeature).toList
}
def makeLabelInfoWithFeature(labelName: String): LabelInfoWithFeature = {
LabelInfoWithFeature(
LabelInfo(labelName, PositiveSamplingRate, weights(labelName)),
labels(labelName))
}
final lazy val NegativeInfo: LabelInfo = LabelInfo("negative", NegativeSamplingRate, 1.0)
// Example of features available in the schema-based namespace:
protected def featureToSearchResultFeatureMap: Map[Feature[_], SearchResultFeature] = Map(
RecapFeatures.TEXT_SCORE -> TweetFeature.TEXT_SCORE,
RecapFeatures.REPLY_COUNT -> TweetFeature.REPLY_COUNT,
RecapFeatures.RETWEET_COUNT -> TweetFeature.RETWEET_COUNT,
RecapFeatures.FAV_COUNT -> TweetFeature.FAVORITE_COUNT,
RecapFeatures.HAS_CARD -> TweetFeature.HAS_CARD_FLAG,
RecapFeatures.HAS_CONSUMER_VIDEO -> TweetFeature.HAS_CONSUMER_VIDEO_FLAG,
RecapFeatures.HAS_PRO_VIDEO -> TweetFeature.HAS_PRO_VIDEO_FLAG,
// no corresponding HAS_NATIVE_VIDEO feature in TweetFeature
RecapFeatures.HAS_VINE -> TweetFeature.HAS_VINE_FLAG,
RecapFeatures.HAS_PERISCOPE -> TweetFeature.HAS_PERISCOPE_FLAG,
RecapFeatures.HAS_NATIVE_IMAGE -> TweetFeature.HAS_NATIVE_IMAGE_FLAG,
RecapFeatures.HAS_IMAGE -> TweetFeature.HAS_IMAGE_URL_FLAG,
RecapFeatures.HAS_NEWS -> TweetFeature.HAS_NEWS_URL_FLAG,
RecapFeatures.HAS_VIDEO -> TweetFeature.HAS_VIDEO_URL_FLAG,
RecapFeatures.HAS_TREND -> TweetFeature.HAS_TREND_FLAG,
RecapFeatures.HAS_MULTIPLE_HASHTAGS_OR_TRENDS -> TweetFeature.HAS_MULTIPLE_HASHTAGS_OR_TRENDS_FLAG,
RecapFeatures.IS_OFFENSIVE -> TweetFeature.IS_OFFENSIVE_FLAG,
RecapFeatures.IS_REPLY -> TweetFeature.IS_REPLY_FLAG,
RecapFeatures.IS_RETWEET -> TweetFeature.IS_RETWEET_FLAG,
RecapFeatures.IS_AUTHOR_BOT -> TweetFeature.IS_USER_BOT_FLAG,
RecapFeatures.FROM_VERIFIED_ACCOUNT -> TweetFeature.FROM_VERIFIED_ACCOUNT_FLAG,
RecapFeatures.USER_REP -> TweetFeature.USER_REPUTATION,
RecapFeatures.EMBEDS_IMPRESSION_COUNT -> TweetFeature.EMBEDS_IMPRESSION_COUNT,
RecapFeatures.EMBEDS_URL_COUNT -> TweetFeature.EMBEDS_URL_COUNT,
// RecapFeatures.VIDEO_VIEW_COUNT deprecated
RecapFeatures.FAV_COUNT_V2 -> TweetFeature.FAVORITE_COUNT_V2,
RecapFeatures.RETWEET_COUNT_V2 -> TweetFeature.RETWEET_COUNT_V2,
RecapFeatures.REPLY_COUNT_V2 -> TweetFeature.REPLY_COUNT_V2,
RecapFeatures.IS_SENSITIVE -> TweetFeature.IS_SENSITIVE_CONTENT,
RecapFeatures.HAS_MULTIPLE_MEDIA -> TweetFeature.HAS_MULTIPLE_MEDIA_FLAG,
RecapFeatures.IS_AUTHOR_PROFILE_EGG -> TweetFeature.PROFILE_IS_EGG_FLAG,
RecapFeatures.IS_AUTHOR_NEW -> TweetFeature.IS_USER_NEW_FLAG,
RecapFeatures.NUM_MENTIONS -> TweetFeature.NUM_MENTIONS,
RecapFeatures.NUM_HASHTAGS -> TweetFeature.NUM_HASHTAGS,
RecapFeatures.HAS_VISIBLE_LINK -> TweetFeature.HAS_VISIBLE_LINK_FLAG,
RecapFeatures.HAS_LINK -> TweetFeature.HAS_LINK_FLAG,
// Note: DISCRETE features are not supported by the modelInterpreter tool.
// For the following features, we will create separate CONTINUOUS features instead of renaming:
// RecapFeatures.LINK_LANGUAGE
// RecapFeatures.LANGUAGE
TimelinesSharedFeatures.HAS_QUOTE -> TweetFeature.HAS_QUOTE_FLAG,
TimelinesSharedFeatures.QUOTE_COUNT -> TweetFeature.QUOTE_COUNT,
TimelinesSharedFeatures.WEIGHTED_FAV_COUNT -> TweetFeature.WEIGHTED_FAVORITE_COUNT,
TimelinesSharedFeatures.WEIGHTED_QUOTE_COUNT -> TweetFeature.WEIGHTED_QUOTE_COUNT,
TimelinesSharedFeatures.WEIGHTED_REPLY_COUNT -> TweetFeature.WEIGHTED_REPLY_COUNT,
TimelinesSharedFeatures.WEIGHTED_RETWEET_COUNT -> TweetFeature.WEIGHTED_RETWEET_COUNT,
TimelinesSharedFeatures.DECAYED_FAVORITE_COUNT -> TweetFeature.DECAYED_FAVORITE_COUNT,
TimelinesSharedFeatures.DECAYED_RETWEET_COUNT -> TweetFeature.DECAYED_RETWEET_COUNT,
TimelinesSharedFeatures.DECAYED_REPLY_COUNT -> TweetFeature.DECAYED_RETWEET_COUNT,
TimelinesSharedFeatures.DECAYED_QUOTE_COUNT -> TweetFeature.DECAYED_QUOTE_COUNT,
TimelinesSharedFeatures.FAKE_FAVORITE_COUNT -> TweetFeature.FAKE_FAVORITE_COUNT,
TimelinesSharedFeatures.FAKE_RETWEET_COUNT -> TweetFeature.FAKE_RETWEET_COUNT,
TimelinesSharedFeatures.FAKE_REPLY_COUNT -> TweetFeature.FAKE_REPLY_COUNT,
TimelinesSharedFeatures.FAKE_QUOTE_COUNT -> TweetFeature.FAKE_QUOTE_COUNT,
TimelinesSharedFeatures.EMBEDS_IMPRESSION_COUNT_V2 -> TweetFeature.EMBEDS_IMPRESSION_COUNT_V2,
TimelinesSharedFeatures.EMBEDS_URL_COUNT_V2 -> TweetFeature.EMBEDS_URL_COUNT_V2,
TimelinesSharedFeatures.LABEL_ABUSIVE_FLAG -> TweetFeature.LABEL_ABUSIVE_FLAG,
TimelinesSharedFeatures.LABEL_ABUSIVE_HI_RCL_FLAG -> TweetFeature.LABEL_ABUSIVE_HI_RCL_FLAG,
TimelinesSharedFeatures.LABEL_DUP_CONTENT_FLAG -> TweetFeature.LABEL_DUP_CONTENT_FLAG,
TimelinesSharedFeatures.LABEL_NSFW_HI_PRC_FLAG -> TweetFeature.LABEL_NSFW_HI_PRC_FLAG,
TimelinesSharedFeatures.LABEL_NSFW_HI_RCL_FLAG -> TweetFeature.LABEL_NSFW_HI_RCL_FLAG,
TimelinesSharedFeatures.LABEL_SPAM_FLAG -> TweetFeature.LABEL_SPAM_FLAG,
TimelinesSharedFeatures.LABEL_SPAM_HI_RCL_FLAG -> TweetFeature.LABEL_SPAM_HI_RCL_FLAG
)
protected def derivedFeaturesAdder: ITransform =
new ITransform {
private val hasEnglishTweetDiffUiLangFeature =
featureInstanceFromSearchResultFeature(ExternalTweetFeature.HAS_ENGLISH_TWEET_DIFF_UI_LANG)
.asInstanceOf[Feature.Binary]
private val hasEnglishUiDiffTweetLangFeature =
featureInstanceFromSearchResultFeature(ExternalTweetFeature.HAS_ENGLISH_UI_DIFF_TWEET_LANG)
.asInstanceOf[Feature.Binary]
private val hasDiffLangFeature =
featureInstanceFromSearchResultFeature(ExternalTweetFeature.HAS_DIFF_LANG)
.asInstanceOf[Feature.Binary]
private val isSelfTweetFeature =
featureInstanceFromSearchResultFeature(ExternalTweetFeature.IS_SELF_TWEET)
.asInstanceOf[Feature.Binary]
private val tweetAgeInSecsFeature =
featureInstanceFromSearchResultFeature(ExternalTweetFeature.TWEET_AGE_IN_SECS)
.asInstanceOf[Feature.Continuous]
private val authorSpecificScoreFeature =
featureInstanceFromSearchResultFeature(ExternalTweetFeature.AUTHOR_SPECIFIC_SCORE)
.asInstanceOf[Feature.Continuous]
// see comments above
private val linkLanguageFeature = new Feature.Continuous(TweetFeature.LINK_LANGUAGE.getName)
private val languageFeature = new Feature.Continuous(TweetFeature.LANGUAGE.getName)
override def transformContext(featureContext: FeatureContext): FeatureContext =
featureContext.addFeatures(
authorSpecificScoreFeature,
// used when training against the full score; see EarlybirdModelEvaluationJob.scala
// TimelinesSharedFeatures.PREDICTED_SCORE_LOG,
hasEnglishTweetDiffUiLangFeature,
hasEnglishUiDiffTweetLangFeature,
hasDiffLangFeature,
isSelfTweetFeature,
tweetAgeInSecsFeature,
linkLanguageFeature,
languageFeature
)
override def transform(record: DataRecord): Unit = {
val srecord = SRichDataRecord(record)
srecord.getFeatureValueOpt(RealGraphDataRecordFeatures.WEIGHT).map { realgraphWeight =>
srecord.setFeatureValue(authorSpecificScoreFeature, realgraphWeight)
}
// use this when training against the log of the full score
// srecord.getFeatureValueOpt(TimelinesSharedFeatures.PREDICTED_SCORE).map { score =>
// if (score > 0.0) {
// srecord.setFeatureValue(TimelinesSharedFeatures.PREDICTED_SCORE_LOG, Math.log(score))
// }
// }
if (srecord.hasFeature(RequestContextFeatures.LANGUAGE_CODE) && srecord.hasFeature(
RecapFeatures.LANGUAGE)) {
val uilangIsEnglish = srecord
.getFeatureValue(RequestContextFeatures.LANGUAGE_CODE).toString == "en"
val tweetIsEnglish = srecord.getFeatureValue(RecapFeatures.LANGUAGE) == 5
srecord.setFeatureValue(
hasEnglishTweetDiffUiLangFeature,
tweetIsEnglish && !uilangIsEnglish
)
srecord.setFeatureValue(
hasEnglishUiDiffTweetLangFeature,
uilangIsEnglish && !tweetIsEnglish
)
}
srecord.getFeatureValueOpt(RecapFeatures.MATCH_UI_LANG).map { match_ui_lang =>
srecord.setFeatureValue(
hasDiffLangFeature,
!match_ui_lang
)
}
for {
author_id <- srecord.getFeatureValueOpt(SharedFeatures.AUTHOR_ID)
user_id <- srecord.getFeatureValueOpt(SharedFeatures.USER_ID)
} srecord.setFeatureValue(
isSelfTweetFeature,
author_id == user_id
)
srecord.getFeatureValueOpt(TimeDataRecordFeatures.TIME_SINCE_TWEET_CREATION).map {
time_since_tweet_creation =>
srecord.setFeatureValue(
tweetAgeInSecsFeature,
time_since_tweet_creation / 1000.0
)
}
srecord.getFeatureValueOpt(RecapFeatures.LINK_LANGUAGE).map { link_language =>
srecord.setFeatureValue(
linkLanguageFeature,
link_language.toDouble
)
}
srecord.getFeatureValueOpt(RecapFeatures.LANGUAGE).map { language =>
srecord.setFeatureValue(
languageFeature,
language.toDouble
)
}
}
}
protected def featureInstanceFromSearchResultFeature(
tweetFeature: SearchResultFeature
): Feature[_] = {
val featureType = tweetFeature.getType
val featureName = tweetFeature.getName
require(
!tweetFeature.isDiscrete && (
featureType == com.twitter.search.common.features.thrift.ThriftSearchFeatureType.BOOLEAN_VALUE ||
featureType == com.twitter.search.common.features.thrift.ThriftSearchFeatureType.DOUBLE_VALUE ||
featureType == com.twitter.search.common.features.thrift.ThriftSearchFeatureType.INT32_VALUE
)
)
if (featureType == com.twitter.search.common.features.thrift.ThriftSearchFeatureType.BOOLEAN_VALUE)
new Feature.Binary(featureName)
else
new Feature.Continuous(featureName)
}
lazy val EarlybirdFeatureRenamer: ITransform = {
val earlybirdFeatureRenameMap: Map[Feature[_], Feature[_]] =
featureToSearchResultFeatureMap.map {
case (originalFeature, tweetFeature) =>
originalFeature -> featureInstanceFromSearchResultFeature(tweetFeature)
}.toMap
new CascadeTransform(
List(
derivedFeaturesAdder,
TransformFactory.produceTransform(
TransformFactory.produceFeatureRenameTransformSpec(
earlybirdFeatureRenameMap.asJava
)
)
).asJava
)
}
}
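For reference, a minimal usage sketch (not part of the deleted code) of how the cascade built by EarlybirdFeatureRenamer might be applied to a single record, assuming the recap subclass shown in the next file:

package com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.common

import com.twitter.ml.api.DataRecord
import com.twitter.ml.api.util.SRichDataRecord
import com.twitter.timelines.prediction.features.recap.RecapFeatures

// Hypothetical caller: fills one timelines feature, then runs the cascade
// (derived features first, then the rename to Earlybird feature names).
object EarlybirdRenamerSketch {
  def main(args: Array[String]): Unit = {
    val config = new EarlybirdTrainingRecapConfiguration
    val record = new DataRecord()
    SRichDataRecord(record).setFeatureValue(RecapFeatures.TEXT_SCORE, 0.42)
    // Transforms the record in place; afterwards the value is keyed by the
    // TweetFeature.TEXT_SCORE name that Earlybird expects.
    config.EarlybirdFeatureRenamer.transform(record)
  }
}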

View File

@@ -1,17 +0,0 @@
package com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.common
import com.twitter.ml.api.Feature
import com.twitter.timelines.prediction.features.recap.RecapFeatures
class EarlybirdTrainingRecapConfiguration extends EarlybirdTrainingConfiguration {
override val labels: Map[String, Feature.Binary] = Map(
"detail_expanded" -> RecapFeatures.IS_CLICKED,
"favorited" -> RecapFeatures.IS_FAVORITED,
"open_linked" -> RecapFeatures.IS_OPEN_LINKED,
"photo_expanded" -> RecapFeatures.IS_PHOTO_EXPANDED,
"profile_clicked" -> RecapFeatures.IS_PROFILE_CLICKED,
"replied" -> RecapFeatures.IS_REPLIED,
"retweeted" -> RecapFeatures.IS_RETWEETED,
"video_playback50" -> RecapFeatures.IS_VIDEO_PLAYBACK_50
)
}

View File

@@ -1,100 +0,0 @@
package com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.common
import com.twitter.ml.api.DataRecord
import com.twitter.ml.api.Feature
import com.twitter.ml.api.FeatureContext
import com.twitter.ml.api.ITransform
import com.twitter.ml.api.transform.CascadeTransform
import com.twitter.ml.api.util.SRichDataRecord
import com.twitter.search.common.features.SearchResultFeature
import com.twitter.search.common.features.TweetFeature
import com.twitter.timelines.prediction.features.itl.ITLFeatures._
import scala.collection.JavaConverters._
class EarlybirdTrainingRectweetConfiguration extends EarlybirdTrainingConfiguration {
override val labels: Map[String, Feature.Binary] = Map(
"detail_expanded" -> IS_CLICKED,
"favorited" -> IS_FAVORITED,
"open_linked" -> IS_OPEN_LINKED,
"photo_expanded" -> IS_PHOTO_EXPANDED,
"profile_clicked" -> IS_PROFILE_CLICKED,
"replied" -> IS_REPLIED,
"retweeted" -> IS_RETWEETED,
"video_playback50" -> IS_VIDEO_PLAYBACK_50
)
override val PositiveSamplingRate: Double = 0.5
override def featureToSearchResultFeatureMap: Map[Feature[_], SearchResultFeature] =
super.featureToSearchResultFeatureMap ++ Map(
TEXT_SCORE -> TweetFeature.TEXT_SCORE,
REPLY_COUNT -> TweetFeature.REPLY_COUNT,
RETWEET_COUNT -> TweetFeature.RETWEET_COUNT,
FAV_COUNT -> TweetFeature.FAVORITE_COUNT,
HAS_CARD -> TweetFeature.HAS_CARD_FLAG,
HAS_CONSUMER_VIDEO -> TweetFeature.HAS_CONSUMER_VIDEO_FLAG,
HAS_PRO_VIDEO -> TweetFeature.HAS_PRO_VIDEO_FLAG,
HAS_VINE -> TweetFeature.HAS_VINE_FLAG,
HAS_PERISCOPE -> TweetFeature.HAS_PERISCOPE_FLAG,
HAS_NATIVE_IMAGE -> TweetFeature.HAS_NATIVE_IMAGE_FLAG,
HAS_IMAGE -> TweetFeature.HAS_IMAGE_URL_FLAG,
HAS_NEWS -> TweetFeature.HAS_NEWS_URL_FLAG,
HAS_VIDEO -> TweetFeature.HAS_VIDEO_URL_FLAG,
// some features that exist for recap are not available in rectweet
// HAS_TREND
// HAS_MULTIPLE_HASHTAGS_OR_TRENDS
// IS_OFFENSIVE
// IS_REPLY
// IS_RETWEET
IS_AUTHOR_BOT -> TweetFeature.IS_USER_BOT_FLAG,
IS_AUTHOR_SPAM -> TweetFeature.IS_USER_SPAM_FLAG,
IS_AUTHOR_NSFW -> TweetFeature.IS_USER_NSFW_FLAG,
// FROM_VERIFIED_ACCOUNT
USER_REP -> TweetFeature.USER_REPUTATION,
// EMBEDS_IMPRESSION_COUNT
// EMBEDS_URL_COUNT
// VIDEO_VIEW_COUNT
FAV_COUNT_V2 -> TweetFeature.FAVORITE_COUNT_V2,
RETWEET_COUNT_V2 -> TweetFeature.RETWEET_COUNT_V2,
REPLY_COUNT_V2 -> TweetFeature.REPLY_COUNT_V2,
IS_SENSITIVE -> TweetFeature.IS_SENSITIVE_CONTENT,
HAS_MULTIPLE_MEDIA -> TweetFeature.HAS_MULTIPLE_MEDIA_FLAG,
IS_AUTHOR_PROFILE_EGG -> TweetFeature.PROFILE_IS_EGG_FLAG,
IS_AUTHOR_NEW -> TweetFeature.IS_USER_NEW_FLAG,
NUM_MENTIONS -> TweetFeature.NUM_MENTIONS,
NUM_HASHTAGS -> TweetFeature.NUM_HASHTAGS,
HAS_VISIBLE_LINK -> TweetFeature.HAS_VISIBLE_LINK_FLAG,
HAS_LINK -> TweetFeature.HAS_LINK_FLAG
)
override def derivedFeaturesAdder: CascadeTransform = {
// only LINK_LANGUAGE is available in rectweet; there is no LANGUAGE feature
val linkLanguageTransform = new ITransform {
private val linkLanguageFeature = new Feature.Continuous(TweetFeature.LINK_LANGUAGE.getName)
override def transformContext(featureContext: FeatureContext): FeatureContext =
featureContext.addFeatures(
linkLanguageFeature
)
override def transform(record: DataRecord): Unit = {
val srecord = SRichDataRecord(record)
srecord.getFeatureValueOpt(LINK_LANGUAGE).map { link_language =>
srecord.setFeatureValue(
linkLanguageFeature,
link_language.toDouble
)
}
}
}
new CascadeTransform(
List(
super.derivedFeaturesAdder,
linkLanguageTransform
).asJava
)
}
}

View File

@@ -1,36 +0,0 @@
scala_library(
name = "model_evaluation",
sources = ["*.scala"],
platform = "java8",
strict_deps = False,
dependencies = [
"3rdparty/src/jvm/com/twitter/scalding:json",
"src/scala/com/twitter/ml/api:api-base",
"src/scala/com/twitter/ml/api/prediction_engine",
"src/scala/com/twitter/ml/api/util",
"src/scala/com/twitter/scalding_internal/job",
"src/scala/com/twitter/timelines/prediction/adapters/recap",
"src/scala/com/twitter/timelines/prediction/features/recap",
"timelines/data_processing/ad_hoc/earlybird_ranking/common",
"timelines/data_processing/util:rich-request",
"timelines/data_processing/util/example",
"timelines/data_processing/util/execution",
"twadoop_config/configuration/log_categories/group/timelines:timelineservice_injection_request_log-scala",
],
)
hadoop_binary(
name = "bin",
basename = "earlybird_model_evaluation-deploy",
main = "com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.model_evaluation.EarlybirdModelEvaluationJob",
platform = "java8",
runtime_platform = "java8",
tags = [
"bazel-compatible",
"bazel-compatible:migrated",
"bazel-only",
],
dependencies = [
":model_evaluation",
],
)

View File

@@ -1,203 +0,0 @@
package com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.model_evaluation
import scala.collection.GenTraversableOnce
case class CandidateRecord(tweetId: Long, fullScore: Double, earlyScore: Double, served: Boolean)
/**
* A metric that compares scores generated by a "full" prediction
* model to those of a "light" (Earlybird) model. The metric is calculated for candidates
* from a single request.
*/
sealed trait EarlybirdEvaluationMetric {
def name: String
def apply(candidates: Seq[CandidateRecord]): Option[Double]
}
/**
* Picks the set of `k` top candidates using light scores, and calculates
* recall of these light-score based candidates among set of `k` top candidates
* using full scores.
*
* If there are fewer than `k` candidates, we can either filter out such requests (which
* lowers the average recall) or keep them and trivially count their recall as 1.0.
*/
case class TopKRecall(k: Int, filterFewerThanK: Boolean) extends EarlybirdEvaluationMetric {
override val name: String = s"top_${k}_recall${if (filterFewerThanK) "_filtered" else ""}"
override def apply(candidates: Seq[CandidateRecord]): Option[Double] = {
if (candidates.size <= k) {
if (filterFewerThanK) None else Some(1.0)
} else {
val topFull = candidates.sortBy(-_.fullScore).take(k)
val topLight = candidates.sortBy(-_.earlyScore).take(k)
val overlap = topFull.map(_.tweetId).intersect(topLight.map(_.tweetId))
val truePos = overlap.size.toDouble
Some(truePos / k.toDouble)
}
}
}
/**
* Calculates the probability that a random pair of candidates will be ordered the same by the
* full and earlybird models.
*
* Note: A pair with the same scores for one model and different scores for the other
* contributes 1 to the sum, while pairs that are strictly ordered the same contribute 2.
* It follows that the score for a constant model is 0.5, which is approximately equal to
* that of a random model, as expected.
*/
case object ProbabilityOfCorrectOrdering extends EarlybirdEvaluationMetric {
def fractionOf[A](trav: GenTraversableOnce[A])(p: A => Boolean): Double = {
if (trav.isEmpty)
0.0
else {
val (numPos, numElements) = trav.foldLeft((0, 0)) {
case ((numPosAcc, numElementsAcc), elem) =>
(if (p(elem)) numPosAcc + 1 else numPosAcc, numElementsAcc + 1)
}
numPos.toDouble / numElements
}
}
override def name: String = "probability_of_correct_ordering"
override def apply(candidates: Seq[CandidateRecord]): Option[Double] = {
if (candidates.size < 2)
None
else {
val pairs = for {
left <- candidates.iterator
right <- candidates.iterator
if left != right
} yield (left, right)
val probabilityOfCorrect = fractionOf(pairs) {
case (left, right) =>
(left.fullScore > right.fullScore) == (left.earlyScore > right.earlyScore)
}
Some(probabilityOfCorrect)
}
}
}
/**
* Like `TopKRecall`, but uses `n` % of top candidates instead.
*/
case class TopNPercentRecall(percent: Double) extends EarlybirdEvaluationMetric {
override val name: String = s"top_${percent}_pct_recall"
override def apply(candidates: Seq[CandidateRecord]): Option[Double] = {
val k = Math.floor(candidates.size * percent).toInt
if (k > 0) {
val topFull = candidates.sortBy(-_.fullScore).take(k)
val topLight = candidates.sortBy(-_.earlyScore).take(k)
val overlap = topFull.map(_.tweetId).intersect(topLight.map(_.tweetId))
val truePos = overlap.size.toDouble
Some(truePos / k.toDouble)
} else {
None
}
}
}
/**
* Picks the set of `k` top candidates using light scores, and calculates
* recall of selected light-score based candidates among set of actual
* shown candidates.
*/
case class ShownTweetRecall(k: Int) extends EarlybirdEvaluationMetric {
override val name: String = s"shown_tweet_recall_$k"
override def apply(candidates: Seq[CandidateRecord]): Option[Double] = {
if (candidates.size <= k) {
None
} else {
val topLight = candidates.sortBy(-_.earlyScore).take(k)
val truePos = topLight.count(_.served).toDouble
val allPos = candidates.count(_.served).toDouble
if (allPos > 0) Some(truePos / allPos)
else None
}
}
}
/**
* Like `ShownTweetRecall`, but uses `n` % of top candidates instead.
*/
case class ShownTweetPercentRecall(percent: Double) extends EarlybirdEvaluationMetric {
override val name: String = s"shown_tweet_recall_${percent}_pct"
override def apply(candidates: Seq[CandidateRecord]): Option[Double] = {
val k = Math.floor(candidates.size * percent).toInt
val topLight = candidates.sortBy(-_.earlyScore).take(k)
val truePos = topLight.count(_.served).toDouble
val allPos = candidates.count(_.served).toDouble
if (allPos > 0) Some(truePos / allPos)
else None
}
}
/**
* Like `ShownTweetRecall`, but calculated using *full* scores. This is a sanity metric,
* because by definition the top full-scored candidates will be served. If the value is
* < 1, this is due to the ranked section being smaller than k.
*/
case class ShownTweetRecallWithFullScores(k: Int) extends EarlybirdEvaluationMetric {
override val name: String = s"shown_tweet_recall_with_full_scores_$k"
override def apply(candidates: Seq[CandidateRecord]): Option[Double] = {
if (candidates.size <= k) {
None
} else {
val topFull = candidates.sortBy(-_.fullScore).take(k)
val truePos = topFull.count(_.served).toDouble
val allPos = candidates.count(_.served).toDouble
if (allPos > 0) Some(truePos / allPos)
else None
}
}
}
/**
* Picks the set of `k` top candidates using the light scores, and calculates
* average full score for the candidates.
*/
case class AverageFullScoreForTopLight(k: Int) extends EarlybirdEvaluationMetric {
override val name: String = s"average_full_score_for_top_light_$k"
override def apply(candidates: Seq[CandidateRecord]): Option[Double] = {
if (candidates.size <= k) {
None
} else {
val topLight = candidates.sortBy(-_.earlyScore).take(k)
Some(topLight.map(_.fullScore).sum / topLight.size)
}
}
}
/**
* Picks the set of `k` top candidates using the light scores, and calculates
* sum of full scores for those. Divides that by the sum of the overall top `k` full scores
* to get a "score recall".
*/
case class SumScoreRecallForTopLight(k: Int) extends EarlybirdEvaluationMetric {
override val name: String = s"sum_score_recall_for_top_light_$k"
override def apply(candidates: Seq[CandidateRecord]): Option[Double] = {
if (candidates.size <= k) {
None
} else {
val sumFullScoresForTopLight = candidates.sortBy(-_.earlyScore).take(k).map(_.fullScore).sum
val sumScoresForTopFull = candidates.sortBy(-_.fullScore).take(k).map(_.fullScore).sum
Some(sumFullScoresForTopLight / sumScoresForTopFull)
}
}
}
case class HasFewerThanKCandidates(k: Int) extends EarlybirdEvaluationMetric {
override val name: String = s"has_fewer_than_${k}_candidates"
override def apply(candidates: Seq[CandidateRecord]): Option[Double] =
Some(if (candidates.size <= k) 1.0 else 0.0)
}
case object NumberOfCandidates extends EarlybirdEvaluationMetric {
override val name: String = "number_of_candidates"
override def apply(candidates: Seq[CandidateRecord]): Option[Double] =
Some(candidates.size.toDouble)
}
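A small worked example (illustration only, not part of the deleted code) of two of the metrics above on three hand-built candidates; the light model's top-2 {1, 3} shares one tweet with the full model's top-2 {1, 2}, and four of the six ordered pairs agree on ordering:

package com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.model_evaluation

object EarlybirdMetricSketch {
  def main(args: Array[String]): Unit = {
    val candidates = Seq(
      CandidateRecord(tweetId = 1L, fullScore = 0.9, earlyScore = 0.8, served = true),
      CandidateRecord(tweetId = 2L, fullScore = 0.7, earlyScore = 0.1, served = true),
      CandidateRecord(tweetId = 3L, fullScore = 0.2, earlyScore = 0.6, served = false)
    )
    // Overlap of the two top-2 sets is a single tweet, so recall is 1/2.
    println(TopKRecall(k = 2, filterFewerThanK = false)(candidates)) // Some(0.5)
    // 4 of the 6 ordered pairs are ranked consistently by both models.
    println(ProbabilityOfCorrectOrdering(candidates)) // Some(0.666...)
  }
}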

View File

@@ -1,214 +0,0 @@
package com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.model_evaluation
import com.twitter.algebird.Aggregator
import com.twitter.algebird.AveragedValue
import com.twitter.ml.api.prediction_engine.PredictionEnginePlugin
import com.twitter.ml.api.util.FDsl
import com.twitter.ml.api.DataRecord
import com.twitter.ml.api.IRecordOneToManyAdapter
import com.twitter.scalding.Args
import com.twitter.scalding.DateRange
import com.twitter.scalding.Execution
import com.twitter.scalding.TypedJson
import com.twitter.scalding.TypedPipe
import com.twitter.scalding_internal.dalv2.DAL
import com.twitter.scalding_internal.job.TwitterExecutionApp
import com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.common.EarlybirdTrainingRecapConfiguration
import com.twitter.timelines.data_processing.util.RequestImplicits.RichRequest
import com.twitter.timelines.data_processing.util.example.RecapTweetExample
import com.twitter.timelines.data_processing.util.execution.UTCDateRangeFromArgs
import com.twitter.timelines.prediction.adapters.recap.RecapSuggestionRecordAdapter
import com.twitter.timelines.prediction.features.recap.RecapFeatures
import com.twitter.timelines.suggests.common.record.thriftscala.SuggestionRecord
import com.twitter.timelineservice.suggests.logging.recap.thriftscala.HighlightTweet
import com.twitter.timelineservice.suggests.logging.thriftscala.SuggestsRequestLog
import scala.collection.JavaConverters._
import scala.language.reflectiveCalls
import scala.util.Random
import twadoop_config.configuration.log_categories.group.timelines.TimelineserviceInjectionRequestLogScalaDataset
/**
* Evaluates an Earlybird model using 1% injection request logs.
*
* Arguments:
* --model_base_path path to Earlybird model snapshots
* --models list of model names to evaluate
* --output path to output stats
* --parallelism (default: 3) number of tasks to run in parallel
* --topks (optional) list of values of `k` (integers) for top-K metrics
* --topn_fractions (optional) list of values of `n` (doubles) for top-N-fraction metrics
* --seed (optional) seed for random number generator
*/
object EarlybirdModelEvaluationJob extends TwitterExecutionApp with UTCDateRangeFromArgs {
import FDsl._
import PredictionEnginePlugin._
private[this] val averager: Aggregator[Double, AveragedValue, Double] =
AveragedValue.aggregator
private[this] val recapAdapter: IRecordOneToManyAdapter[SuggestionRecord] =
new RecapSuggestionRecordAdapter(checkDwellTime = false)
override def job: Execution[Unit] = {
for {
args <- Execution.getArgs
dateRange <- dateRangeEx
metrics = getMetrics(args)
random = buildRandom(args)
modelBasePath = args("model_base_path")
models = args.list("models")
parallelism = args.int("parallelism", 3)
logs = logsHavingCandidates(dateRange)
modelScoredCandidates = models.map { model =>
(model, scoreCandidatesUsingModel(logs, s"$modelBasePath/$model"))
}
functionScoredCandidates = List(
("random", scoreCandidatesUsingFunction(logs, _ => Some(random.nextDouble()))),
("original_earlybird", scoreCandidatesUsingFunction(logs, extractOriginalEarlybirdScore)),
("blender", scoreCandidatesUsingFunction(logs, extractBlenderScore))
)
allCandidates = modelScoredCandidates ++ functionScoredCandidates
statsExecutions = allCandidates.map {
case (name, pipe) =>
for {
saved <- pipe.forceToDiskExecution
stats <- computeMetrics(saved, metrics, parallelism)
} yield (name, stats)
}
stats <- Execution.withParallelism(statsExecutions, parallelism)
_ <- TypedPipe.from(stats).writeExecution(TypedJson(args("output")))
} yield ()
}
private[this] def computeMetrics(
requests: TypedPipe[Seq[CandidateRecord]],
metricsToCompute: Seq[EarlybirdEvaluationMetric],
parallelism: Int
): Execution[Map[String, Double]] = {
val metricExecutions = metricsToCompute.map { metric =>
val metricEx = requests.flatMap(metric(_)).aggregate(averager).toOptionExecution
metricEx.map { value => value.map((metric.name, _)) }
}
Execution.withParallelism(metricExecutions, parallelism).map(_.flatten.toMap)
}
private[this] def getMetrics(args: Args): Seq[EarlybirdEvaluationMetric] = {
val topKs = args.list("topks").map(_.toInt)
val topNFractions = args.list("topn_fractions").map(_.toDouble)
val topKMetrics = topKs.flatMap { topK =>
Seq(
TopKRecall(topK, filterFewerThanK = false),
TopKRecall(topK, filterFewerThanK = true),
ShownTweetRecall(topK),
AverageFullScoreForTopLight(topK),
SumScoreRecallForTopLight(topK),
HasFewerThanKCandidates(topK),
ShownTweetRecallWithFullScores(topK),
ProbabilityOfCorrectOrdering
)
}
val topNPercentMetrics = topNFractions.flatMap { topNPercent =>
Seq(
TopNPercentRecall(topNPercent),
ShownTweetPercentRecall(topNPercent)
)
}
topKMetrics ++ topNPercentMetrics ++ Seq(NumberOfCandidates)
}
private[this] def buildRandom(args: Args): Random = {
val seedOpt = args.optional("seed").map(_.toLong)
seedOpt.map(new Random(_)).getOrElse(new Random())
}
private[this] def logsHavingCandidates(dateRange: DateRange): TypedPipe[SuggestsRequestLog] =
DAL
.read(TimelineserviceInjectionRequestLogScalaDataset, dateRange)
.toTypedPipe
.filter(_.recapCandidates.exists(_.nonEmpty))
/**
* Uses a model defined at `earlybirdModelPath` to score candidates and
* returns a Seq[CandidateRecord] for each request.
*/
private[this] def scoreCandidatesUsingModel(
logs: TypedPipe[SuggestsRequestLog],
earlybirdModelPath: String
): TypedPipe[Seq[CandidateRecord]] = {
logs
.usingScorer(earlybirdModelPath)
.map {
case (scorer: PredictionEngineScorer, log: SuggestsRequestLog) =>
val suggestionRecords =
RecapTweetExample
.extractCandidateTweetExamples(log)
.map(_.asSuggestionRecord)
val servedTweetIds = log.servedHighlightTweets.flatMap(_.tweetId).toSet
val renamer = (new EarlybirdTrainingRecapConfiguration).EarlybirdFeatureRenamer
suggestionRecords.flatMap { suggestionRecord =>
val dataRecordOpt = recapAdapter.adaptToDataRecords(suggestionRecord).asScala.headOption
dataRecordOpt.foreach(renamer.transform)
for {
tweetId <- suggestionRecord.itemId
fullScore <- suggestionRecord.recapFeatures.flatMap(_.combinedModelScore)
earlybirdScore <- dataRecordOpt.flatMap(calculateLightScore(_, scorer))
} yield CandidateRecord(
tweetId = tweetId,
fullScore = fullScore,
earlyScore = earlybirdScore,
served = servedTweetIds.contains(tweetId)
)
}
}
}
/**
* Uses a simple function to score candidates and returns a Seq[CandidateRecord] for each
* request.
*/
private[this] def scoreCandidatesUsingFunction(
logs: TypedPipe[SuggestsRequestLog],
earlyScoreExtractor: HighlightTweet => Option[Double]
): TypedPipe[Seq[CandidateRecord]] = {
logs
.map { log =>
val tweetCandidates = log.recapTweetCandidates.getOrElse(Nil)
val servedTweetIds = log.servedHighlightTweets.flatMap(_.tweetId).toSet
for {
candidate <- tweetCandidates
tweetId <- candidate.tweetId
fullScore <- candidate.recapFeatures.flatMap(_.combinedModelScore)
earlyScore <- earlyScoreExtractor(candidate)
} yield CandidateRecord(
tweetId = tweetId,
fullScore = fullScore,
earlyScore = earlyScore,
served = servedTweetIds.contains(tweetId)
)
}
}
private[this] def extractOriginalEarlybirdScore(candidate: HighlightTweet): Option[Double] =
for {
recapFeatures <- candidate.recapFeatures
tweetFeatures <- recapFeatures.tweetFeatures
} yield tweetFeatures.earlybirdScore
private[this] def extractBlenderScore(candidate: HighlightTweet): Option[Double] =
for {
recapFeatures <- candidate.recapFeatures
tweetFeatures <- recapFeatures.tweetFeatures
} yield tweetFeatures.blenderScore
private[this] def calculateLightScore(
dataRecord: DataRecord,
scorer: PredictionEngineScorer
): Option[Double] = {
val scoredRecord = scorer(dataRecord)
if (scoredRecord.hasFeature(RecapFeatures.PREDICTED_IS_UNIFIED_ENGAGEMENT)) {
Some(scoredRecord.getFeatureValue(RecapFeatures.PREDICTED_IS_UNIFIED_ENGAGEMENT).toDouble)
} else {
None
}
}
}
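A hedged sketch (not part of the deleted code) of how the arguments documented above are consumed, mirroring getMetrics and buildRandom; the paths and model names are placeholders, and it assumes Scalding's Args.apply accepts a whitespace-separated argument string:

import com.twitter.scalding.Args

object EarlybirdEvaluationArgsSketch {
  def main(argv: Array[String]): Unit = {
    // Placeholder values throughout; only the Args accessors used by the job are shown.
    val args = Args(
      "--model_base_path /placeholder/models --models model_a model_b " +
        "--output /placeholder/stats --topks 5 10 --topn_fractions 0.1 --seed 42")
    println(args.list("models"))                 // List(model_a, model_b)
    println(args.list("topks").map(_.toInt))     // List(5, 10)
    println(args.optional("seed").map(_.toLong)) // Some(42)
    println(args.int("parallelism", 3))          // 3 (default when the flag is absent)
  }
}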

View File

@@ -1,89 +0,0 @@
create_datarecord_datasets(
base_name = "earlybird_recap_data_records",
platform = "java8",
role = "timelines",
segment_type = "partitioned",
tags = [
"bazel-compatible",
"bazel-compatible:migrated",
],
)
create_datarecord_datasets(
base_name = "earlybird_rectweet_data_records",
platform = "java8",
role = "timelines",
segment_type = "partitioned",
tags = [
"bazel-compatible",
"bazel-compatible:migrated",
],
)
scala_library(
name = "training_data_generation",
sources = ["*.scala"],
platform = "java8",
strict_deps = True,
tags = [
"bazel-compatible",
"bazel-compatible:migrated",
],
dependencies = [
":earlybird_recap_data_records-java",
":earlybird_rectweet_data_records-java",
"3rdparty/jvm/com/ibm/icu:icu4j",
"3rdparty/src/jvm/com/twitter/scalding:json",
"src/java/com/twitter/ml/api:api-base",
"src/java/com/twitter/ml/api/constant",
"src/java/com/twitter/ml/api/matcher",
"src/java/com/twitter/search/common/features",
"src/scala/com/twitter/ml/api:api-base",
"src/scala/com/twitter/ml/api/analytics",
"src/scala/com/twitter/ml/api/util",
"src/scala/com/twitter/scalding_internal/dalv2",
"src/scala/com/twitter/scalding_internal/dalv2/dataset",
"src/scala/com/twitter/scalding_internal/job",
"src/scala/com/twitter/scalding_internal/job/analytics_batch",
"src/scala/com/twitter/timelines/prediction/features/common",
"src/scala/com/twitter/timelines/prediction/features/recap",
"src/thrift/com/twitter/ml/api:data-java",
"src/thrift/com/twitter/ml/api:dataset-analytics-java",
"timelines/data_processing/ad_hoc/earlybird_ranking/common",
"timelines/data_processing/ad_hoc/recap/dataset_utils",
"timelines/data_processing/ad_hoc/recap/offline_execution",
"timelines/data_processing/util/execution",
],
)
hadoop_binary(
name = "bin",
basename = "earlybird_training_data_generation-deploy",
main = "com.twitter.scalding.Tool",
platform = "java8",
runtime_platform = "java8",
tags = [
"bazel-compatible",
"bazel-compatible:migrated",
"bazel-only",
],
dependencies = [
":training_data_generation",
],
)
hadoop_binary(
name = "earlybird_training_data_generation_prod",
basename = "earlybird_training_data_generation_prod-deploy",
main = "com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.training_data_generation.EarlybirdTrainingDataProdJob",
platform = "java8",
runtime_platform = "java8",
tags = [
"bazel-compatible",
"bazel-compatible:migrated",
"bazel-only",
],
dependencies = [
":training_data_generation",
],
)

View File

@@ -1,65 +0,0 @@
package com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.training_data_generation
import com.twitter.ml.api.constant.SharedFeatures
import com.twitter.ml.api.DataSetPipe
import com.twitter.ml.api.Feature
import com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.common.LabelInfo
import com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.common.LabelInfoWithFeature
import com.twitter.timelines.prediction.features.recap.RecapFeatures
import java.lang.{Double => JDouble}
import scala.util.Random
/**
* Adds an IsGlobalEngagement label to records containing any recap label, and adjusts
* weights accordingly. See [[weightAndSample]] for details on operation.
*/
class EarlybirdExampleSampler(
random: Random,
labelInfos: List[LabelInfoWithFeature],
negativeInfo: LabelInfo) {
import com.twitter.ml.api.util.FDsl._
private[this] val ImportanceFeature: Feature[JDouble] =
SharedFeatures.RECORD_WEIGHT_FEATURE_BUILDER
.extensionBuilder()
.addExtension("type", "earlybird")
.build()
private[this] def uniformSample(labelInfo: LabelInfo) =
random.nextDouble() < labelInfo.downsampleFraction
private[this] def weightedImportance(labelInfo: LabelInfo) =
labelInfo.importance / labelInfo.downsampleFraction
/**
* Generates an IsGlobalEngagement label for records that contain any
* recap label, and adds an "importance" value per recap label found
* in the record. Simultaneously downsamples positive and negative examples based on the
* provided downsample rates.
*/
def weightAndSample(data: DataSetPipe): DataSetPipe = {
val updatedRecords = data.records.flatMap { record =>
val featuresOn = labelInfos.filter(labelInfo => record.hasFeature(labelInfo.feature))
if (featuresOn.nonEmpty) {
val sampled = featuresOn.map(_.info).filter(uniformSample)
if (sampled.nonEmpty) {
record.setFeatureValue(RecapFeatures.IS_EARLYBIRD_UNIFIED_ENGAGEMENT, true)
Some(record.setFeatureValue(ImportanceFeature, sampled.map(weightedImportance).sum))
} else {
None
}
} else if (uniformSample(negativeInfo)) {
Some(record.setFeatureValue(ImportanceFeature, weightedImportance(negativeInfo)))
} else {
None
}
}
DataSetPipe(
updatedRecords,
data.featureContext
.addFeatures(ImportanceFeature, RecapFeatures.IS_EARLYBIRD_UNIFIED_ENGAGEMENT)
)
}
}
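As a worked illustration (not from the deleted code) of the inverse-propensity weighting applied by weightAndSample, using the recap defaults from EarlybirdTrainingConfiguration (weight 9.0 for "replied", PositiveSamplingRate 1.0, negative rate 0.08):

object EarlybirdImportanceSketch {
  // Reproduces the importance / downsampleFraction arithmetic used above.
  private def weightedImportance(importance: Double, downsampleFraction: Double): Double =
    importance / downsampleFraction

  def main(args: Array[String]): Unit = {
    // A "replied" positive: importance 9.0, kept at the positive sampling rate of 1.0.
    println(weightedImportance(9.0, 1.0)) // 9.0
    // A negative: importance 1.0, kept at 0.08 of the positive rate,
    // so each surviving negative is up-weighted to 12.5.
    println(weightedImportance(1.0, 0.08)) // 12.5
  }
}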

View File

@@ -1,63 +0,0 @@
package com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.training_data_generation
import com.twitter.ml.api.analytics.DataSetAnalyticsPlugin
import com.twitter.ml.api.matcher.FeatureMatcher
import com.twitter.ml.api.util.FDsl
import com.twitter.ml.api.DailySuffixFeatureSource
import com.twitter.ml.api.DataRecord
import com.twitter.ml.api.DataSetPipe
import com.twitter.ml.api.FeatureStats
import com.twitter.ml.api.IMatcher
import com.twitter.scalding.typed.TypedPipe
import com.twitter.scalding.Execution
import com.twitter.scalding.TypedJson
import com.twitter.scalding_internal.job.TwitterExecutionApp
import com.twitter.timelines.data_processing.util.execution.UTCDateRangeFromArgs
import com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.common.EarlybirdTrainingConfiguration
import com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.common.EarlybirdTrainingRecapConfiguration
import com.twitter.timelines.prediction.features.recap.RecapFeatures
import scala.collection.JavaConverters._
/**
* Compute counts and fractions for all labels in a Recap data source.
*
* Arguments:
* --input recap data source (containing all labels)
* --output path to output JSON file containing stats
*/
object EarlybirdStatsJob extends TwitterExecutionApp with UTCDateRangeFromArgs {
import DataSetAnalyticsPlugin._
import FDsl._
import RecapFeatures.IS_EARLYBIRD_UNIFIED_ENGAGEMENT
lazy val constants: EarlybirdTrainingConfiguration = new EarlybirdTrainingRecapConfiguration
private[this] def addGlobalEngagementLabel(record: DataRecord) = {
if (constants.LabelInfos.exists { labelInfo => record.hasFeature(labelInfo.feature) }) {
record.setFeatureValue(IS_EARLYBIRD_UNIFIED_ENGAGEMENT, true)
}
record
}
private[this] def labelFeatureMatcher: IMatcher = {
val allLabels =
(IS_EARLYBIRD_UNIFIED_ENGAGEMENT :: constants.LabelInfos.map(_.feature)).map(_.getFeatureName)
FeatureMatcher.names(allLabels.asJava)
}
private[this] def computeStats(data: DataSetPipe): TypedPipe[FeatureStats] = {
data
.viaRecords { _.map(addGlobalEngagementLabel) }
.project(labelFeatureMatcher)
.collectFeatureStats()
}
override def job: Execution[Unit] = {
for {
args <- Execution.getArgs
dateRange <- dateRangeEx
data = DailySuffixFeatureSource(args("input"))(dateRange).read
_ <- computeStats(data).writeExecution(TypedJson(args("output")))
} yield ()
}
}
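A minimal sketch (not part of the deleted code) of the union-label rule applied by addGlobalEngagementLabel: any record carrying at least one recap engagement label is also marked with IS_EARLYBIRD_UNIFIED_ENGAGEMENT; the real job applies this inside a Scalding pipe.

import com.twitter.ml.api.DataRecord
import com.twitter.ml.api.util.SRichDataRecord
import com.twitter.timelines.prediction.features.recap.RecapFeatures

object GlobalEngagementLabelSketch {
  def main(args: Array[String]): Unit = {
    val record = SRichDataRecord(new DataRecord())
    record.setFeatureValue(RecapFeatures.IS_FAVORITED, true)
    // Stands in for LabelInfos.exists(info => record.hasFeature(info.feature)).
    if (record.hasFeature(RecapFeatures.IS_FAVORITED)) {
      record.setFeatureValue(RecapFeatures.IS_EARLYBIRD_UNIFIED_ENGAGEMENT, true)
    }
    println(record.hasFeature(RecapFeatures.IS_EARLYBIRD_UNIFIED_ENGAGEMENT)) // true
  }
}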

View File

@@ -1,92 +0,0 @@
package com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.training_data_generation
import com.twitter.ml.api.HourlySuffixFeatureSource
import com.twitter.ml.api.IRecord
import com.twitter.scalding.Args
import com.twitter.scalding.DateRange
import com.twitter.scalding.Days
import com.twitter.scalding.Execution
import com.twitter.scalding.ExecutionUtil
import com.twitter.scalding_internal.dalv2.DALWrite.D
import com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.common.EarlybirdTrainingRecapConfiguration
import com.twitter.timelines.data_processing.ad_hoc.earlybird_ranking.common.EarlybirdTrainingRectweetConfiguration
import com.twitter.timelines.data_processing.ad_hoc.recap.offline_execution.OfflineAdhocExecution
import com.twitter.timelines.data_processing.ad_hoc.recap.offline_execution.OfflineAnalyticsBatchExecution
import com.twitter.timelines.data_processing.ad_hoc.recap.offline_execution.OfflineExecution
import scala.util.Random
import com.twitter.scalding_internal.dalv2.dataset.DALWrite._
import com.twitter.timelines.prediction.features.common.TimelinesSharedFeatures
import timelines.data_processing.ad_hoc.earlybird_ranking.training_data_generation._
/**
* Generates data for training an Earlybird-friendly model.
* Produces a single "global" engagement label and samples data accordingly.
* Also renames timelines features to their original Earlybird
* feature names so they can be used as-is in EB.
*
* Arguments:
* --input path to raw Recap training data (all labels)
* --output path to write sampled Earlybird-friendly training data
* --seed (optional) for random number generator (in sampling)
* --parallelism (default: 1) number of days to generate data for in parallel
* [splits long date range into single days]
*/
trait GenerateEarlybirdTrainingData { _: OfflineExecution =>
def isEligibleForEarlybirdScoring(record: IRecord): Boolean = {
// The rationale behind this logic is available in TQ-9678.
record.getFeatureValue(TimelinesSharedFeatures.EARLYBIRD_SCORE) <= 100.0
}
override def executionFromParams(args: Args)(implicit dateRange: DateRange): Execution[Unit] = {
val seedOpt = args.optional("seed").map(_.toLong)
val parallelism = args.int("parallelism", 1)
val rectweet = args.boolean("rectweet")
ExecutionUtil
.runDateRangeWithParallelism(Days(1), parallelism) { splitRange =>
val data = HourlySuffixFeatureSource(args("input"))(splitRange).read
.filter(isEligibleForEarlybirdScoring _)
lazy val rng = seedOpt.map(new Random(_)).getOrElse(new Random())
val (constants, sink) =
if (rectweet)
(new EarlybirdTrainingRectweetConfiguration, EarlybirdRectweetDataRecordsJavaDataset)
else (new EarlybirdTrainingRecapConfiguration, EarlybirdRecapDataRecordsJavaDataset)
val earlybirdSampler =
new EarlybirdExampleSampler(
random = rng,
labelInfos = constants.LabelInfos,
negativeInfo = constants.NegativeInfo
)
val outputPath = args("output")
earlybirdSampler
.weightAndSample(data)
.transform(constants.EarlybirdFeatureRenamer)
// shuffle row-wise in order to get rid of clustered replies
// also keep number of part files small
.viaRecords { record =>
record
.groupRandomly(partitions = 500)
.sortBy { _ => rng.nextDouble() }
.values
}
.writeDALExecution(
sink,
D.Daily,
D.Suffix(outputPath),
D.EBLzo()
)(splitRange)
}(dateRange).unit
}
}
object EarlybirdTrainingDataAdHocJob
extends OfflineAdhocExecution
with GenerateEarlybirdTrainingData
object EarlybirdTrainingDataProdJob
extends OfflineAnalyticsBatchExecution
with GenerateEarlybirdTrainingData