Compare commits

...

2 Commits

Author SHA1 Message Date
ImperiumTakp
5af967d834
Merge 7be5868b01 into fb54d8b549 2023-05-22 17:37:34 -05:00
denon1
7be5868b01 Update split files into more files 2023-04-02 13:58:17 +02:00
28 changed files with 2080 additions and 1591 deletions

View File

@ -0,0 +1,112 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.onboarding.relevance.candidates.thriftscala.InterestBasedUserRecommendations
import com.twitter.onboarding.relevance.candidates.thriftscala.UTTInterest
import com.twitter.onboarding.relevance.source.UttAccountRecommendationsScalaDataset
import com.twitter.scalding.Args
import com.twitter.scalding.DateRange
import com.twitter.scalding.Days
import com.twitter.scalding.Duration
import com.twitter.scalding.Execution
import com.twitter.scalding.RichDate
import com.twitter.scalding.UniqueID
import com.twitter.scalding.typed.TypedPipe
import com.twitter.scalding.typed.UnsortedGrouped
import com.twitter.scalding_internal.dalv2.DAL
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.dalv2.remote_access.ExplicitLocation
import com.twitter.scalding_internal.dalv2.remote_access.ProcAtla
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.ModelVersions
import com.twitter.simclusters_v2.common.SimClustersEmbedding
import com.twitter.simclusters_v2.hdfs_sources.AdhocKeyValSources
import com.twitter.simclusters_v2.hdfs_sources.ProducerEmbeddingSources
import com.twitter.simclusters_v2.hdfs_sources.SemanticCoreEmbeddingsFromProducerScalaDataset
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.*
import com.twitter.simclusters_v2.thriftscala
import com.twitter.simclusters_v2.thriftscala.EmbeddingType
import com.twitter.simclusters_v2.thriftscala.InternalId
import com.twitter.simclusters_v2.thriftscala.ModelVersion
import com.twitter.simclusters_v2.thriftscala.SimClusterWithScore
import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingId
import com.twitter.simclusters_v2.thriftscala.TopSimClustersWithScore
import com.twitter.wtf.scalding.jobs.common.AdhocExecutionApp
import com.twitter.wtf.scalding.jobs.common.ScheduledExecutionApp
import com.twitter.wtf.scalding.jobs.common.StatsUtil.*
import java.util.TimeZone
/*
$ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_embedding_from_producer_embedding-adhoc
$ scalding remote run \
--main-class com.twitter.simclusters_v2.scalding.embedding.EntityEmbeddingFromProducerEmbeddingAdhocJob \
--target src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_embedding_from_producer_embedding-adhoc \
--user recos-platform \
-- --date 2019-10-23 --model_version 20M_145K_updated
*/
object EntityEmbeddingFromProducerEmbeddingAdhocJob extends AdhocExecutionApp {
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
// step 1: read in (entity, producer) pairs and remove duplicates
val topK = args.getOrElse("top_k", "100").toInt
val modelVersion = ModelVersions.toModelVersion(
args.getOrElse("model_version", ModelVersions.Model20M145KUpdated))
val entityKnownForProducers =
EntityEmbeddingFromProducerEmbeddingJob
.getNormalizedEntityProducerMatrix(dateRange.embiggen(Days(7)))
.count("num unique entity producer pairs").map {
case (entityId, producerId, score) => (producerId, (entityId, score))
}
// step 2: read in producer to simclusters embeddings
val producersEmbeddingsFollowBased =
ProducerEmbeddingSources.producerEmbeddingSourceLegacy(
EmbeddingType.ProducerFollowBasedSemanticCoreEntity,
modelVersion)(dateRange.embiggen(Days(7)))
val producersEmbeddingsFavBased =
ProducerEmbeddingSources.producerEmbeddingSourceLegacy(
EmbeddingType.ProducerFavBasedSemanticCoreEntity,
modelVersion)(dateRange.embiggen(Days(7)))
// step 3: join producer embedding with entity, producer pairs and reformat result into format [SimClustersEmbeddingId, SimClustersEmbedding]
val producerBasedEntityEmbeddingsFollowBased =
EntityEmbeddingFromProducerEmbeddingJob
.computeEmbedding(
producersEmbeddingsFollowBased,
entityKnownForProducers,
topK,
modelVersion,
EmbeddingType.ProducerFollowBasedSemanticCoreEntity).toTypedPipe.count(
"follow_based_entity_count")
val producerBasedEntityEmbeddingsFavBased =
EntityEmbeddingFromProducerEmbeddingJob
.computeEmbedding(
producersEmbeddingsFavBased,
entityKnownForProducers,
topK,
modelVersion,
EmbeddingType.ProducerFavBasedSemanticCoreEntity).toTypedPipe.count(
"fav_based_entity_count")
val producerBasedEntityEmbeddings =
producerBasedEntityEmbeddingsFollowBased ++ producerBasedEntityEmbeddingsFavBased
// step 4 write results to file
producerBasedEntityEmbeddings
.count("total_count").writeExecution(
AdhocKeyValSources.entityToClustersSource(
getHdfsPath(isAdhoc = true, isManhattanKeyVal = true, modelVersion, "producer")))
}
}

View File

@ -35,149 +35,6 @@ import com.twitter.wtf.scalding.jobs.common.ScheduledExecutionApp
import com.twitter.wtf.scalding.jobs.common.StatsUtil._ import com.twitter.wtf.scalding.jobs.common.StatsUtil._
import java.util.TimeZone import java.util.TimeZone
/*
$ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_embedding_from_producer_embedding-adhoc
$ scalding remote run \
--main-class com.twitter.simclusters_v2.scalding.embedding.EntityEmbeddingFromProducerEmbeddingAdhocJob \
--target src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_embedding_from_producer_embedding-adhoc \
--user recos-platform \
-- --date 2019-10-23 --model_version 20M_145K_updated
*/
object EntityEmbeddingFromProducerEmbeddingAdhocJob extends AdhocExecutionApp {
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
// step 1: read in (entity, producer) pairs and remove duplicates
val topK = args.getOrElse("top_k", "100").toInt
val modelVersion = ModelVersions.toModelVersion(
args.getOrElse("model_version", ModelVersions.Model20M145KUpdated))
val entityKnownForProducers =
EntityEmbeddingFromProducerEmbeddingJob
.getNormalizedEntityProducerMatrix(dateRange.embiggen(Days(7)))
.count("num unique entity producer pairs").map {
case (entityId, producerId, score) => (producerId, (entityId, score))
}
// step 2: read in producer to simclusters embeddings
val producersEmbeddingsFollowBased =
ProducerEmbeddingSources.producerEmbeddingSourceLegacy(
EmbeddingType.ProducerFollowBasedSemanticCoreEntity,
modelVersion)(dateRange.embiggen(Days(7)))
val producersEmbeddingsFavBased =
ProducerEmbeddingSources.producerEmbeddingSourceLegacy(
EmbeddingType.ProducerFavBasedSemanticCoreEntity,
modelVersion)(dateRange.embiggen(Days(7)))
// step 3: join producer embedding with entity, producer pairs and reformat result into format [SimClustersEmbeddingId, SimClustersEmbedding]
val producerBasedEntityEmbeddingsFollowBased =
EntityEmbeddingFromProducerEmbeddingJob
.computeEmbedding(
producersEmbeddingsFollowBased,
entityKnownForProducers,
topK,
modelVersion,
EmbeddingType.ProducerFollowBasedSemanticCoreEntity).toTypedPipe.count(
"follow_based_entity_count")
val producerBasedEntityEmbeddingsFavBased =
EntityEmbeddingFromProducerEmbeddingJob
.computeEmbedding(
producersEmbeddingsFavBased,
entityKnownForProducers,
topK,
modelVersion,
EmbeddingType.ProducerFavBasedSemanticCoreEntity).toTypedPipe.count(
"fav_based_entity_count")
val producerBasedEntityEmbeddings =
producerBasedEntityEmbeddingsFollowBased ++ producerBasedEntityEmbeddingsFavBased
// step 4 write results to file
producerBasedEntityEmbeddings
.count("total_count").writeExecution(
AdhocKeyValSources.entityToClustersSource(
getHdfsPath(isAdhoc = true, isManhattanKeyVal = true, modelVersion, "producer")))
}
}
/*
$ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_embedding_from_producer_embedding_job
$ capesospy-v2 update \
--build_locally \
--start_cron entity_embedding_from_producer_embedding_job src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object EntityEmbeddingFromProducerEmbeddingScheduledJob extends ScheduledExecutionApp {
override def firstTime: RichDate = RichDate("2019-10-16")
override def batchIncrement: Duration = Days(7)
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
// parse args: modelVersion, topK
val topK = args.getOrElse("top_k", "100").toInt
// only support dec11 now since updated model is not productionized for producer embedding
val modelVersion =
ModelVersions.toModelVersion(
args.getOrElse("model_version", ModelVersions.Model20M145KUpdated))
val entityKnownForProducers =
EntityEmbeddingFromProducerEmbeddingJob
.getNormalizedEntityProducerMatrix(dateRange.embiggen(Days(7)))
.count("num unique entity producer pairs").map {
case (entityId, producerId, score) => (producerId, (entityId, score))
}
val favBasedEmbeddings = EntityEmbeddingFromProducerEmbeddingJob
.computeEmbedding(
ProducerEmbeddingSources.producerEmbeddingSourceLegacy(
EmbeddingType.ProducerFavBasedSemanticCoreEntity,
modelVersion)(dateRange.embiggen(Days(7))),
entityKnownForProducers,
topK,
modelVersion,
EmbeddingType.ProducerFavBasedSemanticCoreEntity
).toTypedPipe.count("follow_based_entity_count")
val followBasedEmbeddings = EntityEmbeddingFromProducerEmbeddingJob
.computeEmbedding(
ProducerEmbeddingSources.producerEmbeddingSourceLegacy(
EmbeddingType.ProducerFollowBasedSemanticCoreEntity,
modelVersion)(dateRange.embiggen(Days(7))),
entityKnownForProducers,
topK,
modelVersion,
EmbeddingType.ProducerFollowBasedSemanticCoreEntity
).toTypedPipe.count("fav_based_entity_count")
val embedding = favBasedEmbeddings ++ followBasedEmbeddings
embedding
.count("total_count")
.map {
case (embeddingId, embedding) => KeyVal(embeddingId, embedding)
}.writeDALVersionedKeyValExecution(
SemanticCoreEmbeddingsFromProducerScalaDataset,
D.Suffix(getHdfsPath(isAdhoc = false, isManhattanKeyVal = true, modelVersion, "producer"))
)
}
}
private object EntityEmbeddingFromProducerEmbeddingJob { private object EntityEmbeddingFromProducerEmbeddingJob {
def computeEmbedding( def computeEmbedding(

View File

@ -0,0 +1,105 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.onboarding.relevance.candidates.thriftscala.InterestBasedUserRecommendations
import com.twitter.onboarding.relevance.candidates.thriftscala.UTTInterest
import com.twitter.onboarding.relevance.source.UttAccountRecommendationsScalaDataset
import com.twitter.scalding.Args
import com.twitter.scalding.DateRange
import com.twitter.scalding.Days
import com.twitter.scalding.Duration
import com.twitter.scalding.Execution
import com.twitter.scalding.RichDate
import com.twitter.scalding.UniqueID
import com.twitter.scalding.typed.TypedPipe
import com.twitter.scalding.typed.UnsortedGrouped
import com.twitter.scalding_internal.dalv2.DAL
import com.twitter.scalding_internal.dalv2.DALWrite._
import com.twitter.scalding_internal.dalv2.remote_access.ExplicitLocation
import com.twitter.scalding_internal.dalv2.remote_access.ProcAtla
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.ModelVersions
import com.twitter.simclusters_v2.common.SimClustersEmbedding
import com.twitter.simclusters_v2.hdfs_sources.AdhocKeyValSources
import com.twitter.simclusters_v2.hdfs_sources.ProducerEmbeddingSources
import com.twitter.simclusters_v2.hdfs_sources.SemanticCoreEmbeddingsFromProducerScalaDataset
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil._
import com.twitter.simclusters_v2.thriftscala
import com.twitter.simclusters_v2.thriftscala.EmbeddingType
import com.twitter.simclusters_v2.thriftscala.InternalId
import com.twitter.simclusters_v2.thriftscala.ModelVersion
import com.twitter.simclusters_v2.thriftscala.SimClusterWithScore
import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingId
import com.twitter.simclusters_v2.thriftscala.TopSimClustersWithScore
import com.twitter.wtf.scalding.jobs.common.AdhocExecutionApp
import com.twitter.wtf.scalding.jobs.common.ScheduledExecutionApp
import com.twitter.wtf.scalding.jobs.common.StatsUtil._
import java.util.TimeZone
/*
$ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_embedding_from_producer_embedding_job
$ capesospy-v2 update \
--build_locally \
--start_cron entity_embedding_from_producer_embedding_job src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object EntityEmbeddingFromProducerEmbeddingScheduledJob extends ScheduledExecutionApp {
override def firstTime: RichDate = RichDate("2019-10-16")
override def batchIncrement: Duration = Days(7)
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
// parse args: modelVersion, topK
val topK = args.getOrElse("top_k", "100").toInt
// only support dec11 now since updated model is not productionized for producer embedding
val modelVersion =
ModelVersions.toModelVersion(
args.getOrElse("model_version", ModelVersions.Model20M145KUpdated))
val entityKnownForProducers =
EntityEmbeddingFromProducerEmbeddingJob
.getNormalizedEntityProducerMatrix(dateRange.embiggen(Days(7)))
.count("num unique entity producer pairs").map {
case (entityId, producerId, score) => (producerId, (entityId, score))
}
val favBasedEmbeddings = EntityEmbeddingFromProducerEmbeddingJob
.computeEmbedding(
ProducerEmbeddingSources.producerEmbeddingSourceLegacy(
EmbeddingType.ProducerFavBasedSemanticCoreEntity,
modelVersion)(dateRange.embiggen(Days(7))),
entityKnownForProducers,
topK,
modelVersion,
EmbeddingType.ProducerFavBasedSemanticCoreEntity
).toTypedPipe.count("follow_based_entity_count")
val followBasedEmbeddings = EntityEmbeddingFromProducerEmbeddingJob
.computeEmbedding(
ProducerEmbeddingSources.producerEmbeddingSourceLegacy(
EmbeddingType.ProducerFollowBasedSemanticCoreEntity,
modelVersion)(dateRange.embiggen(Days(7))),
entityKnownForProducers,
topK,
modelVersion,
EmbeddingType.ProducerFollowBasedSemanticCoreEntity
).toTypedPipe.count("fav_based_entity_count")
val embedding = favBasedEmbeddings ++ followBasedEmbeddings
embedding
.count("total_count")
.map {
case (embeddingId, embedding) => KeyVal(embeddingId, embedding)
}.writeDALVersionedKeyValExecution(
SemanticCoreEmbeddingsFromProducerScalaDataset,
D.Suffix(getHdfsPath(isAdhoc = false, isManhattanKeyVal = true, modelVersion, "producer"))
)
}
}

View File

@ -0,0 +1,126 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.dal.client.dataset.KeyValDALDataset
import com.twitter.recos.entities.thriftscala.{Entity, Hashtag, SemanticCoreEntity}
import com.twitter.scalding.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.{ModelVersions, SimClustersEmbedding}
import com.twitter.simclusters_v2.hdfs_sources.*
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.*
import com.twitter.simclusters_v2.scalding.embedding.common.{EmbeddingUtil, EntityEmbeddingUtil, SimClustersEmbeddingJob}
import com.twitter.simclusters_v2.thriftscala.{SimClustersEmbedding as ThriftSimClustersEmbedding, *}
import com.twitter.wtf.entity_real_graph.common.EntityUtil
import com.twitter.wtf.entity_real_graph.thriftscala.EntityType
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, DataSources, ScheduledExecutionApp}
import java.util.TimeZone
/**
* $ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_embeddings_job-adhoc
*
* ---------------------- Deploy to atla ----------------------
* $ scalding remote run \
--main-class com.twitter.simclusters_v2.scalding.embedding.EntityToSimClustersEmbeddingAdhocApp \
--target src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_embeddings_job-adhoc \
--user recos-platform \
-- --date 2019-09-09 --model-version 20M_145K_updated --entity-type SemanticCore
*/
object EntityToSimClustersEmbeddingAdhocApp extends AdhocExecutionApp {
import EmbeddingUtil.*
import EntityEmbeddingUtil.*
import EntityToSimClustersEmbeddingsJob.*
import SimClustersEmbeddingJob.*
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
val jobConfig = EntityEmbeddingsJobConfig(args, isAdhoc = true)
val numReducers = args.getOrElse("m", "1000").toInt
/*
Using the ERG daily dataset in the adhoc job for quick prototyping, note that there may be
issues with scaling the job when productionizing on ERG aggregated dataset.
*/
val entityRealGraphSource = DataSources.entityRealGraphDailyDataSetSource
val entityUserMatrix: TypedPipe[(Entity, (UserId, Double))] =
(jobConfig.entityType match {
case EntityType.SemanticCore =>
getEntityUserMatrix(entityRealGraphSource, jobConfig.halfLife, EntityType.SemanticCore)
case EntityType.Hashtag =>
getEntityUserMatrix(entityRealGraphSource, jobConfig.halfLife, EntityType.Hashtag)
case _ =>
throw new IllegalArgumentException(
s"Argument [--entity-type] must be provided. Supported options [${EntityType.SemanticCore.name}, ${EntityType.Hashtag.name}]")
}).forceToDisk
val normalizedUserEntityMatrix =
getNormalizedTransposeInputMatrix(entityUserMatrix, numReducers = Some(numReducers))
//determine which data source to use based on model version
val simClustersSource = jobConfig.modelVersion match {
case ModelVersion.Model20m145kUpdated =>
InterestedInSources.simClustersInterestedInUpdatedSource(dateRange, timeZone)
case _ =>
InterestedInSources.simClustersInterestedInDec11Source(dateRange, timeZone)
}
val embeddings = computeEmbeddings(
simClustersSource,
normalizedUserEntityMatrix,
scoreExtractors,
ModelVersion.Model20m145kUpdated,
toSimClustersEmbeddingId(jobConfig.modelVersion),
numReducers = Some(numReducers * 2)
)
val topKEmbeddings =
embeddings.group
.sortedReverseTake(jobConfig.topK)(Ordering.by(_._2))
.withReducers(numReducers)
writeOutput(embeddings, topKEmbeddings, jobConfig)
}
def writeOutput(
embeddings: TypedPipe[(SimClustersEmbeddingId, (ClusterId, EmbeddingScore))],
topKEmbeddings: TypedPipe[(SimClustersEmbeddingId, Seq[(ClusterId, EmbeddingScore)])],
jobConfig: EntityEmbeddingsJobConfig
): Execution[Unit] = {
val toSimClusterEmbeddingExec = topKEmbeddings
.mapValues(SimClustersEmbedding.apply(_).toThrift)
.writeExecution(
AdhocKeyValSources.entityToClustersSource(
EntityToSimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = true,
isManhattanKeyVal = true,
isReverseIndex = false,
jobConfig.modelVersion,
jobConfig.entityType)))
val fromSimClusterEmbeddingExec =
toReverseIndexSimClusterEmbedding(embeddings, jobConfig.topK)
.writeExecution(
AdhocKeyValSources.clusterToEntitiesSource(
EntityToSimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = true,
isManhattanKeyVal = true,
isReverseIndex = true,
jobConfig.modelVersion,
jobConfig.entityType)))
Execution.zip(toSimClusterEmbeddingExec, fromSimClusterEmbeddingExec).unit
}
}

View File

@ -0,0 +1,169 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.dal.client.dataset.KeyValDALDataset
import com.twitter.recos.entities.thriftscala.{Entity, Hashtag, SemanticCoreEntity}
import com.twitter.scalding.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.{ModelVersions, SimClustersEmbedding}
import com.twitter.simclusters_v2.hdfs_sources.*
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.*
import com.twitter.simclusters_v2.scalding.embedding.common.{EmbeddingUtil, EntityEmbeddingUtil, SimClustersEmbeddingJob}
import com.twitter.simclusters_v2.thriftscala.{SimClustersEmbedding as ThriftSimClustersEmbedding, *}
import com.twitter.wtf.entity_real_graph.common.EntityUtil
import com.twitter.wtf.entity_real_graph.thriftscala.EntityType
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, DataSources, ScheduledExecutionApp}
import java.util.TimeZone
trait EntityToSimClustersEmbeddingApp extends ScheduledExecutionApp {
import EmbeddingUtil.*
import EntityEmbeddingUtil.*
import EntityToSimClustersEmbeddingsJob.*
import SimClustersEmbeddingJob.*
override val firstTime: RichDate = RichDate("2023-01-01")
override val batchIncrement: Duration = Days(7)
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
val jobConfig = EntityEmbeddingsJobConfig(args, isAdhoc = false)
val embeddingsDataset = EntityEmbeddingsSources.getEntityEmbeddingsDataset(
jobConfig.entityType,
ModelVersions.toKnownForModelVersion(jobConfig.modelVersion)
)
val reverseIndexEmbeddingsDataset =
EntityEmbeddingsSources.getReverseIndexedEntityEmbeddingsDataset(
jobConfig.entityType,
ModelVersions.toKnownForModelVersion(jobConfig.modelVersion)
)
val entityRealGraphSource =
DataSources.entityRealGraphAggregationDataSetSource(dateRange.embiggen(Days(7)))
val entityUserMatrix: TypedPipe[(Entity, (UserId, Double))] =
getEntityUserMatrix(
entityRealGraphSource,
jobConfig.halfLife,
jobConfig.entityType).forceToDisk
val normalizedUserEntityMatrix = getNormalizedTransposeInputMatrix(entityUserMatrix)
val simClustersEmbedding = jobConfig.modelVersion match {
case ModelVersion.Model20m145k2020 =>
val simClustersSource2020 =
InterestedInSources.simClustersInterestedIn2020Source(dateRange, timeZone)
computeEmbeddings(
simClustersSource2020,
normalizedUserEntityMatrix,
scoreExtractors,
ModelVersion.Model20m145k2020,
toSimClustersEmbeddingId(ModelVersion.Model20m145k2020)
)
case modelVersion =>
throw new IllegalArgumentException(s"Model Version ${modelVersion.name} not supported")
}
val topKEmbeddings =
simClustersEmbedding.group.sortedReverseTake(jobConfig.topK)(Ordering.by(_._2))
val simClustersEmbeddingsExec =
writeOutput(
simClustersEmbedding,
topKEmbeddings,
jobConfig,
embeddingsDataset,
reverseIndexEmbeddingsDataset)
// We don't support embeddingsLite for the 2020 model version.
val embeddingsLiteExec = if (jobConfig.modelVersion == ModelVersion.Model20m145kUpdated) {
topKEmbeddings
.collect {
case (
SimClustersEmbeddingId(
EmbeddingType.FavBasedSematicCoreEntity,
ModelVersion.Model20m145kUpdated,
InternalId.EntityId(entityId)),
clustersWithScores) =>
entityId -> clustersWithScores
}
.flatMap {
case (entityId, clustersWithScores) =>
clustersWithScores.map {
case (clusterId, score) => EmbeddingsLite(entityId, clusterId, score)
}
case _ => Nil
}.writeDALSnapshotExecution(
SimclustersV2EmbeddingsLiteScalaDataset,
D.Daily,
D.Suffix(embeddingsLitePath(ModelVersion.Model20m145kUpdated, "fav_based")),
D.EBLzo(),
dateRange.end)
} else {
Execution.unit
}
Execution
.zip(simClustersEmbeddingsExec, embeddingsLiteExec).unit
}
private def writeOutput(
embeddings: TypedPipe[(SimClustersEmbeddingId, (ClusterId, EmbeddingScore))],
topKEmbeddings: TypedPipe[(SimClustersEmbeddingId, Seq[(ClusterId, EmbeddingScore)])],
jobConfig: EntityEmbeddingsJobConfig,
clusterEmbeddingsDataset: KeyValDALDataset[
KeyVal[SimClustersEmbeddingId, ThriftSimClustersEmbedding]
],
entityEmbeddingsDataset: KeyValDALDataset[KeyVal[SimClustersEmbeddingId, InternalIdEmbedding]]
): Execution[Unit] = {
val toSimClustersEmbeddings =
topKEmbeddings
.mapValues(SimClustersEmbedding.apply(_).toThrift)
.map {
case (entityId, topSimClusters) => KeyVal(entityId, topSimClusters)
}
.writeDALVersionedKeyValExecution(
clusterEmbeddingsDataset,
D.Suffix(
EntityToSimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = false,
isManhattanKeyVal = true,
isReverseIndex = false,
jobConfig.modelVersion,
jobConfig.entityType))
)
val fromSimClustersEmbeddings =
toReverseIndexSimClusterEmbedding(embeddings, jobConfig.topK)
.map {
case (embeddingId, internalIdsWithScore) =>
KeyVal(embeddingId, internalIdsWithScore)
}
.writeDALVersionedKeyValExecution(
entityEmbeddingsDataset,
D.Suffix(
EntityToSimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = false,
isManhattanKeyVal = true,
isReverseIndex = true,
jobConfig.modelVersion,
jobConfig.entityType))
)
Execution.zip(toSimClustersEmbeddings, fromSimClustersEmbeddings).unit
}
}

View File

@ -25,271 +25,6 @@ import com.twitter.wtf.scalding.jobs.common.DataSources
import com.twitter.wtf.scalding.jobs.common.ScheduledExecutionApp import com.twitter.wtf.scalding.jobs.common.ScheduledExecutionApp
import java.util.TimeZone import java.util.TimeZone
/**
* $ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_embeddings_job-adhoc
*
* ---------------------- Deploy to atla ----------------------
* $ scalding remote run \
--main-class com.twitter.simclusters_v2.scalding.embedding.EntityToSimClustersEmbeddingAdhocApp \
--target src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_embeddings_job-adhoc \
--user recos-platform \
-- --date 2019-09-09 --model-version 20M_145K_updated --entity-type SemanticCore
*/
object EntityToSimClustersEmbeddingAdhocApp extends AdhocExecutionApp {
import EmbeddingUtil._
import EntityEmbeddingUtil._
import EntityToSimClustersEmbeddingsJob._
import EntityUtil._
import SimClustersEmbeddingJob._
def writeOutput(
embeddings: TypedPipe[(SimClustersEmbeddingId, (ClusterId, EmbeddingScore))],
topKEmbeddings: TypedPipe[(SimClustersEmbeddingId, Seq[(ClusterId, EmbeddingScore)])],
jobConfig: EntityEmbeddingsJobConfig
): Execution[Unit] = {
val toSimClusterEmbeddingExec = topKEmbeddings
.mapValues(SimClustersEmbedding.apply(_).toThrift)
.writeExecution(
AdhocKeyValSources.entityToClustersSource(
EntityToSimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = true,
isManhattanKeyVal = true,
isReverseIndex = false,
jobConfig.modelVersion,
jobConfig.entityType)))
val fromSimClusterEmbeddingExec =
toReverseIndexSimClusterEmbedding(embeddings, jobConfig.topK)
.writeExecution(
AdhocKeyValSources.clusterToEntitiesSource(
EntityToSimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = true,
isManhattanKeyVal = true,
isReverseIndex = true,
jobConfig.modelVersion,
jobConfig.entityType)))
Execution.zip(toSimClusterEmbeddingExec, fromSimClusterEmbeddingExec).unit
}
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
val jobConfig = EntityEmbeddingsJobConfig(args, isAdhoc = true)
val numReducers = args.getOrElse("m", "1000").toInt
/*
Using the ERG daily dataset in the adhoc job for quick prototyping, note that there may be
issues with scaling the job when productionizing on ERG aggregated dataset.
*/
val entityRealGraphSource = DataSources.entityRealGraphDailyDataSetSource
val entityUserMatrix: TypedPipe[(Entity, (UserId, Double))] =
(jobConfig.entityType match {
case EntityType.SemanticCore =>
getEntityUserMatrix(entityRealGraphSource, jobConfig.halfLife, EntityType.SemanticCore)
case EntityType.Hashtag =>
getEntityUserMatrix(entityRealGraphSource, jobConfig.halfLife, EntityType.Hashtag)
case _ =>
throw new IllegalArgumentException(
s"Argument [--entity-type] must be provided. Supported options [${EntityType.SemanticCore.name}, ${EntityType.Hashtag.name}]")
}).forceToDisk
val normalizedUserEntityMatrix =
getNormalizedTransposeInputMatrix(entityUserMatrix, numReducers = Some(numReducers))
//determine which data source to use based on model version
val simClustersSource = jobConfig.modelVersion match {
case ModelVersion.Model20m145kUpdated =>
InterestedInSources.simClustersInterestedInUpdatedSource(dateRange, timeZone)
case _ =>
InterestedInSources.simClustersInterestedInDec11Source(dateRange, timeZone)
}
val embeddings = computeEmbeddings(
simClustersSource,
normalizedUserEntityMatrix,
scoreExtractors,
ModelVersion.Model20m145kUpdated,
toSimClustersEmbeddingId(jobConfig.modelVersion),
numReducers = Some(numReducers * 2)
)
val topKEmbeddings =
embeddings.group
.sortedReverseTake(jobConfig.topK)(Ordering.by(_._2))
.withReducers(numReducers)
writeOutput(embeddings, topKEmbeddings, jobConfig)
}
}
/**
* $ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:semantic_core_entity_embeddings_2020_job
* $ capesospy-v2 update \
--build_locally \
--start_cron semantic_core_entity_embeddings_2020_job src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object SemanticCoreEntityEmbeddings2020App extends EntityToSimClustersEmbeddingApp
trait EntityToSimClustersEmbeddingApp extends ScheduledExecutionApp {
import EmbeddingUtil._
import EntityEmbeddingUtil._
import EntityToSimClustersEmbeddingsJob._
import EntityUtil._
import SimClustersEmbeddingJob._
override val firstTime: RichDate = RichDate("2023-01-01")
override val batchIncrement: Duration = Days(7)
private def writeOutput(
embeddings: TypedPipe[(SimClustersEmbeddingId, (ClusterId, EmbeddingScore))],
topKEmbeddings: TypedPipe[(SimClustersEmbeddingId, Seq[(ClusterId, EmbeddingScore)])],
jobConfig: EntityEmbeddingsJobConfig,
clusterEmbeddingsDataset: KeyValDALDataset[
KeyVal[SimClustersEmbeddingId, ThriftSimClustersEmbedding]
],
entityEmbeddingsDataset: KeyValDALDataset[KeyVal[SimClustersEmbeddingId, InternalIdEmbedding]]
): Execution[Unit] = {
val toSimClustersEmbeddings =
topKEmbeddings
.mapValues(SimClustersEmbedding.apply(_).toThrift)
.map {
case (entityId, topSimClusters) => KeyVal(entityId, topSimClusters)
}
.writeDALVersionedKeyValExecution(
clusterEmbeddingsDataset,
D.Suffix(
EntityToSimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = false,
isManhattanKeyVal = true,
isReverseIndex = false,
jobConfig.modelVersion,
jobConfig.entityType))
)
val fromSimClustersEmbeddings =
toReverseIndexSimClusterEmbedding(embeddings, jobConfig.topK)
.map {
case (embeddingId, internalIdsWithScore) =>
KeyVal(embeddingId, internalIdsWithScore)
}
.writeDALVersionedKeyValExecution(
entityEmbeddingsDataset,
D.Suffix(
EntityToSimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = false,
isManhattanKeyVal = true,
isReverseIndex = true,
jobConfig.modelVersion,
jobConfig.entityType))
)
Execution.zip(toSimClustersEmbeddings, fromSimClustersEmbeddings).unit
}
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
val jobConfig = EntityEmbeddingsJobConfig(args, isAdhoc = false)
val embeddingsDataset = EntityEmbeddingsSources.getEntityEmbeddingsDataset(
jobConfig.entityType,
ModelVersions.toKnownForModelVersion(jobConfig.modelVersion)
)
val reverseIndexEmbeddingsDataset =
EntityEmbeddingsSources.getReverseIndexedEntityEmbeddingsDataset(
jobConfig.entityType,
ModelVersions.toKnownForModelVersion(jobConfig.modelVersion)
)
val entityRealGraphSource =
DataSources.entityRealGraphAggregationDataSetSource(dateRange.embiggen(Days(7)))
val entityUserMatrix: TypedPipe[(Entity, (UserId, Double))] =
getEntityUserMatrix(
entityRealGraphSource,
jobConfig.halfLife,
jobConfig.entityType).forceToDisk
val normalizedUserEntityMatrix = getNormalizedTransposeInputMatrix(entityUserMatrix)
val simClustersEmbedding = jobConfig.modelVersion match {
case ModelVersion.Model20m145k2020 =>
val simClustersSource2020 =
InterestedInSources.simClustersInterestedIn2020Source(dateRange, timeZone)
computeEmbeddings(
simClustersSource2020,
normalizedUserEntityMatrix,
scoreExtractors,
ModelVersion.Model20m145k2020,
toSimClustersEmbeddingId(ModelVersion.Model20m145k2020)
)
case modelVersion =>
throw new IllegalArgumentException(s"Model Version ${modelVersion.name} not supported")
}
val topKEmbeddings =
simClustersEmbedding.group.sortedReverseTake(jobConfig.topK)(Ordering.by(_._2))
val simClustersEmbeddingsExec =
writeOutput(
simClustersEmbedding,
topKEmbeddings,
jobConfig,
embeddingsDataset,
reverseIndexEmbeddingsDataset)
// We don't support embeddingsLite for the 2020 model version.
val embeddingsLiteExec = if (jobConfig.modelVersion == ModelVersion.Model20m145kUpdated) {
topKEmbeddings
.collect {
case (
SimClustersEmbeddingId(
EmbeddingType.FavBasedSematicCoreEntity,
ModelVersion.Model20m145kUpdated,
InternalId.EntityId(entityId)),
clustersWithScores) =>
entityId -> clustersWithScores
}
.flatMap {
case (entityId, clustersWithScores) =>
clustersWithScores.map {
case (clusterId, score) => EmbeddingsLite(entityId, clusterId, score)
}
case _ => Nil
}.writeDALSnapshotExecution(
SimclustersV2EmbeddingsLiteScalaDataset,
D.Daily,
D.Suffix(embeddingsLitePath(ModelVersion.Model20m145kUpdated, "fav_based")),
D.EBLzo(),
dateRange.end)
} else {
Execution.unit
}
Execution
.zip(simClustersEmbeddingsExec, embeddingsLiteExec).unit
}
}
object EntityToSimClustersEmbeddingsJob { object EntityToSimClustersEmbeddingsJob {
def toSimClustersEmbeddingId( def toSimClustersEmbeddingId(

View File

@ -1,32 +1,18 @@
package com.twitter.simclusters_v2.scalding.embedding package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.dal.client.dataset.KeyValDALDataset import com.twitter.dal.client.dataset.{KeyValDALDataset, SnapshotDALDataset}
import com.twitter.dal.client.dataset.SnapshotDALDataset import com.twitter.scalding.*
import com.twitter.scalding.DateRange
import com.twitter.scalding.Days
import com.twitter.scalding.UniqueID
import com.twitter.scalding._
import com.twitter.scalding.typed.TypedPipe import com.twitter.scalding.typed.TypedPipe
import com.twitter.scalding_internal.dalv2.DALWrite.D import com.twitter.scalding_internal.dalv2.DALWrite.{D, ExplicitEndTime, WriteExtension}
import com.twitter.scalding_internal.dalv2.DALWrite.ExplicitEndTime
import com.twitter.scalding_internal.dalv2.DALWrite.WriteExtension
import com.twitter.scalding_internal.job.RequiredBinaryComparators.ordSer import com.twitter.scalding_internal.job.RequiredBinaryComparators.ordSer
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.Country import com.twitter.simclusters_v2.common.*
import com.twitter.simclusters_v2.common.Language import com.twitter.simclusters_v2.hdfs_sources.{InterestedInSources, SimclustersV2GlobalLanguageEmbeddingScalaDataset, SimclustersV2GlobalLanguageEmbeddingThriftScalaDataset}
import com.twitter.simclusters_v2.common.Timestamp
import com.twitter.simclusters_v2.common.TweetId
import com.twitter.simclusters_v2.common.UserId
import com.twitter.simclusters_v2.hdfs_sources.InterestedInSources
import com.twitter.simclusters_v2.scalding.embedding.common.ExternalDataSources import com.twitter.simclusters_v2.scalding.embedding.common.ExternalDataSources
import com.twitter.simclusters_v2.thriftscala.ClustersUserIsInterestedIn
import com.twitter.simclusters_v2.thriftscala.InternalId.ClusterId import com.twitter.simclusters_v2.thriftscala.InternalId.ClusterId
import com.twitter.simclusters_v2.thriftscala.ModelVersion import com.twitter.simclusters_v2.thriftscala.{ClustersUserIsInterestedIn, LanguageToClusters, ModelVersion, UserToInterestedInClusterScores}
import com.twitter.simclusters_v2.thriftscala.UserToInterestedInClusterScores
import com.twitter.wtf.scalding.jobs.common.ScheduledExecutionApp import com.twitter.wtf.scalding.jobs.common.ScheduledExecutionApp
import com.twitter.simclusters_v2.hdfs_sources.SimclustersV2GlobalLanguageEmbeddingScalaDataset
import com.twitter.simclusters_v2.hdfs_sources.SimclustersV2GlobalLanguageEmbeddingThriftScalaDataset
import com.twitter.simclusters_v2.thriftscala.LanguageToClusters
import java.util.TimeZone import java.util.TimeZone
/** /**

View File

@ -0,0 +1,130 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.dal.client.dataset.KeyValDALDataset
import com.twitter.recos.entities.thriftscala.{Entity, Hashtag, SemanticCoreEntity}
import com.twitter.scalding.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.{ModelVersions, SimClustersEmbedding}
import com.twitter.simclusters_v2.hdfs_sources.{AdhocKeyValSources, EntityEmbeddingsSources, InterestedInSources}
import com.twitter.simclusters_v2.hdfs_sources.presto_hdfs_sources.*
import com.twitter.simclusters_v2.scalding.embedding.LocaleEntitySimClustersEmbeddingsJob.*
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.*
import com.twitter.simclusters_v2.scalding.embedding.common.EntityEmbeddingUtil.*
import com.twitter.simclusters_v2.scalding.embedding.common.{EmbeddingUtil, ExternalDataSources}
import com.twitter.simclusters_v2.scalding.embedding.common.SimClustersEmbeddingJob.*
import com.twitter.simclusters_v2.thriftscala.{SimClustersEmbedding as ThriftSimClustersEmbedding, *}
import com.twitter.wtf.entity_real_graph.common.EntityUtil
import com.twitter.wtf.entity_real_graph.thriftscala.{Edge, EntityType}
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, DataSources, ScheduledExecutionApp}
import java.util.TimeZone
/**
* $ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_per_language_embeddings_job-adhoc
*
* ---------------------- Deploy to atla ----------------------
* $ scalding remote run \
--main-class com.twitter.simclusters_v2.scalding.embedding.LocaleEntitySimClustersEmbeddingAdhocApp \
--target src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_per_language_embeddings_job-adhoc \
--user recos-platform \
-- --date 2019-12-17 --model-version 20M_145K_updated --entity-type SemanticCore
*/
object LocaleEntitySimClustersEmbeddingAdhocApp extends AdhocExecutionApp {
// Import implicits
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
val jobConfig = EntityEmbeddingsJobConfig(args, isAdhoc = true)
val numReducers = args.getOrElse("m", "2000").toInt
/*
Can use the ERG daily dataset in the adhoc job for quick prototyping, note that there may be
issues with scaling the job when productionizing on ERG aggregated dataset.
*/
val userEntityMatrix: TypedPipe[(UserId, (Entity, Double))] =
getUserEntityMatrix(
jobConfig,
DataSources.entityRealGraphAggregationDataSetSource(dateRange.embiggen(Days(7))),
Some(ExternalDataSources.uttEntitiesSource())
).forceToDisk
//determine which data source to use based on model version
val simClustersSource = jobConfig.modelVersion match {
case ModelVersion.Model20m145kUpdated =>
InterestedInSources.simClustersInterestedInUpdatedSource(dateRange, timeZone)
case modelVersion =>
throw new IllegalArgumentException(
s"SimClusters model version not supported ${modelVersion.name}")
}
val entityPerLanguage = userEntityMatrix.join(ExternalDataSources.userSource).map {
case (userId, ((entity, score), (_, language))) =>
((entity, language), (userId, score))
}
val normalizedUserEntityMatrix =
getNormalizedTransposeInputMatrix(entityPerLanguage, numReducers = Some(numReducers))
val embeddings = computeEmbeddings[(Entity, String)](
simClustersSource,
normalizedUserEntityMatrix,
scoreExtractors,
ModelVersion.Model20m145kUpdated,
toSimClustersEmbeddingId(jobConfig.modelVersion),
numReducers = Some(numReducers * 2)
)
val topKEmbeddings =
embeddings.group
.sortedReverseTake(jobConfig.topK)(Ordering.by(_._2))
.withReducers(numReducers)
writeOutput(embeddings, topKEmbeddings, jobConfig)
}
def writeOutput(
embeddings: TypedPipe[(SimClustersEmbeddingId, (ClusterId, EmbeddingScore))],
topKEmbeddings: TypedPipe[(SimClustersEmbeddingId, Seq[(ClusterId, EmbeddingScore)])],
jobConfig: EntityEmbeddingsJobConfig
): Execution[Unit] = {
val toSimClusterEmbeddingExec = topKEmbeddings
.mapValues(SimClustersEmbedding.apply(_).toThrift)
.writeExecution(
AdhocKeyValSources.entityToClustersSource(
LocaleEntitySimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = true,
isManhattanKeyVal = true,
isReverseIndex = false,
isLogFav = false,
jobConfig.modelVersion,
jobConfig.entityType)))
val fromSimClusterEmbeddingExec =
toReverseIndexSimClusterEmbedding(embeddings, jobConfig.topK)
.writeExecution(
AdhocKeyValSources.clusterToEntitiesSource(
LocaleEntitySimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = true,
isManhattanKeyVal = true,
isReverseIndex = true,
isLogFav = false,
jobConfig.modelVersion,
jobConfig.entityType)))
Execution.zip(toSimClusterEmbeddingExec, fromSimClusterEmbeddingExec).unit
}
}

View File

@ -0,0 +1,215 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.dal.client.dataset.KeyValDALDataset
import com.twitter.recos.entities.thriftscala.{Entity, Hashtag, SemanticCoreEntity}
import com.twitter.scalding.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.{ModelVersions, SimClustersEmbedding}
import com.twitter.simclusters_v2.hdfs_sources.{AdhocKeyValSources, EntityEmbeddingsSources, InterestedInSources}
import com.twitter.simclusters_v2.hdfs_sources.presto_hdfs_sources.*
import com.twitter.simclusters_v2.scalding.embedding.LocaleEntitySimClustersEmbeddingsJob.*
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.*
import com.twitter.simclusters_v2.scalding.embedding.common.EntityEmbeddingUtil.*
import com.twitter.simclusters_v2.scalding.embedding.common.{EmbeddingUtil, ExternalDataSources}
import com.twitter.simclusters_v2.scalding.embedding.common.SimClustersEmbeddingJob.*
import com.twitter.simclusters_v2.thriftscala.{SimClustersEmbedding as ThriftSimClustersEmbedding, *}
import com.twitter.wtf.entity_real_graph.common.EntityUtil
import com.twitter.wtf.entity_real_graph.thriftscala.{Edge, EntityType}
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, DataSources, ScheduledExecutionApp}
import java.util.TimeZone
/**
* $ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:semantic_core_entity_embeddings_per_language_job
* $ capesospy-v2 update \
--build_locally \
--start_cron semantic_core_entity_embeddings_per_language_job src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object LocaleEntitySimClustersEmbeddingScheduledApp extends ScheduledExecutionApp {
// Import implicits
import EmbeddingUtil.*
override val firstTime: RichDate = RichDate("2019-10-22")
override val batchIncrement: Duration = Days(7)
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
val jobConfig = EntityEmbeddingsJobConfig(args, isAdhoc = false)
val embeddingsDataset = EntityEmbeddingsSources.getEntityEmbeddingsDataset(
jobConfig.entityType,
ModelVersions.toKnownForModelVersion(jobConfig.modelVersion),
isEmbeddingsPerLocale = true
)
val reverseIndexEmbeddingsDataset =
EntityEmbeddingsSources.getReverseIndexedEntityEmbeddingsDataset(
jobConfig.entityType,
ModelVersions.toKnownForModelVersion(jobConfig.modelVersion),
isEmbeddingsPerLocale = true
)
val userEntityMatrix: TypedPipe[(UserId, (Entity, Double))] =
getUserEntityMatrix(
jobConfig,
DataSources.entityRealGraphAggregationDataSetSource(dateRange.embiggen(Days(7))),
Some(ExternalDataSources.uttEntitiesSource())
).forceToDisk
//determine which data source to use based on model version
val simClustersSource = jobConfig.modelVersion match {
case ModelVersion.Model20m145kUpdated =>
InterestedInSources.simClustersInterestedInUpdatedSource(dateRange, timeZone)
case modelVersion =>
throw new IllegalArgumentException(
s"SimClusters model version not supported ${modelVersion.name}")
}
val entityPerLanguage = userEntityMatrix.join(ExternalDataSources.userSource).map {
case (userId, ((entity, score), (_, language))) =>
((entity, language), (userId, score))
}
val normalizedUserEntityMatrix =
getNormalizedTransposeInputMatrix(entityPerLanguage, numReducers = Some(3000))
val simClustersEmbedding = jobConfig.modelVersion match {
case ModelVersion.Model20m145kUpdated =>
computeEmbeddings(
simClustersSource,
normalizedUserEntityMatrix,
scoreExtractors,
ModelVersion.Model20m145kUpdated,
toSimClustersEmbeddingId(ModelVersion.Model20m145kUpdated),
numReducers = Some(8000)
)
case modelVersion =>
throw new IllegalArgumentException(
s"SimClusters model version not supported ${modelVersion.name}")
}
val topKEmbeddings =
simClustersEmbedding.group.sortedReverseTake(jobConfig.topK)(Ordering.by(_._2))
writeOutput(
simClustersEmbedding,
topKEmbeddings,
jobConfig,
embeddingsDataset,
reverseIndexEmbeddingsDataset)
}
private def writeOutput(
embeddings: TypedPipe[(SimClustersEmbeddingId, (ClusterId, EmbeddingScore))],
topKEmbeddings: TypedPipe[(SimClustersEmbeddingId, Seq[(ClusterId, EmbeddingScore)])],
jobConfig: EntityEmbeddingsJobConfig,
clusterEmbeddingsDataset: KeyValDALDataset[
KeyVal[SimClustersEmbeddingId, ThriftSimClustersEmbedding]
],
entityEmbeddingsDataset: KeyValDALDataset[KeyVal[SimClustersEmbeddingId, InternalIdEmbedding]]
)(
implicit dateRange: DateRange,
timeZone: TimeZone
): Execution[Unit] = {
val thriftSimClustersEmbedding = topKEmbeddings
.mapValues(SimClustersEmbedding.apply(_).toThrift)
val writeSimClustersEmbeddingKeyValDataset =
thriftSimClustersEmbedding
.map {
case (entityId, topSimClusters) => KeyVal(entityId, topSimClusters)
}
.writeDALVersionedKeyValExecution(
clusterEmbeddingsDataset,
D.Suffix(
LocaleEntitySimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = false,
isManhattanKeyVal = true,
isReverseIndex = false,
isLogFav = false,
jobConfig.modelVersion,
jobConfig.entityType))
)
val writeSimClustersEmbeddingDataset = thriftSimClustersEmbedding
.map {
case (embeddingId, embedding) => SimClustersEmbeddingWithId(embeddingId, embedding)
}
.writeDALSnapshotExecution(
SemanticCorePerLanguageSimclustersEmbeddingsPrestoScalaDataset,
D.Daily,
D.Suffix(
LocaleEntitySimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = false,
isManhattanKeyVal = false,
isReverseIndex = false,
isLogFav = false,
jobConfig.modelVersion,
jobConfig.entityType)),
D.EBLzo(),
dateRange.end
)
val thriftReversedSimclustersEmbeddings =
toReverseIndexSimClusterEmbedding(embeddings, jobConfig.topK)
val writeReverseSimClustersEmbeddingKeyValDataset =
thriftReversedSimclustersEmbeddings
.map {
case (embeddingId, internalIdsWithScore) =>
KeyVal(embeddingId, internalIdsWithScore)
}
.writeDALVersionedKeyValExecution(
entityEmbeddingsDataset,
D.Suffix(
LocaleEntitySimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = false,
isManhattanKeyVal = true,
isReverseIndex = true,
isLogFav = false,
jobConfig.modelVersion,
jobConfig.entityType))
)
val writeReverseSimClustersEmbeddingDataset =
thriftReversedSimclustersEmbeddings
.map {
case (embeddingId, embedding) => InternalIdEmbeddingWithId(embeddingId, embedding)
}.writeDALSnapshotExecution(
ReverseIndexSemanticCorePerLanguageSimclustersEmbeddingsPrestoScalaDataset,
D.Daily,
D.Suffix(
LocaleEntitySimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = false,
isManhattanKeyVal = false,
isReverseIndex = true,
isLogFav = false,
jobConfig.modelVersion,
jobConfig.entityType)),
D.EBLzo(),
dateRange.end
)
Execution
.zip(
writeSimClustersEmbeddingDataset,
writeSimClustersEmbeddingKeyValDataset,
writeReverseSimClustersEmbeddingDataset,
writeReverseSimClustersEmbeddingKeyValDataset
).unit
}
}

View File

@ -0,0 +1,91 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.bijection.{Bufferable, Injection}
import com.twitter.recos.entities.thriftscala.{Entity, SemanticCoreEntity}
import com.twitter.scalding.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.*
import com.twitter.simclusters_v2.hdfs_sources.{AdhocKeyValSources, EntityEmbeddingsSources}
import com.twitter.simclusters_v2.scalding.common.matrix.{SparseMatrix, SparseRowMatrix}
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.ClusterId
import com.twitter.simclusters_v2.scalding.embedding.common.{EmbeddingUtil, ExternalDataSources, SimClustersEmbeddingBaseJob}
import com.twitter.simclusters_v2.thriftscala.*
import com.twitter.wtf.entity_real_graph.thriftscala.{Edge, FeatureName}
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, DataSources, ScheduledExecutionApp}
import java.util.TimeZone
/**
* $ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:locale_entity_simclusters_embedding_v2-adhoc
*
* $ scalding remote run \
--main-class com.twitter.simclusters_v2.scalding.embedding.LocaleEntitySimClustersEmbeddingV2AdhocApp \
--target src/scala/com/twitter/simclusters_v2/scalding/embedding:locale_entity_simclusters_embedding_v2-adhoc \
--user recos-platform --reducers 2000\
-- --date 2020-04-06
*/
object LocaleEntitySimClustersEmbeddingV2AdhocApp
extends LocaleEntitySimClustersEmbeddingV2Job
with AdhocExecutionApp {
override def writeNounToClustersIndex(
output: TypedPipe[(LocaleEntity, Seq[(ClusterId, Double)])]
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
output
.map {
case ((entityId, lang), clustersWithScores) =>
SimClustersEmbeddingId(
EmbeddingType.LogFavBasedLocaleSemanticCoreEntity,
ModelVersion.Model20m145kUpdated,
InternalId.LocaleEntityId(LocaleEntityId(entityId, lang))
) -> SimClustersEmbedding(clustersWithScores).toThrift
}.writeExecution(
AdhocKeyValSources.entityToClustersSource(
EmbeddingUtil.getHdfsPath(
isAdhoc = true,
isManhattanKeyVal = true,
ModelVersion.Model20m145kUpdated,
pathSuffix = "log_fav_erg_based_embeddings")))
}
override def writeClusterToNounsIndex(
output: TypedPipe[(ClusterId, Seq[(LocaleEntity, Double)])]
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
output
.map {
case (clusterId, nounsWithScore) =>
SimClustersEmbeddingId(
EmbeddingType.LogFavBasedLocaleSemanticCoreEntity,
ModelVersion.Model20m145kUpdated,
InternalId.ClusterId(clusterId)
) ->
InternalIdEmbedding(nounsWithScore.map {
case ((entityId, lang), score) =>
InternalIdWithScore(
InternalId.LocaleEntityId(LocaleEntityId(entityId, lang)),
score)
})
}
.writeExecution(
AdhocKeyValSources.clusterToEntitiesSource(
EmbeddingUtil.getHdfsPath(
isAdhoc = true,
isManhattanKeyVal = true,
ModelVersion.Model20m145kUpdated,
pathSuffix = "reverse_index_log_fav_erg_based_embeddings")))
}
}

View File

@ -27,162 +27,6 @@ import com.twitter.wtf.entity_real_graph.thriftscala.{Edge, FeatureName}
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, DataSources, ScheduledExecutionApp} import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, DataSources, ScheduledExecutionApp}
import java.util.TimeZone import java.util.TimeZone
/**
* Scheduled production job which generates topic embeddings per locale based on Entity Real Graph.
*
* V2 Uses the log transform of the ERG favScores and the SimCluster InterestedIn scores.
*
* $ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:locale_entity_simclusters_embedding_v2
* $ capesospy-v2 update \
--build_locally \
--start_cron locale_entity_simclusters_embedding_v2 src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object LocaleEntitySimClustersEmbeddingV2ScheduledApp
extends LocaleEntitySimClustersEmbeddingV2Job
with ScheduledExecutionApp {
override val firstTime: RichDate = RichDate("2020-04-08")
override val batchIncrement: Duration = Days(1)
override def writeNounToClustersIndex(
output: TypedPipe[(LocaleEntity, Seq[(ClusterId, Double)])]
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
output
.map {
case ((entityId, lang), clustersWithScores) =>
KeyVal(
SimClustersEmbeddingId(
EmbeddingType.LogFavBasedLocaleSemanticCoreEntity,
ModelVersion.Model20m145kUpdated,
InternalId.LocaleEntityId(LocaleEntityId(entityId, lang))
),
SimClustersEmbedding(clustersWithScores).toThrift
)
}
.writeDALVersionedKeyValExecution(
EntityEmbeddingsSources.LogFavSemanticCorePerLanguageSimClustersEmbeddingsDataset,
D.Suffix(
EmbeddingUtil.getHdfsPath(
isAdhoc = false,
isManhattanKeyVal = true,
ModelVersion.Model20m145kUpdated,
pathSuffix = "log_fav_erg_based_embeddings"))
)
}
override def writeClusterToNounsIndex(
output: TypedPipe[(ClusterId, Seq[(LocaleEntity, Double)])]
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
output
.map {
case (clusterId, nounsWithScore) =>
KeyVal(
SimClustersEmbeddingId(
EmbeddingType.LogFavBasedLocaleSemanticCoreEntity,
ModelVersion.Model20m145kUpdated,
InternalId.ClusterId(clusterId)
),
InternalIdEmbedding(nounsWithScore.map {
case ((entityId, lang), score) =>
InternalIdWithScore(
InternalId.LocaleEntityId(LocaleEntityId(entityId, lang)),
score)
})
)
}
.writeDALVersionedKeyValExecution(
EntityEmbeddingsSources.LogFavReverseIndexSemanticCorePerLanguageSimClustersEmbeddingsDataset,
D.Suffix(
EmbeddingUtil.getHdfsPath(
isAdhoc = false,
isManhattanKeyVal = true,
ModelVersion.Model20m145kUpdated,
pathSuffix = "reverse_index_log_fav_erg_based_embeddings"))
)
}
}
/**
* $ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:locale_entity_simclusters_embedding_v2-adhoc
*
* $ scalding remote run \
--main-class com.twitter.simclusters_v2.scalding.embedding.LocaleEntitySimClustersEmbeddingV2AdhocApp \
--target src/scala/com/twitter/simclusters_v2/scalding/embedding:locale_entity_simclusters_embedding_v2-adhoc \
--user recos-platform --reducers 2000\
-- --date 2020-04-06
*/
object LocaleEntitySimClustersEmbeddingV2AdhocApp
extends LocaleEntitySimClustersEmbeddingV2Job
with AdhocExecutionApp {
override def writeNounToClustersIndex(
output: TypedPipe[(LocaleEntity, Seq[(ClusterId, Double)])]
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
output
.map {
case ((entityId, lang), clustersWithScores) =>
SimClustersEmbeddingId(
EmbeddingType.LogFavBasedLocaleSemanticCoreEntity,
ModelVersion.Model20m145kUpdated,
InternalId.LocaleEntityId(LocaleEntityId(entityId, lang))
) -> SimClustersEmbedding(clustersWithScores).toThrift
}.writeExecution(
AdhocKeyValSources.entityToClustersSource(
EmbeddingUtil.getHdfsPath(
isAdhoc = true,
isManhattanKeyVal = true,
ModelVersion.Model20m145kUpdated,
pathSuffix = "log_fav_erg_based_embeddings")))
}
override def writeClusterToNounsIndex(
output: TypedPipe[(ClusterId, Seq[(LocaleEntity, Double)])]
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
output
.map {
case (clusterId, nounsWithScore) =>
SimClustersEmbeddingId(
EmbeddingType.LogFavBasedLocaleSemanticCoreEntity,
ModelVersion.Model20m145kUpdated,
InternalId.ClusterId(clusterId)
) ->
InternalIdEmbedding(nounsWithScore.map {
case ((entityId, lang), score) =>
InternalIdWithScore(
InternalId.LocaleEntityId(LocaleEntityId(entityId, lang)),
score)
})
}
.writeExecution(
AdhocKeyValSources.clusterToEntitiesSource(
EmbeddingUtil.getHdfsPath(
isAdhoc = true,
isManhattanKeyVal = true,
ModelVersion.Model20m145kUpdated,
pathSuffix = "reverse_index_log_fav_erg_based_embeddings")))
}
}
trait LocaleEntitySimClustersEmbeddingV2Job extends SimClustersEmbeddingBaseJob[LocaleEntity] { trait LocaleEntitySimClustersEmbeddingV2Job extends SimClustersEmbeddingBaseJob[LocaleEntity] {

View File

@ -0,0 +1,106 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.bijection.{Bufferable, Injection}
import com.twitter.recos.entities.thriftscala.{Entity, SemanticCoreEntity}
import com.twitter.scalding.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.*
import com.twitter.simclusters_v2.hdfs_sources.{AdhocKeyValSources, EntityEmbeddingsSources}
import com.twitter.simclusters_v2.scalding.common.matrix.{SparseMatrix, SparseRowMatrix}
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.ClusterId
import com.twitter.simclusters_v2.scalding.embedding.common.{EmbeddingUtil, ExternalDataSources, SimClustersEmbeddingBaseJob}
import com.twitter.simclusters_v2.thriftscala.*
import com.twitter.wtf.entity_real_graph.thriftscala.{Edge, FeatureName}
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, DataSources, ScheduledExecutionApp}
import java.util.TimeZone
/**
* Scheduled production job which generates topic embeddings per locale based on Entity Real Graph.
*
* V2 Uses the log transform of the ERG favScores and the SimCluster InterestedIn scores.
*
* $ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:locale_entity_simclusters_embedding_v2
* $ capesospy-v2 update \
--build_locally \
--start_cron locale_entity_simclusters_embedding_v2 src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object LocaleEntitySimClustersEmbeddingV2ScheduledApp
extends LocaleEntitySimClustersEmbeddingV2Job
with ScheduledExecutionApp {
override val firstTime: RichDate = RichDate("2020-04-08")
override val batchIncrement: Duration = Days(1)
override def writeNounToClustersIndex(
output: TypedPipe[(LocaleEntity, Seq[(ClusterId, Double)])]
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
output
.map {
case ((entityId, lang), clustersWithScores) =>
KeyVal(
SimClustersEmbeddingId(
EmbeddingType.LogFavBasedLocaleSemanticCoreEntity,
ModelVersion.Model20m145kUpdated,
InternalId.LocaleEntityId(LocaleEntityId(entityId, lang))
),
SimClustersEmbedding(clustersWithScores).toThrift
)
}
.writeDALVersionedKeyValExecution(
EntityEmbeddingsSources.LogFavSemanticCorePerLanguageSimClustersEmbeddingsDataset,
D.Suffix(
EmbeddingUtil.getHdfsPath(
isAdhoc = false,
isManhattanKeyVal = true,
ModelVersion.Model20m145kUpdated,
pathSuffix = "log_fav_erg_based_embeddings"))
)
}
override def writeClusterToNounsIndex(
output: TypedPipe[(ClusterId, Seq[(LocaleEntity, Double)])]
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
output
.map {
case (clusterId, nounsWithScore) =>
KeyVal(
SimClustersEmbeddingId(
EmbeddingType.LogFavBasedLocaleSemanticCoreEntity,
ModelVersion.Model20m145kUpdated,
InternalId.ClusterId(clusterId)
),
InternalIdEmbedding(nounsWithScore.map {
case ((entityId, lang), score) =>
InternalIdWithScore(
InternalId.LocaleEntityId(LocaleEntityId(entityId, lang)),
score)
})
)
}
.writeDALVersionedKeyValExecution(
EntityEmbeddingsSources.LogFavReverseIndexSemanticCorePerLanguageSimClustersEmbeddingsDataset,
D.Suffix(
EmbeddingUtil.getHdfsPath(
isAdhoc = false,
isManhattanKeyVal = true,
ModelVersion.Model20m145kUpdated,
pathSuffix = "reverse_index_log_fav_erg_based_embeddings"))
)
}
}

View File

@ -31,305 +31,6 @@ import com.twitter.wtf.scalding.jobs.common.DataSources
import com.twitter.wtf.scalding.jobs.common.ScheduledExecutionApp import com.twitter.wtf.scalding.jobs.common.ScheduledExecutionApp
import java.util.TimeZone import java.util.TimeZone
/**
* $ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_per_language_embeddings_job-adhoc
*
* ---------------------- Deploy to atla ----------------------
* $ scalding remote run \
--main-class com.twitter.simclusters_v2.scalding.embedding.LocaleEntitySimClustersEmbeddingAdhocApp \
--target src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_per_language_embeddings_job-adhoc \
--user recos-platform \
-- --date 2019-12-17 --model-version 20M_145K_updated --entity-type SemanticCore
*/
object LocaleEntitySimClustersEmbeddingAdhocApp extends AdhocExecutionApp {
// Import implicits
import EntityUtil._
def writeOutput(
embeddings: TypedPipe[(SimClustersEmbeddingId, (ClusterId, EmbeddingScore))],
topKEmbeddings: TypedPipe[(SimClustersEmbeddingId, Seq[(ClusterId, EmbeddingScore)])],
jobConfig: EntityEmbeddingsJobConfig
): Execution[Unit] = {
val toSimClusterEmbeddingExec = topKEmbeddings
.mapValues(SimClustersEmbedding.apply(_).toThrift)
.writeExecution(
AdhocKeyValSources.entityToClustersSource(
LocaleEntitySimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = true,
isManhattanKeyVal = true,
isReverseIndex = false,
isLogFav = false,
jobConfig.modelVersion,
jobConfig.entityType)))
val fromSimClusterEmbeddingExec =
toReverseIndexSimClusterEmbedding(embeddings, jobConfig.topK)
.writeExecution(
AdhocKeyValSources.clusterToEntitiesSource(
LocaleEntitySimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = true,
isManhattanKeyVal = true,
isReverseIndex = true,
isLogFav = false,
jobConfig.modelVersion,
jobConfig.entityType)))
Execution.zip(toSimClusterEmbeddingExec, fromSimClusterEmbeddingExec).unit
}
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
val jobConfig = EntityEmbeddingsJobConfig(args, isAdhoc = true)
val numReducers = args.getOrElse("m", "2000").toInt
/*
Can use the ERG daily dataset in the adhoc job for quick prototyping, note that there may be
issues with scaling the job when productionizing on ERG aggregated dataset.
*/
val userEntityMatrix: TypedPipe[(UserId, (Entity, Double))] =
getUserEntityMatrix(
jobConfig,
DataSources.entityRealGraphAggregationDataSetSource(dateRange.embiggen(Days(7))),
Some(ExternalDataSources.uttEntitiesSource())
).forceToDisk
//determine which data source to use based on model version
val simClustersSource = jobConfig.modelVersion match {
case ModelVersion.Model20m145kUpdated =>
InterestedInSources.simClustersInterestedInUpdatedSource(dateRange, timeZone)
case modelVersion =>
throw new IllegalArgumentException(
s"SimClusters model version not supported ${modelVersion.name}")
}
val entityPerLanguage = userEntityMatrix.join(ExternalDataSources.userSource).map {
case (userId, ((entity, score), (_, language))) =>
((entity, language), (userId, score))
}
val normalizedUserEntityMatrix =
getNormalizedTransposeInputMatrix(entityPerLanguage, numReducers = Some(numReducers))
val embeddings = computeEmbeddings[(Entity, String)](
simClustersSource,
normalizedUserEntityMatrix,
scoreExtractors,
ModelVersion.Model20m145kUpdated,
toSimClustersEmbeddingId(jobConfig.modelVersion),
numReducers = Some(numReducers * 2)
)
val topKEmbeddings =
embeddings.group
.sortedReverseTake(jobConfig.topK)(Ordering.by(_._2))
.withReducers(numReducers)
writeOutput(embeddings, topKEmbeddings, jobConfig)
}
}
/**
* $ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:semantic_core_entity_embeddings_per_language_job
* $ capesospy-v2 update \
--build_locally \
--start_cron semantic_core_entity_embeddings_per_language_job src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object LocaleEntitySimClustersEmbeddingScheduledApp extends ScheduledExecutionApp {
// Import implicits
import EmbeddingUtil._
import EntityUtil._
override val firstTime: RichDate = RichDate("2019-10-22")
override val batchIncrement: Duration = Days(7)
private def writeOutput(
embeddings: TypedPipe[(SimClustersEmbeddingId, (ClusterId, EmbeddingScore))],
topKEmbeddings: TypedPipe[(SimClustersEmbeddingId, Seq[(ClusterId, EmbeddingScore)])],
jobConfig: EntityEmbeddingsJobConfig,
clusterEmbeddingsDataset: KeyValDALDataset[
KeyVal[SimClustersEmbeddingId, ThriftSimClustersEmbedding]
],
entityEmbeddingsDataset: KeyValDALDataset[KeyVal[SimClustersEmbeddingId, InternalIdEmbedding]]
)(
implicit dateRange: DateRange,
timeZone: TimeZone
): Execution[Unit] = {
val thriftSimClustersEmbedding = topKEmbeddings
.mapValues(SimClustersEmbedding.apply(_).toThrift)
val writeSimClustersEmbeddingKeyValDataset =
thriftSimClustersEmbedding
.map {
case (entityId, topSimClusters) => KeyVal(entityId, topSimClusters)
}
.writeDALVersionedKeyValExecution(
clusterEmbeddingsDataset,
D.Suffix(
LocaleEntitySimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = false,
isManhattanKeyVal = true,
isReverseIndex = false,
isLogFav = false,
jobConfig.modelVersion,
jobConfig.entityType))
)
val writeSimClustersEmbeddingDataset = thriftSimClustersEmbedding
.map {
case (embeddingId, embedding) => SimClustersEmbeddingWithId(embeddingId, embedding)
}
.writeDALSnapshotExecution(
SemanticCorePerLanguageSimclustersEmbeddingsPrestoScalaDataset,
D.Daily,
D.Suffix(
LocaleEntitySimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = false,
isManhattanKeyVal = false,
isReverseIndex = false,
isLogFav = false,
jobConfig.modelVersion,
jobConfig.entityType)),
D.EBLzo(),
dateRange.end
)
val thriftReversedSimclustersEmbeddings =
toReverseIndexSimClusterEmbedding(embeddings, jobConfig.topK)
val writeReverseSimClustersEmbeddingKeyValDataset =
thriftReversedSimclustersEmbeddings
.map {
case (embeddingId, internalIdsWithScore) =>
KeyVal(embeddingId, internalIdsWithScore)
}
.writeDALVersionedKeyValExecution(
entityEmbeddingsDataset,
D.Suffix(
LocaleEntitySimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = false,
isManhattanKeyVal = true,
isReverseIndex = true,
isLogFav = false,
jobConfig.modelVersion,
jobConfig.entityType))
)
val writeReverseSimClustersEmbeddingDataset =
thriftReversedSimclustersEmbeddings
.map {
case (embeddingId, embedding) => InternalIdEmbeddingWithId(embeddingId, embedding)
}.writeDALSnapshotExecution(
ReverseIndexSemanticCorePerLanguageSimclustersEmbeddingsPrestoScalaDataset,
D.Daily,
D.Suffix(
LocaleEntitySimClustersEmbeddingsJob.getHdfsPath(
isAdhoc = false,
isManhattanKeyVal = false,
isReverseIndex = true,
isLogFav = false,
jobConfig.modelVersion,
jobConfig.entityType)),
D.EBLzo(),
dateRange.end
)
Execution
.zip(
writeSimClustersEmbeddingDataset,
writeSimClustersEmbeddingKeyValDataset,
writeReverseSimClustersEmbeddingDataset,
writeReverseSimClustersEmbeddingKeyValDataset
).unit
}
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
val jobConfig = EntityEmbeddingsJobConfig(args, isAdhoc = false)
val embeddingsDataset = EntityEmbeddingsSources.getEntityEmbeddingsDataset(
jobConfig.entityType,
ModelVersions.toKnownForModelVersion(jobConfig.modelVersion),
isEmbeddingsPerLocale = true
)
val reverseIndexEmbeddingsDataset =
EntityEmbeddingsSources.getReverseIndexedEntityEmbeddingsDataset(
jobConfig.entityType,
ModelVersions.toKnownForModelVersion(jobConfig.modelVersion),
isEmbeddingsPerLocale = true
)
val userEntityMatrix: TypedPipe[(UserId, (Entity, Double))] =
getUserEntityMatrix(
jobConfig,
DataSources.entityRealGraphAggregationDataSetSource(dateRange.embiggen(Days(7))),
Some(ExternalDataSources.uttEntitiesSource())
).forceToDisk
//determine which data source to use based on model version
val simClustersSource = jobConfig.modelVersion match {
case ModelVersion.Model20m145kUpdated =>
InterestedInSources.simClustersInterestedInUpdatedSource(dateRange, timeZone)
case modelVersion =>
throw new IllegalArgumentException(
s"SimClusters model version not supported ${modelVersion.name}")
}
val entityPerLanguage = userEntityMatrix.join(ExternalDataSources.userSource).map {
case (userId, ((entity, score), (_, language))) =>
((entity, language), (userId, score))
}
val normalizedUserEntityMatrix =
getNormalizedTransposeInputMatrix(entityPerLanguage, numReducers = Some(3000))
val simClustersEmbedding = jobConfig.modelVersion match {
case ModelVersion.Model20m145kUpdated =>
computeEmbeddings(
simClustersSource,
normalizedUserEntityMatrix,
scoreExtractors,
ModelVersion.Model20m145kUpdated,
toSimClustersEmbeddingId(ModelVersion.Model20m145kUpdated),
numReducers = Some(8000)
)
case modelVersion =>
throw new IllegalArgumentException(
s"SimClusters model version not supported ${modelVersion.name}")
}
val topKEmbeddings =
simClustersEmbedding.group.sortedReverseTake(jobConfig.topK)(Ordering.by(_._2))
writeOutput(
simClustersEmbedding,
topKEmbeddings,
jobConfig,
embeddingsDataset,
reverseIndexEmbeddingsDataset)
}
}
object LocaleEntitySimClustersEmbeddingsJob { object LocaleEntitySimClustersEmbeddingsJob {
def getUserEntityMatrix( def getUserEntityMatrix(

View File

@ -12,525 +12,6 @@ import com.twitter.simclusters_v2.thriftscala._
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, ScheduledExecutionApp} import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, ScheduledExecutionApp}
import java.util.TimeZone import java.util.TimeZone
object ProducerEmbeddingsFromInterestedInBatchAppUtil {
import ProducerEmbeddingsFromInterestedIn._
val user = System.getenv("USER")
val rootPath: String = s"/user/$user/manhattan_sequence_files"
// Helps speed up the multiplication step which can get very big
val numReducersForMatrixMultiplication: Int = 12000
/**
* Given the producer x cluster matrix, key by producer / cluster individually, and write output
* to individual DAL datasets
*/
def writeOutput(
producerClusterEmbedding: TypedPipe[((ClusterId, UserId), Double)],
producerTopKEmbeddingsDataset: KeyValDALDataset[KeyVal[Long, TopSimClustersWithScore]],
clusterTopKProducersDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
],
producerTopKEmbeddingsPath: String,
clusterTopKProducersPath: String,
modelVersion: ModelVersion
): Execution[Unit] = {
val keyedByProducer =
toSimClusterEmbedding(producerClusterEmbedding, topKClustersToKeep, modelVersion)
.map { case (userId, clusters) => KeyVal(userId, clusters) }
.writeDALVersionedKeyValExecution(
producerTopKEmbeddingsDataset,
D.Suffix(producerTopKEmbeddingsPath)
)
val keyedBySimCluster = fromSimClusterEmbedding(
producerClusterEmbedding,
topKUsersToKeep,
modelVersion
).map {
case (clusterId, topProducers) => KeyVal(clusterId, topProducersToThrift(topProducers))
}
.writeDALVersionedKeyValExecution(
clusterTopKProducersDataset,
D.Suffix(clusterTopKProducersPath)
)
Execution.zip(keyedByProducer, keyedBySimCluster).unit
}
}
/**
* Base class for Fav based producer embeddings. Helps reuse the code for different model versions
*/
trait ProducerEmbeddingsFromInterestedInByFavScoreBase extends ScheduledExecutionApp {
import ProducerEmbeddingsFromInterestedIn._
import ProducerEmbeddingsFromInterestedInBatchAppUtil._
def modelVersion: ModelVersion
val producerTopKEmbeddingsByFavScorePathPrefix: String =
"/producer_top_k_simcluster_embeddings_by_fav_score_"
val clusterTopKProducersByFavScorePathPrefix: String =
"/simcluster_embedding_top_k_producers_by_fav_score_"
val minNumFavers: Int = minNumFaversForProducer
def producerTopKSimclusterEmbeddingsByFavScoreDataset: KeyValDALDataset[
KeyVal[Long, TopSimClustersWithScore]
]
def simclusterEmbeddingTopKProducersByFavScoreDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
]
def getInterestedInFn: (DateRange, TimeZone) => TypedPipe[(Long, ClustersUserIsInterestedIn)]
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
val producerTopKEmbeddingsByFavScorePathUpdated: String =
rootPath + producerTopKEmbeddingsByFavScorePathPrefix + ModelVersions
.toKnownForModelVersion(modelVersion)
val clusterTopKProducersByFavScorePathUpdated: String =
rootPath + clusterTopKProducersByFavScorePathPrefix + ModelVersions
.toKnownForModelVersion(modelVersion)
val producerClusterEmbeddingByFavScore = getProducerClusterEmbedding(
getInterestedInFn(dateRange.embiggen(Days(5)), timeZone),
DataSources.userUserNormalizedGraphSource,
DataSources.userNormsAndCounts,
userToProducerFavScore,
userToClusterFavScore, // Fav score
_.faverCount.exists(_ > minNumFavers),
numReducersForMatrixMultiplication,
modelVersion,
cosineSimilarityThreshold
).forceToDisk
writeOutput(
producerClusterEmbeddingByFavScore,
producerTopKSimclusterEmbeddingsByFavScoreDataset,
simclusterEmbeddingTopKProducersByFavScoreDataset,
producerTopKEmbeddingsByFavScorePathUpdated,
clusterTopKProducersByFavScorePathUpdated,
modelVersion
)
}
}
/**
* Base class for Follow based producer embeddings. Helps reuse the code for different model versions
*/
trait ProducerEmbeddingsFromInterestedInByFollowScoreBase extends ScheduledExecutionApp {
import ProducerEmbeddingsFromInterestedIn._
import ProducerEmbeddingsFromInterestedInBatchAppUtil._
def modelVersion: ModelVersion
val producerTopKEmbeddingsByFollowScorePathPrefix: String =
"/producer_top_k_simcluster_embeddings_by_follow_score_"
val clusterTopKProducersByFollowScorePathPrefix: String =
"/simcluster_embedding_top_k_producers_by_follow_score_"
def producerTopKSimclusterEmbeddingsByFollowScoreDataset: KeyValDALDataset[
KeyVal[Long, TopSimClustersWithScore]
]
def simclusterEmbeddingTopKProducersByFollowScoreDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
]
def getInterestedInFn: (DateRange, TimeZone) => TypedPipe[(Long, ClustersUserIsInterestedIn)]
val minNumFollowers: Int = minNumFollowersForProducer
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
val producerTopKEmbeddingsByFollowScorePath: String =
rootPath + producerTopKEmbeddingsByFollowScorePathPrefix + ModelVersions
.toKnownForModelVersion(modelVersion)
val clusterTopKProducersByFollowScorePath: String =
rootPath + clusterTopKProducersByFollowScorePathPrefix + ModelVersions
.toKnownForModelVersion(modelVersion)
val producerClusterEmbeddingByFollowScore = getProducerClusterEmbedding(
getInterestedInFn(dateRange.embiggen(Days(5)), timeZone),
DataSources.userUserNormalizedGraphSource,
DataSources.userNormsAndCounts,
userToProducerFollowScore,
userToClusterFollowScore, // Follow score
_.followerCount.exists(_ > minNumFollowers),
numReducersForMatrixMultiplication,
modelVersion,
cosineSimilarityThreshold
).forceToDisk
writeOutput(
producerClusterEmbeddingByFollowScore,
producerTopKSimclusterEmbeddingsByFollowScoreDataset,
simclusterEmbeddingTopKProducersByFollowScoreDataset,
producerTopKEmbeddingsByFollowScorePath,
clusterTopKProducersByFollowScorePath,
modelVersion
)
}
}
/**
capesospy-v2 update --build_locally --start_cron \
--start_cron producer_embeddings_from_interested_in_by_fav_score \
src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object ProducerEmbeddingsFromInterestedInByFavScoreBatchApp
extends ProducerEmbeddingsFromInterestedInByFavScoreBase {
override def modelVersion: ModelVersion = ModelVersion.Model20m145kUpdated
override def getInterestedInFn: (
DateRange,
TimeZone
) => TypedPipe[(UserId, ClustersUserIsInterestedIn)] =
InterestedInSources.simClustersInterestedInUpdatedSource
override val firstTime: RichDate = RichDate("2019-09-10")
override val batchIncrement: Duration = Days(7)
override def producerTopKSimclusterEmbeddingsByFavScoreDataset: KeyValDALDataset[
KeyVal[Long, TopSimClustersWithScore]
] =
ProducerTopKSimclusterEmbeddingsByFavScoreUpdatedScalaDataset
override def simclusterEmbeddingTopKProducersByFavScoreDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
] =
SimclusterEmbeddingTopKProducersByFavScoreUpdatedScalaDataset
}
/**
capesospy-v2 update --build_locally --start_cron \
--start_cron producer_embeddings_from_interested_in_by_fav_score_2020 \
src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object ProducerEmbeddingsFromInterestedInByFavScore2020BatchApp
extends ProducerEmbeddingsFromInterestedInByFavScoreBase {
override def modelVersion: ModelVersion = ModelVersion.Model20m145k2020
override def getInterestedInFn: (
DateRange,
TimeZone
) => TypedPipe[(UserId, ClustersUserIsInterestedIn)] =
InterestedInSources.simClustersInterestedIn2020Source
override val firstTime: RichDate = RichDate("2021-03-01")
override val batchIncrement: Duration = Days(7)
override def producerTopKSimclusterEmbeddingsByFavScoreDataset: KeyValDALDataset[
KeyVal[Long, TopSimClustersWithScore]
] =
ProducerTopKSimclusterEmbeddingsByFavScore2020ScalaDataset
override def simclusterEmbeddingTopKProducersByFavScoreDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
] =
SimclusterEmbeddingTopKProducersByFavScore2020ScalaDataset
}
/**
capesospy-v2 update --build_locally --start_cron \
--start_cron producer_embeddings_from_interested_in_by_fav_score_dec11 \
src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object ProducerEmbeddingsFromInterestedInByFavScoreDec11BatchApp
extends ProducerEmbeddingsFromInterestedInByFavScoreBase {
override def modelVersion: ModelVersion = ModelVersion.Model20m145kDec11
override def getInterestedInFn: (
DateRange,
TimeZone
) => TypedPipe[(UserId, ClustersUserIsInterestedIn)] =
InterestedInSources.simClustersInterestedInDec11Source
override val firstTime: RichDate = RichDate("2019-11-18")
override val batchIncrement: Duration = Days(7)
override def producerTopKSimclusterEmbeddingsByFavScoreDataset: KeyValDALDataset[
KeyVal[Long, TopSimClustersWithScore]
] =
ProducerTopKSimclusterEmbeddingsByFavScoreScalaDataset
override def simclusterEmbeddingTopKProducersByFavScoreDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
] =
SimclusterEmbeddingTopKProducersByFavScoreScalaDataset
}
/**
capesospy-v2 update --build_locally --start_cron \
--start_cron producer_embeddings_from_interested_in_by_follow_score \
src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object ProducerEmbeddingsFromInterestedInByFollowScoreBatchApp
extends ProducerEmbeddingsFromInterestedInByFollowScoreBase {
override def modelVersion: ModelVersion = ModelVersion.Model20m145kUpdated
override def getInterestedInFn: (
DateRange,
TimeZone
) => TypedPipe[(UserId, ClustersUserIsInterestedIn)] =
InterestedInSources.simClustersInterestedInUpdatedSource
override val firstTime: RichDate = RichDate("2019-09-10")
override val batchIncrement: Duration = Days(7)
override def producerTopKSimclusterEmbeddingsByFollowScoreDataset: KeyValDALDataset[
KeyVal[Long, TopSimClustersWithScore]
] =
ProducerTopKSimclusterEmbeddingsByFollowScoreUpdatedScalaDataset
override def simclusterEmbeddingTopKProducersByFollowScoreDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
] =
SimclusterEmbeddingTopKProducersByFollowScoreUpdatedScalaDataset
}
/**
capesospy-v2 update --build_locally --start_cron \
--start_cron producer_embeddings_from_interested_in_by_follow_score_2020 \
src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object ProducerEmbeddingsFromInterestedInByFollowScore2020BatchApp
extends ProducerEmbeddingsFromInterestedInByFollowScoreBase {
override def modelVersion: ModelVersion = ModelVersion.Model20m145k2020
override def getInterestedInFn: (
DateRange,
TimeZone
) => TypedPipe[(UserId, ClustersUserIsInterestedIn)] =
InterestedInSources.simClustersInterestedIn2020Source
override val firstTime: RichDate = RichDate("2021-03-01")
override val batchIncrement: Duration = Days(7)
override def producerTopKSimclusterEmbeddingsByFollowScoreDataset: KeyValDALDataset[
KeyVal[Long, TopSimClustersWithScore]
] =
ProducerTopKSimclusterEmbeddingsByFollowScore2020ScalaDataset
override def simclusterEmbeddingTopKProducersByFollowScoreDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
] =
SimclusterEmbeddingTopKProducersByFollowScore2020ScalaDataset
}
/**
capesospy-v2 update --build_locally --start_cron \
--start_cron producer_embeddings_from_interested_in_by_follow_score_dec11 \
src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object ProducerEmbeddingsFromInterestedInByFollowScoreDec11BatchApp
extends ProducerEmbeddingsFromInterestedInByFollowScoreBase {
override def modelVersion: ModelVersion = ModelVersion.Model20m145kDec11
override def getInterestedInFn: (
DateRange,
TimeZone
) => TypedPipe[(UserId, ClustersUserIsInterestedIn)] =
InterestedInSources.simClustersInterestedInDec11Source
override val firstTime: RichDate = RichDate("2019-11-18")
override val batchIncrement: Duration = Days(7)
override def producerTopKSimclusterEmbeddingsByFollowScoreDataset: KeyValDALDataset[
KeyVal[Long, TopSimClustersWithScore]
] =
ProducerTopKSimclusterEmbeddingsByFollowScoreScalaDataset
override def simclusterEmbeddingTopKProducersByFollowScoreDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
] =
SimclusterEmbeddingTopKProducersByFollowScoreScalaDataset
}
/**
* Adhoc job to calculate producer's simcluster embeddings, which essentially assigns interestedIn
* SimClusters to each producer, regardless of whether the producer has a knownFor assignment.
*
$ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:producer_embeddings_from_interested_in-adhoc
$ scalding remote run \
--main-class com.twitter.simclusters_v2.scalding.embedding.ProducerEmbeddingsFromInterestedInAdhocApp \
--target src/scala/com/twitter/simclusters_v2/scalding/embedding:producer_embeddings_from_interested_in-adhoc \
--user cassowary --cluster bluebird-qus1 \
--keytab /var/lib/tss/keys/fluffy/keytabs/client/cassowary.keytab \
--principal service_acoount@TWITTER.BIZ \
-- --date 2020-08-25 --model_version 20M_145K_updated \
--outputDir /gcs/user/cassowary/adhoc/producerEmbeddings/
*/
object ProducerEmbeddingsFromInterestedInAdhocApp extends AdhocExecutionApp {
import ProducerEmbeddingsFromInterestedIn._
private val numReducersForMatrixMultiplication = 12000
/**
* Calculate the embedding and writes the results keyed by producers and clusters separately into
* individual locations
*/
private def runAdhocByScore(
interestedInClusters: TypedPipe[(Long, ClustersUserIsInterestedIn)],
userUserNormalGraph: TypedPipe[UserAndNeighbors],
userNormsAndCounts: TypedPipe[NormsAndCounts],
keyedByProducerSinkPath: String,
keyedByClusterSinkPath: String,
userToProducerScoringFn: NeighborWithWeights => Double,
userToClusterScoringFn: UserToInterestedInClusterScores => Double,
userFilter: NormsAndCounts => Boolean,
modelVersion: ModelVersion
)(
implicit uniqueID: UniqueID
): Execution[Unit] = {
val producerClusterEmbedding = getProducerClusterEmbedding(
interestedInClusters,
userUserNormalGraph,
userNormsAndCounts,
userToProducerScoringFn,
userToClusterScoringFn,
userFilter,
numReducersForMatrixMultiplication,
modelVersion,
cosineSimilarityThreshold
).forceToDisk
val keyByProducerExec =
toSimClusterEmbedding(producerClusterEmbedding, topKClustersToKeep, modelVersion)
.writeExecution(
AdhocKeyValSources.topProducerToClusterEmbeddingsSource(keyedByProducerSinkPath))
val keyByClusterExec =
fromSimClusterEmbedding(producerClusterEmbedding, topKUsersToKeep, modelVersion)
.map { case (clusterId, topProducers) => (clusterId, topProducersToThrift(topProducers)) }
.writeExecution(
AdhocKeyValSources.topClusterEmbeddingsToProducerSource(keyedByClusterSinkPath))
Execution.zip(keyByProducerExec, keyByClusterExec).unit
}
// Calculate the embeddings using follow scores
private def runFollowScore(
interestedInClusters: TypedPipe[(Long, ClustersUserIsInterestedIn)],
userUserNormalGraph: TypedPipe[UserAndNeighbors],
userNormsAndCounts: TypedPipe[NormsAndCounts],
modelVersion: ModelVersion,
outputDir: String
)(
implicit uniqueID: UniqueID
): Execution[Unit] = {
val keyByClusterSinkPath = outputDir + "keyedByCluster/byFollowScore_" + modelVersion
val keyByProducerSinkPath = outputDir + "keyedByProducer/byFollowScore_" + modelVersion
runAdhocByScore(
interestedInClusters,
userUserNormalGraph,
userNormsAndCounts,
keyedByProducerSinkPath = keyByProducerSinkPath,
keyedByClusterSinkPath = keyByClusterSinkPath,
userToProducerScoringFn = userToProducerFollowScore,
userToClusterScoringFn = userToClusterFollowScore,
_.followerCount.exists(_ > minNumFollowersForProducer),
modelVersion
)
}
// Calculate the embeddings using fav scores
private def runFavScore(
interestedInClusters: TypedPipe[(Long, ClustersUserIsInterestedIn)],
userUserNormalGraph: TypedPipe[UserAndNeighbors],
userNormsAndCounts: TypedPipe[NormsAndCounts],
modelVersion: ModelVersion,
outputDir: String
)(
implicit uniqueID: UniqueID
): Execution[Unit] = {
val keyByClusterSinkPath = outputDir + "keyedByCluster/byFavScore_" + modelVersion
val keyByProducerSinkPath = outputDir + "keyedByProducer/byFavScore_" + modelVersion
runAdhocByScore(
interestedInClusters,
userUserNormalGraph,
userNormsAndCounts,
keyedByProducerSinkPath = keyByProducerSinkPath,
keyedByClusterSinkPath = keyByClusterSinkPath,
userToProducerScoringFn = userToProducerFavScore,
userToClusterScoringFn = userToClusterFavScore,
_.faverCount.exists(_ > minNumFaversForProducer),
modelVersion
)
}
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
val outputDir = args("outputDir")
val modelVersion =
ModelVersions.toModelVersion(args.required("model_version"))
val interestedInClusters = modelVersion match {
case ModelVersion.Model20m145k2020 =>
InterestedInSources.simClustersInterestedIn2020Source(dateRange, timeZone).forceToDisk
case ModelVersion.Model20m145kUpdated =>
InterestedInSources.simClustersInterestedInUpdatedSource(dateRange, timeZone).forceToDisk
case _ =>
InterestedInSources.simClustersInterestedInDec11Source(dateRange, timeZone).forceToDisk
}
Execution
.zip(
runFavScore(
interestedInClusters,
DataSources.userUserNormalizedGraphSource,
DataSources.userNormsAndCounts,
modelVersion,
outputDir
),
runFollowScore(
interestedInClusters,
DataSources.userUserNormalizedGraphSource,
DataSources.userNormsAndCounts,
modelVersion,
outputDir
)
).unit
}
}
/** /**
* Computes the producer's interestedIn cluster embedding. i.e. If a tweet author (producer) is not * Computes the producer's interestedIn cluster embedding. i.e. If a tweet author (producer) is not
* associated with a KnownFor cluster, do a cross-product between * associated with a KnownFor cluster, do a cross-product between

View File

@ -0,0 +1,176 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.dal.client.dataset.KeyValDALDataset
import com.twitter.scalding.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.ModelVersions
import com.twitter.simclusters_v2.hdfs_sources.*
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.*
import com.twitter.simclusters_v2.scalding.embedding.common.SimClustersEmbeddingJob
import com.twitter.simclusters_v2.thriftscala.*
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, ScheduledExecutionApp}
import java.util.TimeZone
/**
* Adhoc job to calculate producer's simcluster embeddings, which essentially assigns interestedIn
* SimClusters to each producer, regardless of whether the producer has a knownFor assignment.
*
$ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:producer_embeddings_from_interested_in-adhoc
$ scalding remote run \
--main-class com.twitter.simclusters_v2.scalding.embedding.ProducerEmbeddingsFromInterestedInAdhocApp \
--target src/scala/com/twitter/simclusters_v2/scalding/embedding:producer_embeddings_from_interested_in-adhoc \
--user cassowary --cluster bluebird-qus1 \
--keytab /var/lib/tss/keys/fluffy/keytabs/client/cassowary.keytab \
--principal service_acoount@TWITTER.BIZ \
-- --date 2020-08-25 --model_version 20M_145K_updated \
--outputDir /gcs/user/cassowary/adhoc/producerEmbeddings/
*/
object ProducerEmbeddingsFromInterestedInAdhocApp extends AdhocExecutionApp {
import ProducerEmbeddingsFromInterestedIn.*
private val numReducersForMatrixMultiplication = 12000
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
val outputDir = args("outputDir")
val modelVersion =
ModelVersions.toModelVersion(args.required("model_version"))
val interestedInClusters = modelVersion match {
case ModelVersion.Model20m145k2020 =>
InterestedInSources.simClustersInterestedIn2020Source(dateRange, timeZone).forceToDisk
case ModelVersion.Model20m145kUpdated =>
InterestedInSources.simClustersInterestedInUpdatedSource(dateRange, timeZone).forceToDisk
case _ =>
InterestedInSources.simClustersInterestedInDec11Source(dateRange, timeZone).forceToDisk
}
Execution
.zip(
runFavScore(
interestedInClusters,
DataSources.userUserNormalizedGraphSource,
DataSources.userNormsAndCounts,
modelVersion,
outputDir
),
runFollowScore(
interestedInClusters,
DataSources.userUserNormalizedGraphSource,
DataSources.userNormsAndCounts,
modelVersion,
outputDir
)
).unit
}
// Calculate the embeddings using follow scores
private def runFollowScore(
interestedInClusters: TypedPipe[(Long, ClustersUserIsInterestedIn)],
userUserNormalGraph: TypedPipe[UserAndNeighbors],
userNormsAndCounts: TypedPipe[NormsAndCounts],
modelVersion: ModelVersion,
outputDir: String
)(
implicit uniqueID: UniqueID
): Execution[Unit] = {
val keyByClusterSinkPath = outputDir + "keyedByCluster/byFollowScore_" + modelVersion
val keyByProducerSinkPath = outputDir + "keyedByProducer/byFollowScore_" + modelVersion
runAdhocByScore(
interestedInClusters,
userUserNormalGraph,
userNormsAndCounts,
keyedByProducerSinkPath = keyByProducerSinkPath,
keyedByClusterSinkPath = keyByClusterSinkPath,
userToProducerScoringFn = userToProducerFollowScore,
userToClusterScoringFn = userToClusterFollowScore,
_.followerCount.exists(_ > minNumFollowersForProducer),
modelVersion
)
}
// Calculate the embeddings using fav scores
private def runFavScore(
interestedInClusters: TypedPipe[(Long, ClustersUserIsInterestedIn)],
userUserNormalGraph: TypedPipe[UserAndNeighbors],
userNormsAndCounts: TypedPipe[NormsAndCounts],
modelVersion: ModelVersion,
outputDir: String
)(
implicit uniqueID: UniqueID
): Execution[Unit] = {
val keyByClusterSinkPath = outputDir + "keyedByCluster/byFavScore_" + modelVersion
val keyByProducerSinkPath = outputDir + "keyedByProducer/byFavScore_" + modelVersion
runAdhocByScore(
interestedInClusters,
userUserNormalGraph,
userNormsAndCounts,
keyedByProducerSinkPath = keyByProducerSinkPath,
keyedByClusterSinkPath = keyByClusterSinkPath,
userToProducerScoringFn = userToProducerFavScore,
userToClusterScoringFn = userToClusterFavScore,
_.faverCount.exists(_ > minNumFaversForProducer),
modelVersion
)
}
/**
* Calculate the embedding and writes the results keyed by producers and clusters separately into
* individual locations
*/
private def runAdhocByScore(
interestedInClusters: TypedPipe[(Long, ClustersUserIsInterestedIn)],
userUserNormalGraph: TypedPipe[UserAndNeighbors],
userNormsAndCounts: TypedPipe[NormsAndCounts],
keyedByProducerSinkPath: String,
keyedByClusterSinkPath: String,
userToProducerScoringFn: NeighborWithWeights => Double,
userToClusterScoringFn: UserToInterestedInClusterScores => Double,
userFilter: NormsAndCounts => Boolean,
modelVersion: ModelVersion
)(
implicit uniqueID: UniqueID
): Execution[Unit] = {
val producerClusterEmbedding = getProducerClusterEmbedding(
interestedInClusters,
userUserNormalGraph,
userNormsAndCounts,
userToProducerScoringFn,
userToClusterScoringFn,
userFilter,
numReducersForMatrixMultiplication,
modelVersion,
cosineSimilarityThreshold
).forceToDisk
val keyByProducerExec =
toSimClusterEmbedding(producerClusterEmbedding, topKClustersToKeep, modelVersion)
.writeExecution(
AdhocKeyValSources.topProducerToClusterEmbeddingsSource(keyedByProducerSinkPath))
val keyByClusterExec =
fromSimClusterEmbedding(producerClusterEmbedding, topKUsersToKeep, modelVersion)
.map { case (clusterId, topProducers) => (clusterId, topProducersToThrift(topProducers)) }
.writeExecution(
AdhocKeyValSources.topClusterEmbeddingsToProducerSource(keyedByClusterSinkPath))
Execution.zip(keyByProducerExec, keyByClusterExec).unit
}
}

View File

@ -0,0 +1,82 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.dal.client.dataset.KeyValDALDataset
import com.twitter.scalding.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.ModelVersions
import com.twitter.simclusters_v2.hdfs_sources.*
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.*
import com.twitter.simclusters_v2.scalding.embedding.common.SimClustersEmbeddingJob
import com.twitter.simclusters_v2.thriftscala.*
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, ScheduledExecutionApp}
import java.util.TimeZone
object ProducerEmbeddingsFromInterestedInBatchAppUtil {
import ProducerEmbeddingsFromInterestedIn.*
val user = System.getenv("USER")
val rootPath: String = s"/user/$user/manhattan_sequence_files"
// Helps speed up the multiplication step which can get very big
val numReducersForMatrixMultiplication: Int = 12000
/**
* Given the producer x cluster matrix, key by producer / cluster individually, and write output
* to individual DAL datasets
*/
def writeOutput(
producerClusterEmbedding: TypedPipe[((ClusterId, UserId), Double)],
producerTopKEmbeddingsDataset: KeyValDALDataset[KeyVal[Long, TopSimClustersWithScore]],
clusterTopKProducersDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
],
producerTopKEmbeddingsPath: String,
clusterTopKProducersPath: String,
modelVersion: ModelVersion
): Execution[Unit] = {
val keyedByProducer =
toSimClusterEmbedding(producerClusterEmbedding, topKClustersToKeep, modelVersion)
.map { case (userId, clusters) => KeyVal(userId, clusters) }
.writeDALVersionedKeyValExecution(
producerTopKEmbeddingsDataset,
D.Suffix(producerTopKEmbeddingsPath)
)
val keyedBySimCluster = fromSimClusterEmbedding(
producerClusterEmbedding,
topKUsersToKeep,
modelVersion
).map {
case (clusterId, topProducers) => KeyVal(clusterId, topProducersToThrift(topProducers))
}
.writeDALVersionedKeyValExecution(
clusterTopKProducersDataset,
D.Suffix(clusterTopKProducersPath)
)
Execution.zip(keyedByProducer, keyedBySimCluster).unit
}
}

View File

@ -0,0 +1,56 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.dal.client.dataset.KeyValDALDataset
import com.twitter.scalding.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.ModelVersions
import com.twitter.simclusters_v2.hdfs_sources.*
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.*
import com.twitter.simclusters_v2.scalding.embedding.common.SimClustersEmbeddingJob
import com.twitter.simclusters_v2.thriftscala.*
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, ScheduledExecutionApp}
import java.util.TimeZone
/**
capesospy-v2 update --build_locally --start_cron \
--start_cron producer_embeddings_from_interested_in_by_fav_score_2020 \
src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object ProducerEmbeddingsFromInterestedInByFavScore2020BatchApp
extends ProducerEmbeddingsFromInterestedInByFavScoreBase {
override val firstTime: RichDate = RichDate("2021-03-01")
override val batchIncrement: Duration = Days(7)
override def modelVersion: ModelVersion = ModelVersion.Model20m145k2020
override def getInterestedInFn: (
DateRange,
TimeZone
) => TypedPipe[(UserId, ClustersUserIsInterestedIn)] =
InterestedInSources.simClustersInterestedIn2020Source
override def producerTopKSimclusterEmbeddingsByFavScoreDataset: KeyValDALDataset[
KeyVal[Long, TopSimClustersWithScore]
] =
ProducerTopKSimclusterEmbeddingsByFavScore2020ScalaDataset
override def simclusterEmbeddingTopKProducersByFavScoreDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
] =
SimclusterEmbeddingTopKProducersByFavScore2020ScalaDataset
}

View File

@ -0,0 +1,96 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.dal.client.dataset.KeyValDALDataset
import com.twitter.scalding.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.ModelVersions
import com.twitter.simclusters_v2.hdfs_sources.*
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.*
import com.twitter.simclusters_v2.scalding.embedding.common.SimClustersEmbeddingJob
import com.twitter.simclusters_v2.thriftscala.*
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, ScheduledExecutionApp}
import java.util.TimeZone
/**
* Base class for Fav based producer embeddings. Helps reuse the code for different model versions
*/
trait ProducerEmbeddingsFromInterestedInByFavScoreBase extends ScheduledExecutionApp {
import ProducerEmbeddingsFromInterestedIn.*
import ProducerEmbeddingsFromInterestedInBatchAppUtil.*
val producerTopKEmbeddingsByFavScorePathPrefix: String =
"/producer_top_k_simcluster_embeddings_by_fav_score_"
val clusterTopKProducersByFavScorePathPrefix: String =
"/simcluster_embedding_top_k_producers_by_fav_score_"
val minNumFavers: Int = minNumFaversForProducer
def modelVersion: ModelVersion
def producerTopKSimclusterEmbeddingsByFavScoreDataset: KeyValDALDataset[
KeyVal[Long, TopSimClustersWithScore]
]
def simclusterEmbeddingTopKProducersByFavScoreDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
]
def getInterestedInFn: (DateRange, TimeZone) => TypedPipe[(Long, ClustersUserIsInterestedIn)]
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
val producerTopKEmbeddingsByFavScorePathUpdated: String =
rootPath + producerTopKEmbeddingsByFavScorePathPrefix + ModelVersions
.toKnownForModelVersion(modelVersion)
val clusterTopKProducersByFavScorePathUpdated: String =
rootPath + clusterTopKProducersByFavScorePathPrefix + ModelVersions
.toKnownForModelVersion(modelVersion)
val producerClusterEmbeddingByFavScore = getProducerClusterEmbedding(
getInterestedInFn(dateRange.embiggen(Days(5)), timeZone),
DataSources.userUserNormalizedGraphSource,
DataSources.userNormsAndCounts,
userToProducerFavScore,
userToClusterFavScore, // Fav score
_.faverCount.exists(_ > minNumFavers),
numReducersForMatrixMultiplication,
modelVersion,
cosineSimilarityThreshold
).forceToDisk
writeOutput(
producerClusterEmbeddingByFavScore,
producerTopKSimclusterEmbeddingsByFavScoreDataset,
simclusterEmbeddingTopKProducersByFavScoreDataset,
producerTopKEmbeddingsByFavScorePathUpdated,
clusterTopKProducersByFavScorePathUpdated,
modelVersion
)
}
}

View File

@ -0,0 +1,57 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.dal.client.dataset.KeyValDALDataset
import com.twitter.scalding.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.ModelVersions
import com.twitter.simclusters_v2.hdfs_sources.*
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.*
import com.twitter.simclusters_v2.scalding.embedding.common.SimClustersEmbeddingJob
import com.twitter.simclusters_v2.thriftscala.*
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, ScheduledExecutionApp}
import java.util.TimeZone
/**
capesospy-v2 update --build_locally --start_cron \
--start_cron producer_embeddings_from_interested_in_by_fav_score \
src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object ProducerEmbeddingsFromInterestedInByFavScoreBatchApp
extends ProducerEmbeddingsFromInterestedInByFavScoreBase {
override val firstTime: RichDate = RichDate("2019-09-10")
override val batchIncrement: Duration = Days(7)
override def modelVersion: ModelVersion = ModelVersion.Model20m145kUpdated
override def getInterestedInFn: (
DateRange,
TimeZone
) => TypedPipe[(UserId, ClustersUserIsInterestedIn)] =
InterestedInSources.simClustersInterestedInUpdatedSource
override def producerTopKSimclusterEmbeddingsByFavScoreDataset: KeyValDALDataset[
KeyVal[Long, TopSimClustersWithScore]
] =
ProducerTopKSimclusterEmbeddingsByFavScoreUpdatedScalaDataset
override def simclusterEmbeddingTopKProducersByFavScoreDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
] =
SimclusterEmbeddingTopKProducersByFavScoreUpdatedScalaDataset
}

View File

@ -0,0 +1,54 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.dal.client.dataset.KeyValDALDataset
import com.twitter.scalding.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.ModelVersions
import com.twitter.simclusters_v2.hdfs_sources.*
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.*
import com.twitter.simclusters_v2.scalding.embedding.common.SimClustersEmbeddingJob
import com.twitter.simclusters_v2.thriftscala.*
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, ScheduledExecutionApp}
import java.util.TimeZone
/**
capesospy-v2 update --build_locally --start_cron \
--start_cron producer_embeddings_from_interested_in_by_fav_score_dec11 \
src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object ProducerEmbeddingsFromInterestedInByFavScoreDec11BatchApp
extends ProducerEmbeddingsFromInterestedInByFavScoreBase {
override val firstTime: RichDate = RichDate("2019-11-18")
override val batchIncrement: Duration = Days(7)
override def modelVersion: ModelVersion = ModelVersion.Model20m145kDec11
override def getInterestedInFn: (
DateRange,
TimeZone
) => TypedPipe[(UserId, ClustersUserIsInterestedIn)] =
InterestedInSources.simClustersInterestedInDec11Source
override def producerTopKSimclusterEmbeddingsByFavScoreDataset: KeyValDALDataset[
KeyVal[Long, TopSimClustersWithScore]
] =
ProducerTopKSimclusterEmbeddingsByFavScoreScalaDataset
override def simclusterEmbeddingTopKProducersByFavScoreDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
] =
SimclusterEmbeddingTopKProducersByFavScoreScalaDataset
}

View File

@ -0,0 +1,49 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.dal.client.dataset.KeyValDALDataset
import com.twitter.scalding.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.ModelVersions
import com.twitter.simclusters_v2.hdfs_sources.*
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.*
import com.twitter.simclusters_v2.scalding.embedding.common.SimClustersEmbeddingJob
import com.twitter.simclusters_v2.thriftscala.*
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, ScheduledExecutionApp}
import java.util.TimeZone
/**
capesospy-v2 update --build_locally --start_cron \
--start_cron producer_embeddings_from_interested_in_by_follow_score_2020 \
src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object ProducerEmbeddingsFromInterestedInByFollowScore2020BatchApp
extends ProducerEmbeddingsFromInterestedInByFollowScoreBase {
override val firstTime: RichDate = RichDate("2021-03-01")
override val batchIncrement: Duration = Days(7)
override def modelVersion: ModelVersion = ModelVersion.Model20m145k2020
override def getInterestedInFn: (
DateRange,
TimeZone
) => TypedPipe[(UserId, ClustersUserIsInterestedIn)] =
InterestedInSources.simClustersInterestedIn2020Source
override def producerTopKSimclusterEmbeddingsByFollowScoreDataset: KeyValDALDataset[
KeyVal[Long, TopSimClustersWithScore]
] =
ProducerTopKSimclusterEmbeddingsByFollowScore2020ScalaDataset
override def simclusterEmbeddingTopKProducersByFollowScoreDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
] =
SimclusterEmbeddingTopKProducersByFollowScore2020ScalaDataset
}

View File

@ -0,0 +1,94 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.dal.client.dataset.KeyValDALDataset
import com.twitter.scalding.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.ModelVersions
import com.twitter.simclusters_v2.hdfs_sources.*
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.*
import com.twitter.simclusters_v2.scalding.embedding.common.SimClustersEmbeddingJob
import com.twitter.simclusters_v2.thriftscala.*
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, ScheduledExecutionApp}
import java.util.TimeZone
/**
* Base class for Follow based producer embeddings. Helps reuse the code for different model versions
*/
trait ProducerEmbeddingsFromInterestedInByFollowScoreBase extends ScheduledExecutionApp {
import ProducerEmbeddingsFromInterestedIn.*
import ProducerEmbeddingsFromInterestedInBatchAppUtil.*
val producerTopKEmbeddingsByFollowScorePathPrefix: String =
"/producer_top_k_simcluster_embeddings_by_follow_score_"
val clusterTopKProducersByFollowScorePathPrefix: String =
"/simcluster_embedding_top_k_producers_by_follow_score_"
val minNumFollowers: Int = minNumFollowersForProducer
def modelVersion: ModelVersion
def producerTopKSimclusterEmbeddingsByFollowScoreDataset: KeyValDALDataset[
KeyVal[Long, TopSimClustersWithScore]
]
def simclusterEmbeddingTopKProducersByFollowScoreDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
]
def getInterestedInFn: (DateRange, TimeZone) => TypedPipe[(Long, ClustersUserIsInterestedIn)]
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
val producerTopKEmbeddingsByFollowScorePath: String =
rootPath + producerTopKEmbeddingsByFollowScorePathPrefix + ModelVersions
.toKnownForModelVersion(modelVersion)
val clusterTopKProducersByFollowScorePath: String =
rootPath + clusterTopKProducersByFollowScorePathPrefix + ModelVersions
.toKnownForModelVersion(modelVersion)
val producerClusterEmbeddingByFollowScore = getProducerClusterEmbedding(
getInterestedInFn(dateRange.embiggen(Days(5)), timeZone),
DataSources.userUserNormalizedGraphSource,
DataSources.userNormsAndCounts,
userToProducerFollowScore,
userToClusterFollowScore, // Follow score
_.followerCount.exists(_ > minNumFollowers),
numReducersForMatrixMultiplication,
modelVersion,
cosineSimilarityThreshold
).forceToDisk
writeOutput(
producerClusterEmbeddingByFollowScore,
producerTopKSimclusterEmbeddingsByFollowScoreDataset,
simclusterEmbeddingTopKProducersByFollowScoreDataset,
producerTopKEmbeddingsByFollowScorePath,
clusterTopKProducersByFollowScorePath,
modelVersion
)
}
}

View File

@ -0,0 +1,52 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.dal.client.dataset.KeyValDALDataset
import com.twitter.scalding.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.ModelVersions
import com.twitter.simclusters_v2.hdfs_sources.*
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.*
import com.twitter.simclusters_v2.scalding.embedding.common.SimClustersEmbeddingJob
import com.twitter.simclusters_v2.thriftscala.*
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, ScheduledExecutionApp}
import java.util.TimeZone
/**
capesospy-v2 update --build_locally --start_cron \
--start_cron producer_embeddings_from_interested_in_by_follow_score \
src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object ProducerEmbeddingsFromInterestedInByFollowScoreBatchApp
extends ProducerEmbeddingsFromInterestedInByFollowScoreBase {
override val firstTime: RichDate = RichDate("2019-09-10")
override val batchIncrement: Duration = Days(7)
override def modelVersion: ModelVersion = ModelVersion.Model20m145kUpdated
override def getInterestedInFn: (
DateRange,
TimeZone
) => TypedPipe[(UserId, ClustersUserIsInterestedIn)] =
InterestedInSources.simClustersInterestedInUpdatedSource
override def producerTopKSimclusterEmbeddingsByFollowScoreDataset: KeyValDALDataset[
KeyVal[Long, TopSimClustersWithScore]
] =
ProducerTopKSimclusterEmbeddingsByFollowScoreUpdatedScalaDataset
override def simclusterEmbeddingTopKProducersByFollowScoreDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
] =
SimclusterEmbeddingTopKProducersByFollowScoreUpdatedScalaDataset
}

View File

@ -0,0 +1,47 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.dal.client.dataset.KeyValDALDataset
import com.twitter.scalding.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.ModelVersions
import com.twitter.simclusters_v2.hdfs_sources.*
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.*
import com.twitter.simclusters_v2.scalding.embedding.common.SimClustersEmbeddingJob
import com.twitter.simclusters_v2.thriftscala.*
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, ScheduledExecutionApp}
import java.util.TimeZone
/**
capesospy-v2 update --build_locally --start_cron \
--start_cron producer_embeddings_from_interested_in_by_follow_score_dec11 \
src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object ProducerEmbeddingsFromInterestedInByFollowScoreDec11BatchApp
extends ProducerEmbeddingsFromInterestedInByFollowScoreBase {
override val firstTime: RichDate = RichDate("2019-11-18")
override val batchIncrement: Duration = Days(7)
override def modelVersion: ModelVersion = ModelVersion.Model20m145kDec11
override def getInterestedInFn: (
DateRange,
TimeZone
) => TypedPipe[(UserId, ClustersUserIsInterestedIn)] =
InterestedInSources.simClustersInterestedInDec11Source
override def producerTopKSimclusterEmbeddingsByFollowScoreDataset: KeyValDALDataset[
KeyVal[Long, TopSimClustersWithScore]
] =
ProducerTopKSimclusterEmbeddingsByFollowScoreScalaDataset
override def simclusterEmbeddingTopKProducersByFollowScoreDataset: KeyValDALDataset[
KeyVal[PersistedFullClusterId, TopProducersWithScore]
] =
SimclusterEmbeddingTopKProducersByFollowScoreScalaDataset
}

View File

@ -0,0 +1,28 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.dal.client.dataset.KeyValDALDataset
import com.twitter.recos.entities.thriftscala.{Entity, Hashtag, SemanticCoreEntity}
import com.twitter.scalding.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.{ModelVersions, SimClustersEmbedding}
import com.twitter.simclusters_v2.hdfs_sources.*
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil.*
import com.twitter.simclusters_v2.scalding.embedding.common.{EmbeddingUtil, EntityEmbeddingUtil, SimClustersEmbeddingJob}
import com.twitter.simclusters_v2.thriftscala.{SimClustersEmbedding as ThriftSimClustersEmbedding, *}
import com.twitter.wtf.entity_real_graph.common.EntityUtil
import com.twitter.wtf.entity_real_graph.thriftscala.EntityType
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, DataSources, ScheduledExecutionApp}
import java.util.TimeZone
/**
* $ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:semantic_core_entity_embeddings_2020_job
* $ capesospy-v2 update \
--build_locally \
--start_cron semantic_core_entity_embeddings_2020_job src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object SemanticCoreEntityEmbeddings2020App extends EntityToSimClustersEmbeddingApp

View File

@ -17,194 +17,6 @@ import com.twitter.wtf.scalding.jobs.common.AdhocExecutionApp
import com.twitter.wtf.scalding.jobs.common.ScheduledExecutionApp import com.twitter.wtf.scalding.jobs.common.ScheduledExecutionApp
import java.util.TimeZone import java.util.TimeZone
/**
capesospy-v2 update --build_locally --start_cron \
--start_cron similar_users_by_simclusters_embeddings_job \
src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object SimilarUsersBySimClustersEmbeddingBatchApp extends ScheduledExecutionApp {
override val firstTime: RichDate = RichDate("2019-07-10")
override val batchIncrement: Duration = Days(7)
private val outputByFav =
"/user/cassowary/manhattan_sequence_files/similar_users_by_simclusters_embeddings/by_fav"
private val outputByFollow =
"/user/cassowary/manhattan_sequence_files/similar_users_by_simclusters_embeddings/by_follow"
private implicit val valueInj: CompactScalaCodec[Candidates] = CompactScalaCodec(Candidates)
private val topClusterEmbeddingsByFavScore = DAL
.readMostRecentSnapshotNoOlderThan(
ProducerTopKSimclusterEmbeddingsByFavScoreUpdatedScalaDataset,
Days(14)
)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.map { clusterScorePair => clusterScorePair.key -> clusterScorePair.value }
private val topProducersForClusterEmbeddingByFavScore = DAL
.readMostRecentSnapshotNoOlderThan(
SimclusterEmbeddingTopKProducersByFavScoreUpdatedScalaDataset,
Days(14)
)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.map { producerScoresPair => producerScoresPair.key -> producerScoresPair.value }
private val topClusterEmbeddingsByFollowScore = DAL
.readMostRecentSnapshotNoOlderThan(
ProducerTopKSimclusterEmbeddingsByFollowScoreUpdatedScalaDataset,
Days(14)
)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.map { clusterScorePair => clusterScorePair.key -> clusterScorePair.value }
private val topProducersForClusterEmbeddingByFollowScore = DAL
.readMostRecentSnapshotNoOlderThan(
SimclusterEmbeddingTopKProducersByFollowScoreUpdatedScalaDataset,
Days(14)
)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.map { producerScoresPair => producerScoresPair.key -> producerScoresPair.value }
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
Execution
.zip(
SimilarUsersBySimClustersEmbedding
.getTopUsersRelatedToUser(
topClusterEmbeddingsByFavScore,
topProducersForClusterEmbeddingByFavScore
)
.map { case (key, value) => KeyVal(key, value) }
.writeDALVersionedKeyValExecution(
SimilarUsersByFavBasedProducerEmbeddingScalaDataset,
D.Suffix(outputByFav)
),
SimilarUsersBySimClustersEmbedding
.getTopUsersRelatedToUser(
topClusterEmbeddingsByFollowScore,
topProducersForClusterEmbeddingByFollowScore
)
.map { case (key, value) => KeyVal(key, value) }
.writeDALVersionedKeyValExecution(
SimilarUsersByFollowBasedProducerEmbeddingScalaDataset,
D.Suffix(outputByFollow)
)
).unit
}
}
/**
* Adhoc job to calculate producer's simcluster embeddings, which essentially assigns interestedIn
* SimClusters to each producer, regardless of whether the producer has a knownFor assignment.
*
./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:similar_users_by_simclusters_embeddings-adhoc && \
oscar hdfs --user recos-platform --screen --tee similar_users_by_simclusters_embeddings --bundle similar_users_by_simclusters_embeddings-adhoc \
--tool com.twitter.simclusters_v2.scalding.embedding.SimilarUsersBySimClustersEmbeddingAdhocApp \
-- --date 2019-07-10T00 2019-07-10T23
*/
object SimilarUsersBySimClustersEmbeddingAdhocApp extends AdhocExecutionApp {
private val outputByFav =
"/user/recos-platform/adhoc/similar_users_by_simclusters_embeddings/by_fav"
private val outputByFollow =
"/user/recos-platform/adhoc/similar_users_by_simclusters_embeddings/by_follow"
private val topClusterEmbeddingsByFavScore = DAL
.readMostRecentSnapshotNoOlderThan(
ProducerTopKSimclusterEmbeddingsByFavScoreUpdatedScalaDataset,
Days(14)
)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.map { clusterScorePair => clusterScorePair.key -> clusterScorePair.value }
private val topProducersForClusterEmbeddingByFavScore = DAL
.readMostRecentSnapshotNoOlderThan(
SimclusterEmbeddingTopKProducersByFavScoreUpdatedScalaDataset,
Days(14)
)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.map { producerScoresPair => producerScoresPair.key -> producerScoresPair.value }
private val topClusterEmbeddingsByFollowScore = DAL
.readMostRecentSnapshotNoOlderThan(
ProducerTopKSimclusterEmbeddingsByFollowScoreUpdatedScalaDataset,
Days(14)
)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.map { clusterScorePair => clusterScorePair.key -> clusterScorePair.value }
private val topProducersForClusterEmbeddingByFollowScore = DAL
.readMostRecentSnapshotNoOlderThan(
SimclusterEmbeddingTopKProducersByFollowScoreUpdatedScalaDataset,
Days(14)
)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.map { producerScoresPair => producerScoresPair.key -> producerScoresPair.value }
implicit val candidatesInj: CompactScalaCodec[Candidates] = CompactScalaCodec(Candidates)
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
Execution
.zip(
SimilarUsersBySimClustersEmbedding
.getTopUsersRelatedToUser(
topClusterEmbeddingsByFavScore,
topProducersForClusterEmbeddingByFavScore).writeExecution(
VersionedKeyValSource[Long, Candidates](outputByFav))
.getCounters
.flatMap {
case (_, counters) =>
counters.toMap.toSeq
.sortBy(e => (e._1.group, e._1.counter))
.foreach {
case (statKey, value) =>
println(s"${statKey.group}\t${statKey.counter}\t$value")
}
Execution.unit
},
SimilarUsersBySimClustersEmbedding
.getTopUsersRelatedToUser(
topClusterEmbeddingsByFollowScore,
topProducersForClusterEmbeddingByFollowScore).writeExecution(
VersionedKeyValSource[Long, Candidates](outputByFollow))
.getCounters
.flatMap {
case (_, counters) =>
counters.toMap.toSeq
.sortBy(e => (e._1.group, e._1.counter))
.foreach {
case (statKey, value) =>
println(s"${statKey.group}\t${statKey.counter}\t$value")
}
Execution.unit
}
).unit
}
}
object SimilarUsersBySimClustersEmbedding { object SimilarUsersBySimClustersEmbedding {
private val maxUsersPerCluster = 300 private val maxUsersPerCluster = 300
private val maxClustersPerUser = 50 private val maxClustersPerUser = 50

View File

@ -0,0 +1,119 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.bijection.Injection
import com.twitter.bijection.scrooge.CompactScalaCodec
import com.twitter.hermit.candidate.thriftscala.{Candidate, Candidates}
import com.twitter.scalding.*
import com.twitter.scalding.commons.source.VersionedKeyValSource
import com.twitter.scalding_internal.dalv2.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.dalv2.remote_access.AllowCrossClusterSameDC
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.CosineSimilarityUtil
import com.twitter.simclusters_v2.hdfs_sources.*
import com.twitter.simclusters_v2.thriftscala.*
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, ScheduledExecutionApp}
import java.util.TimeZone
/**
* Adhoc job to calculate producer's simcluster embeddings, which essentially assigns interestedIn
* SimClusters to each producer, regardless of whether the producer has a knownFor assignment.
*
./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:similar_users_by_simclusters_embeddings-adhoc && \
oscar hdfs --user recos-platform --screen --tee similar_users_by_simclusters_embeddings --bundle similar_users_by_simclusters_embeddings-adhoc \
--tool com.twitter.simclusters_v2.scalding.embedding.SimilarUsersBySimClustersEmbeddingAdhocApp \
-- --date 2019-07-10T00 2019-07-10T23
*/
object SimilarUsersBySimClustersEmbeddingAdhocApp extends AdhocExecutionApp {
private val outputByFav =
"/user/recos-platform/adhoc/similar_users_by_simclusters_embeddings/by_fav"
private val outputByFollow =
"/user/recos-platform/adhoc/similar_users_by_simclusters_embeddings/by_follow"
private val topClusterEmbeddingsByFavScore = DAL
.readMostRecentSnapshotNoOlderThan(
ProducerTopKSimclusterEmbeddingsByFavScoreUpdatedScalaDataset,
Days(14)
)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.map { clusterScorePair => clusterScorePair.key -> clusterScorePair.value }
private val topProducersForClusterEmbeddingByFavScore = DAL
.readMostRecentSnapshotNoOlderThan(
SimclusterEmbeddingTopKProducersByFavScoreUpdatedScalaDataset,
Days(14)
)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.map { producerScoresPair => producerScoresPair.key -> producerScoresPair.value }
private val topClusterEmbeddingsByFollowScore = DAL
.readMostRecentSnapshotNoOlderThan(
ProducerTopKSimclusterEmbeddingsByFollowScoreUpdatedScalaDataset,
Days(14)
)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.map { clusterScorePair => clusterScorePair.key -> clusterScorePair.value }
private val topProducersForClusterEmbeddingByFollowScore = DAL
.readMostRecentSnapshotNoOlderThan(
SimclusterEmbeddingTopKProducersByFollowScoreUpdatedScalaDataset,
Days(14)
)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.map { producerScoresPair => producerScoresPair.key -> producerScoresPair.value }
implicit val candidatesInj: CompactScalaCodec[Candidates] = CompactScalaCodec(Candidates)
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
Execution
.zip(
SimilarUsersBySimClustersEmbedding
.getTopUsersRelatedToUser(
topClusterEmbeddingsByFavScore,
topProducersForClusterEmbeddingByFavScore).writeExecution(
VersionedKeyValSource[Long, Candidates](outputByFav))
.getCounters
.flatMap {
case (_, counters) =>
counters.toMap.toSeq
.sortBy(e => (e._1.group, e._1.counter))
.foreach {
case (statKey, value) =>
println(s"${statKey.group}\t${statKey.counter}\t$value")
}
Execution.unit
},
SimilarUsersBySimClustersEmbedding
.getTopUsersRelatedToUser(
topClusterEmbeddingsByFollowScore,
topProducersForClusterEmbeddingByFollowScore).writeExecution(
VersionedKeyValSource[Long, Candidates](outputByFollow))
.getCounters
.flatMap {
case (_, counters) =>
counters.toMap.toSeq
.sortBy(e => (e._1.group, e._1.counter))
.foreach {
case (statKey, value) =>
println(s"${statKey.group}\t${statKey.counter}\t$value")
}
Execution.unit
}
).unit
}
}

View File

@ -0,0 +1,109 @@
package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.bijection.Injection
import com.twitter.bijection.scrooge.CompactScalaCodec
import com.twitter.hermit.candidate.thriftscala.{Candidate, Candidates}
import com.twitter.scalding.*
import com.twitter.scalding.commons.source.VersionedKeyValSource
import com.twitter.scalding_internal.dalv2.*
import com.twitter.scalding_internal.dalv2.DALWrite.*
import com.twitter.scalding_internal.dalv2.remote_access.AllowCrossClusterSameDC
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.CosineSimilarityUtil
import com.twitter.simclusters_v2.hdfs_sources.*
import com.twitter.simclusters_v2.thriftscala.*
import com.twitter.wtf.scalding.jobs.common.{AdhocExecutionApp, ScheduledExecutionApp}
import java.util.TimeZone
/**
capesospy-v2 update --build_locally --start_cron \
--start_cron similar_users_by_simclusters_embeddings_job \
src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object SimilarUsersBySimClustersEmbeddingBatchApp extends ScheduledExecutionApp {
override val firstTime: RichDate = RichDate("2019-07-10")
override val batchIncrement: Duration = Days(7)
private val outputByFav =
"/user/cassowary/manhattan_sequence_files/similar_users_by_simclusters_embeddings/by_fav"
private val outputByFollow =
"/user/cassowary/manhattan_sequence_files/similar_users_by_simclusters_embeddings/by_follow"
private implicit val valueInj: CompactScalaCodec[Candidates] = CompactScalaCodec(Candidates)
private val topClusterEmbeddingsByFavScore = DAL
.readMostRecentSnapshotNoOlderThan(
ProducerTopKSimclusterEmbeddingsByFavScoreUpdatedScalaDataset,
Days(14)
)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.map { clusterScorePair => clusterScorePair.key -> clusterScorePair.value }
private val topProducersForClusterEmbeddingByFavScore = DAL
.readMostRecentSnapshotNoOlderThan(
SimclusterEmbeddingTopKProducersByFavScoreUpdatedScalaDataset,
Days(14)
)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.map { producerScoresPair => producerScoresPair.key -> producerScoresPair.value }
private val topClusterEmbeddingsByFollowScore = DAL
.readMostRecentSnapshotNoOlderThan(
ProducerTopKSimclusterEmbeddingsByFollowScoreUpdatedScalaDataset,
Days(14)
)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.map { clusterScorePair => clusterScorePair.key -> clusterScorePair.value }
private val topProducersForClusterEmbeddingByFollowScore = DAL
.readMostRecentSnapshotNoOlderThan(
SimclusterEmbeddingTopKProducersByFollowScoreUpdatedScalaDataset,
Days(14)
)
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.map { producerScoresPair => producerScoresPair.key -> producerScoresPair.value }
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
Execution
.zip(
SimilarUsersBySimClustersEmbedding
.getTopUsersRelatedToUser(
topClusterEmbeddingsByFavScore,
topProducersForClusterEmbeddingByFavScore
)
.map { case (key, value) => KeyVal(key, value) }
.writeDALVersionedKeyValExecution(
SimilarUsersByFavBasedProducerEmbeddingScalaDataset,
D.Suffix(outputByFav)
),
SimilarUsersBySimClustersEmbedding
.getTopUsersRelatedToUser(
topClusterEmbeddingsByFollowScore,
topProducersForClusterEmbeddingByFollowScore
)
.map { case (key, value) => KeyVal(key, value) }
.writeDALVersionedKeyValExecution(
SimilarUsersByFollowBasedProducerEmbeddingScalaDataset,
D.Suffix(outputByFollow)
)
).unit
}
}