Delete product-mixer/shared-library/src/main/scala/com/twitter/product_mixer/shared_library directory

This commit is contained in:
kenan238 2023-04-05 20:54:19 +03:00 committed by GitHub
parent c0ede8f3bc
commit 242f07f939
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 0 additions and 1420 deletions

View File

@ -1,22 +0,0 @@
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
strict_deps = True,
tags = ["bazel-compatible"],
dependencies = [
"finagle/finagle-core/src/main",
"finagle/finagle-http/src/main/scala",
"finagle/finagle-thriftmux/src/main/scala",
"finatra-internal/mtls-http/src/main/scala",
"finatra-internal/mtls-thriftmux/src/main/scala",
"util/util-core",
],
exports = [
"finagle/finagle-core/src/main",
"finagle/finagle-http/src/main/scala",
"finagle/finagle-thriftmux/src/main/scala",
"finatra-internal/mtls-http/src/main/scala",
"finatra-internal/mtls-thriftmux/src/main/scala",
"util/util-core",
],
)

View File

@ -1,57 +0,0 @@
package com.twitter.product_mixer.shared_library.http_client
import com.twitter.finagle.Http
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.mtls.client.MtlsStackClient._
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.util.Duration
object FinagleHttpClientBuilder {
/**
* Build a Finagle HTTP client with S2S Auth / Mutual TLS
*
* @param requestTimeout HTTP client request timeout
* @param connectTimeout HTTP client transport connect timeout
* @param acquisitionTimeout HTTP client session acquisition timeout
* @param serviceIdentifier Service ID used to S2S Auth
* @param statsReceiver Stats
*
* @return Finagle HTTP Client with S2S Auth / Mutual TLS
*/
def buildFinagleHttpClientMutualTls(
requestTimeout: Duration,
connectTimeout: Duration,
acquisitionTimeout: Duration,
serviceIdentifier: ServiceIdentifier,
statsReceiver: StatsReceiver
): Http.Client =
buildFinagleHttpClient(
requestTimeout = requestTimeout,
connectTimeout = connectTimeout,
acquisitionTimeout = acquisitionTimeout,
statsReceiver = statsReceiver
).withMutualTls(serviceIdentifier)
/**
* Build a Finagle HTTP client
*
* @param requestTimeout HTTP client request timeout
* @param connectTimeout HTTP client transport connect timeout
* @param acquisitionTimeout HTTP client session acquisition timeout
* @param statsReceiver stats
*
* @return Finagle HTTP Client
*/
def buildFinagleHttpClient(
requestTimeout: Duration,
connectTimeout: Duration,
acquisitionTimeout: Duration,
statsReceiver: StatsReceiver,
): Http.Client =
Http.client
.withStatsReceiver(statsReceiver)
.withRequestTimeout(requestTimeout)
.withTransport.connectTimeout(connectTimeout)
.withSession.acquisitionTimeout(acquisitionTimeout)
}

View File

@ -1,97 +0,0 @@
package com.twitter.product_mixer.shared_library.http_client
import com.twitter.finagle.Http
import com.twitter.finagle.Service
import com.twitter.finagle.client.Transporter
import com.twitter.finagle.http.ProxyCredentials
import com.twitter.finagle.http.Request
import com.twitter.finagle.http.Response
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.product_mixer.shared_library.http_client.FinagleHttpClientBuilder.buildFinagleHttpClient
import com.twitter.util.Duration
object FinagleHttpClientWithProxyBuilder {
/**
* Build a Finagle HTTP client with Egress Proxy support using Credentials
*
* @param twitterProxyHostPort Twitter egress proxy host port
* @param remoteProxyHostPort Remote proxy host port
* @param requestTimeout HTTP client request timeout
* @param connectTimeout HTTP client transport connect timeout
* @param acquisitionTimeout HTTP client session acquisition timeout
* @param proxyCredentials Proxy credentials
* @param statsReceiver Stats
*
* @return Finagle HTTP client with Egress Proxy support using Credentials
*/
def buildFinagleHttpClientWithCredentialProxy(
twitterProxyHostPort: HttpHostPort,
remoteProxyHostPort: HttpHostPort,
requestTimeout: Duration,
connectTimeout: Duration,
acquisitionTimeout: Duration,
proxyCredentials: ProxyCredentials,
statsReceiver: StatsReceiver,
): Http.Client = {
val httpClient = buildFinagleHttpClient(
requestTimeout = requestTimeout,
connectTimeout = connectTimeout,
acquisitionTimeout = acquisitionTimeout,
statsReceiver = statsReceiver
)
httpClient.withTransport
.httpProxyTo(
host = remoteProxyHostPort.toString,
credentials = Transporter.Credentials(proxyCredentials.username, proxyCredentials.password))
.withTls(remoteProxyHostPort.host)
}
/**
* Build a Finagle HTTP client with Egress Proxy support
*
* @param twitterProxyHostPort Twitter egress proxy host port
* @param remoteProxyHostPort Remote proxy host port
* @param requestTimeout HTTP client request timeout
* @param connectTimeout HTTP client transport connect timeout
* @param acquisitionTimeout HTTP client session acquisition timeout
* @param statsReceiver Stats
*
* @return Finagle HTTP client with Egress Proxy support
*/
def buildFinagleHttpClientWithProxy(
twitterProxyHostPort: HttpHostPort,
remoteProxyHostPort: HttpHostPort,
requestTimeout: Duration,
connectTimeout: Duration,
acquisitionTimeout: Duration,
statsReceiver: StatsReceiver,
): Http.Client = {
val httpClient = buildFinagleHttpClient(
requestTimeout = requestTimeout,
connectTimeout = connectTimeout,
acquisitionTimeout = acquisitionTimeout,
statsReceiver = statsReceiver
)
httpClient.withTransport
.httpProxyTo(remoteProxyHostPort.toString)
.withTls(remoteProxyHostPort.host)
}
/**
* Build a Finagle HTTP service with Egress Proxy support
*
* @param finagleHttpClientWithProxy Finagle HTTP client from which to build the service
* @param twitterProxyHostPort Twitter egress proxy host port
*
* @return Finagle HTTP service with Egress Proxy support
*/
def buildFinagleHttpServiceWithProxy(
finagleHttpClientWithProxy: Http.Client,
twitterProxyHostPort: HttpHostPort
): Service[Request, Response] = {
finagleHttpClientWithProxy.newService(twitterProxyHostPort.toString)
}
}

View File

@ -1,5 +0,0 @@
package com.twitter.product_mixer.shared_library.http_client
case class HttpHostPort(host: String, port: Int) {
override val toString: String = s"$host:$port"
}

View File

@ -1,24 +0,0 @@
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
strict_deps = True,
tags = ["bazel-compatible"],
dependencies = [
"finagle-internal/mtls/src/main/scala/com/twitter/finagle/mtls/authentication",
"finagle-internal/mtls/src/main/scala/com/twitter/finagle/mtls/authorization",
"finagle/finagle-core/src/main",
"finagle/finagle-thriftmux/src/main/scala",
"src/scala/com/twitter/storehaus_internal/manhattan/config",
"src/thrift/com/twitter/manhattan:v1-scala",
"storage/clients/manhattan",
"util/util-core",
],
exports = [
"finagle-internal/mtls/src/main/scala/com/twitter/finagle/mtls/authentication",
"finagle-internal/mtls/src/main/scala/com/twitter/finagle/mtls/authorization",
"finagle/finagle-core/src/main",
"src/scala/com/twitter/storehaus_internal/manhattan/config",
"storage/clients/manhattan",
"util/util-core",
],
)

View File

@ -1,116 +0,0 @@
package com.twitter.product_mixer.shared_library.manhattan_client
import com.twitter.finagle.mtls.authentication.EmptyServiceIdentifier
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.ssl.OpportunisticTls
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.manhattan.v1.{thriftscala => mh}
import com.twitter.storage.client.manhattan.kv.Experiments
import com.twitter.storage.client.manhattan.kv.Experiments.Experiment
import com.twitter.storage.client.manhattan.kv.Guarantee
import com.twitter.storage.client.manhattan.kv.ManhattanKVClient
import com.twitter.storage.client.manhattan.kv.ManhattanKVClientMtlsParams
import com.twitter.storage.client.manhattan.kv.ManhattanKVEndpoint
import com.twitter.storage.client.manhattan.kv.ManhattanKVEndpointBuilder
import com.twitter.storage.client.manhattan.kv.NoMtlsParams
import com.twitter.storehaus_internal.manhattan.ManhattanCluster
import com.twitter.util.Duration
object ManhattanClientBuilder {
/**
* Build a ManhattanKVClient/Endpoint [[ManhattanKVEndpoint]] / [[ManhattanKVClient]]
*
* @param cluster Manhattan cluster
* @param appId Manhattan appid
* @param numTries Max number of times to try
* @param maxTimeout Max request timeout
* @param maxItemsPerRequest Max items per request
* @param guarantee Consistency guarantee
* @param serviceIdentifier Service ID used to S2S Auth
* @param statsReceiver Stats
* @param experiments MH client experiments to include
* @return ManhattanKVEndpoint
*/
def buildManhattanEndpoint(
cluster: ManhattanCluster,
appId: String,
numTries: Int,
maxTimeout: Duration,
guarantee: Guarantee,
serviceIdentifier: ServiceIdentifier,
statsReceiver: StatsReceiver,
maxItemsPerRequest: Int = 100,
experiments: Seq[Experiment] = Seq(Experiments.ApertureLoadBalancer)
): ManhattanKVEndpoint = {
val client = buildManhattanClient(
cluster,
appId,
serviceIdentifier,
experiments
)
ManhattanKVEndpointBuilder(client)
.defaultGuarantee(guarantee)
.defaultMaxTimeout(maxTimeout)
.maxRetryCount(numTries)
.maxItemsPerRequest(maxItemsPerRequest)
.statsReceiver(statsReceiver)
.build()
}
/**
* Build a ManhattanKVClient
*
* @param cluster Manhattan cluster
* @param appId Manhattan appid
* @param serviceIdentifier Service ID used to S2S Auth
* @param experiments MH client experiments to include
*
* @return ManhattanKVClient
*/
def buildManhattanClient(
cluster: ManhattanCluster,
appId: String,
serviceIdentifier: ServiceIdentifier,
experiments: Seq[Experiment] = Seq(Experiments.ApertureLoadBalancer)
): ManhattanKVClient = {
val mtlsParams = serviceIdentifier match {
case EmptyServiceIdentifier => NoMtlsParams
case serviceIdentifier =>
ManhattanKVClientMtlsParams(
serviceIdentifier = serviceIdentifier,
opportunisticTls = OpportunisticTls.Required)
}
val label = s"manhattan/${cluster.prefix}"
new ManhattanKVClient(
appId = appId,
dest = cluster.wilyName,
mtlsParams = mtlsParams,
label = label,
experiments = experiments
)
}
def buildManhattanV1FinagleClient(
cluster: ManhattanCluster,
serviceIdentifier: ServiceIdentifier,
experiments: Seq[Experiment] = Seq(Experiments.ApertureLoadBalancer)
): mh.ManhattanCoordinator.MethodPerEndpoint = {
val mtlsParams = serviceIdentifier match {
case EmptyServiceIdentifier => NoMtlsParams
case serviceIdentifier =>
ManhattanKVClientMtlsParams(
serviceIdentifier = serviceIdentifier,
opportunisticTls = OpportunisticTls.Required)
}
val label = s"manhattan/${cluster.prefix}"
Experiments
.clientWithExperiments(experiments, mtlsParams)
.build[mh.ManhattanCoordinator.MethodPerEndpoint](cluster.wilyName, label)
}
}

View File

@ -1,18 +0,0 @@
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
strict_deps = True,
tags = ["bazel-compatible"],
dependencies = [
"finagle/finagle-memcached/src/main/scala",
"finatra-internal/mtls-http/src/main/scala",
"finatra-internal/mtls-thriftmux/src/main/scala",
"servo/repo/src/main/scala",
],
exports = [
"finagle/finagle-memcached/src/main/scala",
"finatra-internal/mtls-http/src/main/scala",
"finatra-internal/mtls-thriftmux/src/main/scala",
"servo/repo/src/main/scala",
],
)

View File

@ -1,117 +0,0 @@
package com.twitter.product_mixer.shared_library.memcached_client
import com.twitter.finagle.memcached.Client
import com.twitter.finagle.memcached.protocol.Command
import com.twitter.finagle.memcached.protocol.Response
import com.twitter.finagle.mtls.client.MtlsStackClient._
import com.twitter.finagle.service.RetryExceptionsFilter
import com.twitter.finagle.service.RetryPolicy
import com.twitter.finagle.service.TimeoutFilter
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.util.DefaultTimer
import com.twitter.finagle.GlobalRequestTimeoutException
import com.twitter.finagle.Memcached
import com.twitter.finagle.liveness.FailureAccrualFactory
import com.twitter.finagle.liveness.FailureAccrualPolicy
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.hashing.KeyHasher
import com.twitter.util.Duration
object MemcachedClientBuilder {
/**
* Build a Finagle Memcached [[Client]].
*
* @param destName Destination as a Wily path e.g. "/s/sample/sample".
* @param numTries Maximum number of times to try.
* @param requestTimeout Thrift client timeout per request. The Finagle default
* is unbounded which is almost never optimal.
* @param globalTimeout Thrift client total timeout. The Finagle default is
* unbounded which is almost never optimal.
* @param connectTimeout Thrift client transport connect timeout. The Finagle
* default of one second is reasonable but we lower this
* to match acquisitionTimeout for consistency.
* @param acquisitionTimeout Thrift client session acquisition timeout. The Finagle
* default is unbounded which is almost never optimal.
* @param serviceIdentifier Service ID used to S2S Auth.
* @param statsReceiver Stats.
* @param failureAccrualPolicy Policy to determine when to mark a cache server as dead.
* Memcached client will use default failure accrual policy
* if it is not set.
* @param keyHasher Hash algorithm that hashes a key into a 32-bit or 64-bit
* number. Memcached client will use default hash algorithm
* if it is not set.
*
* @see [[https://confluence.twitter.biz/display/CACHE/Finagle-memcached+User+Guide user guide]]
* @return Finagle Memcached [[Client]]
*/
def buildMemcachedClient(
destName: String,
numTries: Int,
requestTimeout: Duration,
globalTimeout: Duration,
connectTimeout: Duration,
acquisitionTimeout: Duration,
serviceIdentifier: ServiceIdentifier,
statsReceiver: StatsReceiver,
failureAccrualPolicy: Option[FailureAccrualPolicy] = None,
keyHasher: Option[KeyHasher] = None
): Client = {
buildRawMemcachedClient(
numTries,
requestTimeout,
globalTimeout,
connectTimeout,
acquisitionTimeout,
serviceIdentifier,
statsReceiver,
failureAccrualPolicy,
keyHasher
).newRichClient(destName)
}
def buildRawMemcachedClient(
numTries: Int,
requestTimeout: Duration,
globalTimeout: Duration,
connectTimeout: Duration,
acquisitionTimeout: Duration,
serviceIdentifier: ServiceIdentifier,
statsReceiver: StatsReceiver,
failureAccrualPolicy: Option[FailureAccrualPolicy] = None,
keyHasher: Option[KeyHasher] = None
): Memcached.Client = {
val globalTimeoutFilter = new TimeoutFilter[Command, Response](
timeout = globalTimeout,
exception = new GlobalRequestTimeoutException(globalTimeout),
timer = DefaultTimer)
val retryFilter = new RetryExceptionsFilter[Command, Response](
RetryPolicy.tries(numTries),
DefaultTimer,
statsReceiver)
val client = Memcached.client.withTransport
.connectTimeout(connectTimeout)
.withMutualTls(serviceIdentifier)
.withSession
.acquisitionTimeout(acquisitionTimeout)
.withRequestTimeout(requestTimeout)
.withStatsReceiver(statsReceiver)
.filtered(globalTimeoutFilter.andThen(retryFilter))
(keyHasher, failureAccrualPolicy) match {
case (Some(hasher), Some(policy)) =>
client
.withKeyHasher(hasher)
.configured(FailureAccrualFactory.Param(() => policy))
case (Some(hasher), None) =>
client
.withKeyHasher(hasher)
case (None, Some(policy)) =>
client
.configured(FailureAccrualFactory.Param(() => policy))
case _ =>
client
}
}
}

View File

@ -1,19 +0,0 @@
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
platform = "java8",
strict_deps = True,
tags = ["bazel-compatible"],
dependencies = [
"servo/util",
"stitch/stitch-core",
"util/util-core:util-core-util",
"util/util-stats",
],
exports = [
"servo/util",
"stitch/stitch-core",
"util/util-core:util-core-util",
"util/util-stats",
],
)

View File

@ -1,203 +0,0 @@
package com.twitter.product_mixer.shared_library.observer
import com.twitter.finagle.stats.Counter
import com.twitter.finagle.stats.RollupStatsReceiver
import com.twitter.finagle.stats.Stat
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.servo.util.CancelledExceptionExtractor
import com.twitter.stitch.Arrow
import com.twitter.stitch.Stitch
import com.twitter.util.Duration
import com.twitter.util.Future
import com.twitter.util.Throwables
import com.twitter.util.Try
/**
* Helper functions to observe requests, success, failures, cancellations, exceptions, and latency.
* Supports native functions and asynchronous operations.
*/
object Observer {
val Requests = "requests"
val Success = "success"
val Failures = "failures"
val Cancelled = "cancelled"
val Latency = "latency_ms"
/**
* Helper function to observe a stitch
*
* @see [[StitchObserver]]
*/
def stitch[T](statsReceiver: StatsReceiver, scopes: String*): StitchObserver[T] =
new StitchObserver[T](statsReceiver, scopes)
/**
* Helper function to observe an arrow
*
* @see [[ArrowObserver]]
*/
def arrow[In, Out](statsReceiver: StatsReceiver, scopes: String*): ArrowObserver[In, Out] =
new ArrowObserver[In, Out](statsReceiver, scopes)
/**
* Helper function to observe a future
*
* @see [[FutureObserver]]
*/
def future[T](statsReceiver: StatsReceiver, scopes: String*): FutureObserver[T] =
new FutureObserver[T](statsReceiver, scopes)
/**
* Helper function to observe a function
*
* @see [[FunctionObserver]]
*/
def function[T](statsReceiver: StatsReceiver, scopes: String*): FunctionObserver[T] =
new FunctionObserver[T](statsReceiver, scopes)
/**
* [[StitchObserver]] can record latency stats, success counters, and
* detailed failure stats for the results of a Stitch computation.
*/
class StitchObserver[T](
override val statsReceiver: StatsReceiver,
override val scopes: Seq[String])
extends Observer[T] {
/**
* Record stats for the provided Stitch.
* The result of the computation is passed through.
*
* @note the provided Stitch must contain the parts that need to be timed.
* Using this on just the result of the computation the latency stat
* will be incorrect.
*/
def apply(stitch: => Stitch[T]): Stitch[T] =
Stitch.time(stitch).map(observe.tupled).lowerFromTry
}
/**
* [[ArrowObserver]] can record the latency stats, success counters, and
* detailed failure stats for the result of an Arrow computation.
* The result of the computation is passed through.
*/
class ArrowObserver[In, Out](
override val statsReceiver: StatsReceiver,
override val scopes: Seq[String])
extends Observer[Out] {
/**
* Returns a new Arrow that records stats when it's run.
* The result of the Arrow is passed through.
*
* @note the provided Arrow must contain the parts that need to be timed.
* Using this on just the result of the computation the latency stat
* will be incorrect.
*/
def apply(arrow: Arrow[In, Out]): Arrow[In, Out] =
Arrow.time(arrow).map(observe.tupled).lowerFromTry
}
/**
* [[FutureObserver]] can record latency stats, success counters, and
* detailed failure stats for the results of a Future computation.
*/
class FutureObserver[T](
override val statsReceiver: StatsReceiver,
override val scopes: Seq[String])
extends Observer[T] {
/**
* Record stats for the provided Future.
* The result of the computation is passed through.
*
* @note the provided Future must contain the parts that need to be timed.
* Using this on just the result of the computation the latency stat
* will be incorrect.
*/
def apply(future: => Future[T]): Future[T] =
Stat
.timeFuture(latencyStat)(future)
.onSuccess(observeSuccess)
.onFailure(observeFailure)
}
/**
* [[FunctionObserver]] can record latency stats, success counters, and
* detailed failure stats for the results of a computation computation.
*/
class FunctionObserver[T](
override val statsReceiver: StatsReceiver,
override val scopes: Seq[String])
extends Observer[T] {
/**
* Record stats for the provided computation.
* The result of the computation is passed through.
*
* @note the provided computation must contain the parts that need to be timed.
* Using this on just the result of the computation the latency stat
* will be incorrect.
*/
def apply(f: => T): T = {
Try(Stat.time(latencyStat)(f))
.onSuccess(observeSuccess)
.onFailure(observeFailure)
.apply()
}
}
/** [[Observer]] provides methods for recording latency, success, and failure stats */
trait Observer[T] {
protected val statsReceiver: StatsReceiver
/** Scopes that prefix all stats */
protected val scopes: Seq[String]
private val rollupStatsReceiver = new RollupStatsReceiver(statsReceiver.scope(scopes: _*))
private val requestsCounter: Counter = statsReceiver.counter(scopes :+ Requests: _*)
private val successCounter: Counter = statsReceiver.counter(scopes :+ Success: _*)
// create the stats so their metrics paths are always present but
// defer to the [[RollupStatsReceiver]] to increment these stats
rollupStatsReceiver.counter(Failures)
rollupStatsReceiver.counter(Cancelled)
/** Serialize a throwable and it's causes into a seq of Strings for scoping metrics */
protected def serializeThrowable(throwable: Throwable): Seq[String] =
Throwables.mkString(throwable)
/** Used to record latency in milliseconds */
protected val latencyStat: Stat = statsReceiver.stat(scopes :+ Latency: _*)
/** Records the latency from a [[Duration]] */
protected val observeLatency: Duration => Unit = { latency =>
latencyStat.add(latency.inMilliseconds)
}
/** Records successes */
protected val observeSuccess: T => Unit = { _ =>
requestsCounter.incr()
successCounter.incr()
}
/** Records failures and failure details */
protected val observeFailure: Throwable => Unit = {
case CancelledExceptionExtractor(throwable) =>
requestsCounter.incr()
rollupStatsReceiver.counter(Cancelled +: serializeThrowable(throwable): _*).incr()
case throwable =>
requestsCounter.incr()
rollupStatsReceiver.counter(Failures +: serializeThrowable(throwable): _*).incr()
}
/** Records the latency, successes, and failures */
protected val observe: (Try[T], Duration) => Try[T] =
(response: Try[T], runDuration: Duration) => {
observeLatency(runDuration)
response
.onSuccess(observeSuccess)
.onFailure(observeFailure)
}
}
}

View File

@ -1,281 +0,0 @@
package com.twitter.product_mixer.shared_library.observer
import com.twitter.finagle.stats.Counter
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.product_mixer.shared_library.observer.Observer.ArrowObserver
import com.twitter.product_mixer.shared_library.observer.Observer.FunctionObserver
import com.twitter.product_mixer.shared_library.observer.Observer.FutureObserver
import com.twitter.product_mixer.shared_library.observer.Observer.Observer
import com.twitter.product_mixer.shared_library.observer.Observer.StitchObserver
import com.twitter.stitch.Arrow
import com.twitter.stitch.Stitch
import com.twitter.util.Future
import com.twitter.util.Try
/**
* Helper functions to observe requests, successes, failures, cancellations, exceptions, latency,
* and result counts. Supports native functions and asynchronous operations.
*/
object ResultsObserver {
val Total = "total"
val Found = "found"
val NotFound = "not_found"
/**
* Helper function to observe a stitch and result counts
*
* @see [[StitchResultsObserver]]
*/
def stitchResults[T](
size: T => Int,
statsReceiver: StatsReceiver,
scopes: String*
): StitchResultsObserver[T] = {
new StitchResultsObserver[T](size, statsReceiver, scopes)
}
/**
* Helper function to observe a stitch and traversable (e.g. Seq, Set) result counts
*
* @see [[StitchResultsObserver]]
*/
def stitchResults[T <: TraversableOnce[_]](
statsReceiver: StatsReceiver,
scopes: String*
): StitchResultsObserver[T] = {
new StitchResultsObserver[T](_.size, statsReceiver, scopes)
}
/**
* Helper function to observe an arrow and result counts
*
* @see [[ArrowResultsObserver]]
*/
def arrowResults[In, Out](
size: Out => Int,
statsReceiver: StatsReceiver,
scopes: String*
): ArrowResultsObserver[In, Out] = {
new ArrowResultsObserver[In, Out](size, statsReceiver, scopes)
}
/**
* Helper function to observe an arrow and traversable (e.g. Seq, Set) result counts
*
* @see [[ArrowResultsObserver]]
*/
def arrowResults[In, Out <: TraversableOnce[_]](
statsReceiver: StatsReceiver,
scopes: String*
): ArrowResultsObserver[In, Out] = {
new ArrowResultsObserver[In, Out](_.size, statsReceiver, scopes)
}
/**
* Helper function to observe an arrow and result counts
*
* @see [[TransformingArrowResultsObserver]]
*/
def transformingArrowResults[In, Out, Transformed](
transformer: Out => Try[Transformed],
size: Transformed => Int,
statsReceiver: StatsReceiver,
scopes: String*
): TransformingArrowResultsObserver[In, Out, Transformed] = {
new TransformingArrowResultsObserver[In, Out, Transformed](
transformer,
size,
statsReceiver,
scopes)
}
/**
* Helper function to observe an arrow and traversable (e.g. Seq, Set) result counts
*
* @see [[TransformingArrowResultsObserver]]
*/
def transformingArrowResults[In, Out, Transformed <: TraversableOnce[_]](
transformer: Out => Try[Transformed],
statsReceiver: StatsReceiver,
scopes: String*
): TransformingArrowResultsObserver[In, Out, Transformed] = {
new TransformingArrowResultsObserver[In, Out, Transformed](
transformer,
_.size,
statsReceiver,
scopes)
}
/**
* Helper function to observe a future and result counts
*
* @see [[FutureResultsObserver]]
*/
def futureResults[T](
size: T => Int,
statsReceiver: StatsReceiver,
scopes: String*
): FutureResultsObserver[T] = {
new FutureResultsObserver[T](size, statsReceiver, scopes)
}
/**
* Helper function to observe a future and traversable (e.g. Seq, Set) result counts
*
* @see [[FutureResultsObserver]]
*/
def futureResults[T <: TraversableOnce[_]](
statsReceiver: StatsReceiver,
scopes: String*
): FutureResultsObserver[T] = {
new FutureResultsObserver[T](_.size, statsReceiver, scopes)
}
/**
* Helper function to observe a function and result counts
*
* @see [[FunctionResultsObserver]]
*/
def functionResults[T](
size: T => Int,
statsReceiver: StatsReceiver,
scopes: String*
): FunctionResultsObserver[T] = {
new FunctionResultsObserver[T](size, statsReceiver, scopes)
}
/**
* Helper function to observe a function and traversable (e.g. Seq, Set) result counts
*
* @see [[FunctionResultsObserver]]
*/
def functionResults[T <: TraversableOnce[_]](
statsReceiver: StatsReceiver,
scopes: String*
): FunctionResultsObserver[T] = {
new FunctionResultsObserver[T](_.size, statsReceiver, scopes)
}
/** [[StitchObserver]] that also records result size */
class StitchResultsObserver[T](
override val size: T => Int,
override val statsReceiver: StatsReceiver,
override val scopes: Seq[String])
extends StitchObserver[T](statsReceiver, scopes)
with ResultsObserver[T] {
override def apply(stitch: => Stitch[T]): Stitch[T] =
super
.apply(stitch)
.onSuccess(observeResults)
}
/** [[ArrowObserver]] that also records result size */
class ArrowResultsObserver[In, Out](
override val size: Out => Int,
override val statsReceiver: StatsReceiver,
override val scopes: Seq[String])
extends ArrowObserver[In, Out](statsReceiver, scopes)
with ResultsObserver[Out] {
override def apply(arrow: Arrow[In, Out]): Arrow[In, Out] =
super
.apply(arrow)
.onSuccess(observeResults)
}
/**
* [[TransformingArrowResultsObserver]] functions like an [[ArrowObserver]] except
* that it transforms the result using [[transformer]] before recording stats.
*
* The original non-transformed result is then returned.
*/
class TransformingArrowResultsObserver[In, Out, Transformed](
val transformer: Out => Try[Transformed],
override val size: Transformed => Int,
override val statsReceiver: StatsReceiver,
override val scopes: Seq[String])
extends Observer[Transformed]
with ResultsObserver[Transformed] {
/**
* Returns a new Arrow that records stats on the result after applying [[transformer]] when it's run.
* The original, non-transformed, result of the Arrow is passed through.
*
* @note the provided Arrow must contain the parts that need to be timed.
* Using this on just the result of the computation the latency stat
* will be incorrect.
*/
def apply(arrow: Arrow[In, Out]): Arrow[In, Out] = {
Arrow
.time(arrow)
.map {
case (response, stitchRunDuration) =>
observe(response.flatMap(transformer), stitchRunDuration)
.onSuccess(observeResults)
response
}.lowerFromTry
}
}
/** [[FutureObserver]] that also records result size */
class FutureResultsObserver[T](
override val size: T => Int,
override val statsReceiver: StatsReceiver,
override val scopes: Seq[String])
extends FutureObserver[T](statsReceiver, scopes)
with ResultsObserver[T] {
override def apply(future: => Future[T]): Future[T] =
super
.apply(future)
.onSuccess(observeResults)
}
/** [[FunctionObserver]] that also records result size */
class FunctionResultsObserver[T](
override val size: T => Int,
override val statsReceiver: StatsReceiver,
override val scopes: Seq[String])
extends FunctionObserver[T](statsReceiver, scopes)
with ResultsObserver[T] {
override def apply(f: => T): T = observeResults(super.apply(f))
}
/** [[ResultsObserver]] provides methods for recording stats for the result size */
trait ResultsObserver[T] {
protected val statsReceiver: StatsReceiver
/** Scopes that prefix all stats */
protected val scopes: Seq[String]
protected val totalCounter: Counter = statsReceiver.counter(scopes :+ Total: _*)
protected val foundCounter: Counter = statsReceiver.counter(scopes :+ Found: _*)
protected val notFoundCounter: Counter = statsReceiver.counter(scopes :+ NotFound: _*)
/** given a [[T]] returns it's size. */
protected val size: T => Int
/** Records the size of the `results` using [[size]] and return the original value. */
protected def observeResults(results: T): T = {
val resultsSize = size(results)
observeResultsWithSize(results, resultsSize)
}
/**
* Records the `resultsSize` and returns the `results`
*
* This is useful if the size is already available and is expensive to calculate.
*/
protected def observeResultsWithSize(results: T, resultsSize: Int): T = {
if (resultsSize > 0) {
totalCounter.incr(resultsSize)
foundCounter.incr()
} else {
notFoundCounter.incr()
}
results
}
}
}

View File

@ -1,243 +0,0 @@
package com.twitter.product_mixer.shared_library.observer
import com.twitter.finagle.stats.Stat
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.product_mixer.shared_library.observer.Observer.ArrowObserver
import com.twitter.product_mixer.shared_library.observer.Observer.FunctionObserver
import com.twitter.product_mixer.shared_library.observer.Observer.FutureObserver
import com.twitter.product_mixer.shared_library.observer.Observer.Observer
import com.twitter.product_mixer.shared_library.observer.Observer.StitchObserver
import com.twitter.product_mixer.shared_library.observer.ResultsObserver.ResultsObserver
import com.twitter.stitch.Arrow
import com.twitter.stitch.Stitch
import com.twitter.util.Future
import com.twitter.util.Try
/**
* Helper functions to observe requests, successes, failures, cancellations, exceptions, latency,
* and result counts and time-series stats. Supports native functions and asynchronous operations.
*
* Note that since time-series stats are expensive to compute (relative to counters), prefer
* [[ResultsObserver]] unless a time-series stat is needed.
*/
object ResultsStatsObserver {
val Size = "size"
/**
* Helper function to observe a stitch and result counts and time-series stats
*/
def stitchResultsStats[T](
size: T => Int,
statsReceiver: StatsReceiver,
scopes: String*
): StitchResultsStatsObserver[T] = {
new StitchResultsStatsObserver[T](size, statsReceiver, scopes)
}
/**
* Helper function to observe a stitch and traversable (e.g. Seq, Set) result counts and
* time-series stats
*/
def stitchResultsStats[T <: TraversableOnce[_]](
statsReceiver: StatsReceiver,
scopes: String*
): StitchResultsStatsObserver[T] = {
new StitchResultsStatsObserver[T](_.size, statsReceiver, scopes)
}
/**
* Helper function to observe an arrow and result counts and time-series stats
*/
def arrowResultsStats[T, U](
size: U => Int,
statsReceiver: StatsReceiver,
scopes: String*
): ArrowResultsStatsObserver[T, U] = {
new ArrowResultsStatsObserver[T, U](size, statsReceiver, scopes)
}
/**
* Helper function to observe an arrow and traversable (e.g. Seq, Set) result counts and
* * time-series stats
*/
def arrowResultsStats[T, U <: TraversableOnce[_]](
statsReceiver: StatsReceiver,
scopes: String*
): ArrowResultsStatsObserver[T, U] = {
new ArrowResultsStatsObserver[T, U](_.size, statsReceiver, scopes)
}
/**
* Helper function to observe an arrow and result counts
*
* @see [[TransformingArrowResultsStatsObserver]]
*/
def transformingArrowResultsStats[In, Out, Transformed](
transformer: Out => Try[Transformed],
size: Transformed => Int,
statsReceiver: StatsReceiver,
scopes: String*
): TransformingArrowResultsStatsObserver[In, Out, Transformed] = {
new TransformingArrowResultsStatsObserver[In, Out, Transformed](
transformer,
size,
statsReceiver,
scopes)
}
/**
* Helper function to observe an arrow and traversable (e.g. Seq, Set) result counts
*
* @see [[TransformingArrowResultsStatsObserver]]
*/
def transformingArrowResultsStats[In, Out, Transformed <: TraversableOnce[_]](
transformer: Out => Try[Transformed],
statsReceiver: StatsReceiver,
scopes: String*
): TransformingArrowResultsStatsObserver[In, Out, Transformed] = {
new TransformingArrowResultsStatsObserver[In, Out, Transformed](
transformer,
_.size,
statsReceiver,
scopes)
}
/**
* Helper function to observe a future and result counts and time-series stats
*/
def futureResultsStats[T](
size: T => Int,
statsReceiver: StatsReceiver,
scopes: String*
): FutureResultsStatsObserver[T] = {
new FutureResultsStatsObserver[T](size, statsReceiver, scopes)
}
/**
* Helper function to observe a future and traversable (e.g. Seq, Set) result counts and
* time-series stats
*/
def futureResultsStats[T <: TraversableOnce[_]](
statsReceiver: StatsReceiver,
scopes: String*
): FutureResultsStatsObserver[T] = {
new FutureResultsStatsObserver[T](_.size, statsReceiver, scopes)
}
/**
* Helper function observe a function and result counts and time-series stats
*/
def functionResultsStats[T](
size: T => Int,
statsReceiver: StatsReceiver,
scopes: String*
): FunctionResultsStatsObserver[T] = {
new FunctionResultsStatsObserver[T](size, statsReceiver, scopes)
}
/**
* Helper function observe a function and traversable (e.g. Seq, Set) result counts and
* time-series stats
*/
def functionResultsStats[T <: TraversableOnce[_]](
statsReceiver: StatsReceiver,
scopes: String*
): FunctionResultsStatsObserver[T] = {
new FunctionResultsStatsObserver[T](_.size, statsReceiver, scopes)
}
class StitchResultsStatsObserver[T](
override val size: T => Int,
override val statsReceiver: StatsReceiver,
override val scopes: Seq[String])
extends StitchObserver[T](statsReceiver, scopes)
with ResultsStatsObserver[T] {
override def apply(stitch: => Stitch[T]): Stitch[T] =
super
.apply(stitch)
.onSuccess(observeResults)
}
class ArrowResultsStatsObserver[T, U](
override val size: U => Int,
override val statsReceiver: StatsReceiver,
override val scopes: Seq[String])
extends ArrowObserver[T, U](statsReceiver, scopes)
with ResultsStatsObserver[U] {
override def apply(arrow: Arrow[T, U]): Arrow[T, U] =
super
.apply(arrow)
.onSuccess(observeResults)
}
/**
* [[TransformingArrowResultsStatsObserver]] functions like an [[ArrowObserver]] except
* that it transforms the result using [[transformer]] before recording stats.
*
* The original non-transformed result is then returned.
*/
class TransformingArrowResultsStatsObserver[In, Out, Transformed](
val transformer: Out => Try[Transformed],
override val size: Transformed => Int,
override val statsReceiver: StatsReceiver,
override val scopes: Seq[String])
extends Observer[Transformed]
with ResultsStatsObserver[Transformed] {
/**
* Returns a new Arrow that records stats on the result after applying [[transformer]] when it's run.
* The original, non-transformed, result of the Arrow is passed through.
*
* @note the provided Arrow must contain the parts that need to be timed.
* Using this on just the result of the computation the latency stat
* will be incorrect.
*/
def apply(arrow: Arrow[In, Out]): Arrow[In, Out] = {
Arrow
.time(arrow)
.map {
case (response, stitchRunDuration) =>
observe(response.flatMap(transformer), stitchRunDuration)
.onSuccess(observeResults)
response
}.lowerFromTry
}
}
class FutureResultsStatsObserver[T](
override val size: T => Int,
override val statsReceiver: StatsReceiver,
override val scopes: Seq[String])
extends FutureObserver[T](statsReceiver, scopes)
with ResultsStatsObserver[T] {
override def apply(future: => Future[T]): Future[T] =
super
.apply(future)
.onSuccess(observeResults)
}
class FunctionResultsStatsObserver[T](
override val size: T => Int,
override val statsReceiver: StatsReceiver,
override val scopes: Seq[String])
extends FunctionObserver[T](statsReceiver, scopes)
with ResultsStatsObserver[T] {
override def apply(f: => T): T = {
observeResults(super.apply(f))
}
}
trait ResultsStatsObserver[T] extends ResultsObserver[T] {
private val sizeStat: Stat = statsReceiver.stat(scopes :+ Size: _*)
protected override def observeResults(results: T): T = {
val resultsSize = size(results)
sizeStat.add(resultsSize)
observeResultsWithSize(results, resultsSize)
}
}
}

View File

@ -1,20 +0,0 @@
scala_library(
sources = ["*.scala"],
compiler_option_sets = ["fatal_warnings"],
strict_deps = True,
tags = ["bazel-compatible"],
dependencies = [
"finagle/finagle-core/src/main",
"finagle/finagle-thriftmux/src/main/scala",
"finatra-internal/mtls-http/src/main/scala",
"finatra-internal/mtls-thriftmux/src/main/scala",
"util/util-core",
],
exports = [
"finagle/finagle-core/src/main",
"finagle/finagle-thriftmux/src/main/scala",
"finatra-internal/mtls-http/src/main/scala",
"finatra-internal/mtls-thriftmux/src/main/scala",
"util/util-core",
],
)

View File

@ -1,198 +0,0 @@
package com.twitter.product_mixer.shared_library.thrift_client
import com.twitter.conversions.DurationOps._
import com.twitter.finagle.ThriftMux
import com.twitter.finagle.mtls.authentication.ServiceIdentifier
import com.twitter.finagle.mtls.client.MtlsStackClient._
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.thrift.ClientId
import com.twitter.finagle.thrift.service.Filterable
import com.twitter.finagle.thrift.service.MethodPerEndpointBuilder
import com.twitter.finagle.thrift.service.ServicePerEndpointBuilder
import com.twitter.finagle.thriftmux.MethodBuilder
import com.twitter.util.Duration
import org.apache.thrift.protocol.TProtocolFactory
sealed trait Idempotency
case object NonIdempotent extends Idempotency
case class Idempotent(maxExtraLoadPercent: Double) extends Idempotency
object FinagleThriftClientBuilder {
/**
* Library to build a Finagle Thrift method per endpoint client is a less error-prone way when
* compared to the builders in Finagle. This is achieved by requiring values for fields that should
* always be set in practice. For example, request timeouts in Finagle are unbounded when not
* explicitly set, and this method requires that timeout durations are passed into the method and
* set on the Finagle builder.
*
* Usage of
* [[com.twitter.inject.thrift.modules.ThriftMethodBuilderClientModule]] is almost always preferred,
* and the Product Mixer component library [[com.twitter.product_mixer.component_library.module]]
* package contains numerous examples. However, if multiple versions of a client are needed e.g.
* for different timeout settings, this method is useful to easily provide multiple variants.
*
* @example
* {{{
* final val SampleServiceClientName = "SampleServiceClient"
* @Provides
* @Singleton
* @Named(SampleServiceClientName)
* def provideSampleServiceClient(
* serviceIdentifier: ServiceIdentifier,
* clientId: ClientId,
* statsReceiver: StatsReceiver,
* ): SampleService.MethodPerEndpoint =
* buildFinagleMethodPerEndpoint[SampleService.ServicePerEndpoint, SampleService.MethodPerEndpoint](
* serviceIdentifier = serviceIdentifier,
* clientId = clientId,
* dest = "/s/sample/sample",
* label = "sample",
* statsReceiver = statsReceiver,
* idempotency = Idempotent(5.percent),
* timeoutPerRequest = 200.milliseconds,
* timeoutTotal = 400.milliseconds
* )
* }}}
* @param serviceIdentifier Service ID used to S2S Auth
* @param clientId Client ID
* @param dest Destination as a Wily path e.g. "/s/sample/sample"
* @param label Label of the client
* @param statsReceiver Stats
* @param idempotency Idempotency semantics of the client
* @param timeoutPerRequest Thrift client timeout per request. The Finagle default is
* unbounded which is almost never optimal.
* @param timeoutTotal Thrift client total timeout. The Finagle default is
* unbounded which is almost never optimal.
* If the client is set as idempotent, which adds a
* [[com.twitter.finagle.client.BackupRequestFilter]],
* be sure to leave enough room for the backup request. A
* reasonable (albeit usually too large) starting point is to
* make the total timeout 2x relative to the per request timeout.
* If the client is set as non-idempotent, the total timeout and
* the per request timeout should be the same, as there will be
* no backup requests.
* @param connectTimeout Thrift client transport connect timeout. The Finagle default
* of one second is reasonable but we lower this to match
* acquisitionTimeout for consistency.
* @param acquisitionTimeout Thrift client session acquisition timeout. The Finagle default
* is unbounded which is almost never optimal.
* @param protocolFactoryOverride Override the default protocol factory
* e.g. [[org.apache.thrift.protocol.TCompactProtocol.Factory]]
* @param servicePerEndpointBuilder implicit service per endpoint builder
* @param methodPerEndpointBuilder implicit method per endpoint builder
*
* @see [[https://twitter.github.io/finagle/guide/MethodBuilder.html user guide]]
* @see [[https://twitter.github.io/finagle/guide/MethodBuilder.html#idempotency user guide]]
* @return method per endpoint Finagle Thrift Client
*/
def buildFinagleMethodPerEndpoint[
ServicePerEndpoint <: Filterable[ServicePerEndpoint],
MethodPerEndpoint
](
serviceIdentifier: ServiceIdentifier,
clientId: ClientId,
dest: String,
label: String,
statsReceiver: StatsReceiver,
idempotency: Idempotency,
timeoutPerRequest: Duration,
timeoutTotal: Duration,
connectTimeout: Duration = 500.milliseconds,
acquisitionTimeout: Duration = 500.milliseconds,
protocolFactoryOverride: Option[TProtocolFactory] = None,
)(
implicit servicePerEndpointBuilder: ServicePerEndpointBuilder[ServicePerEndpoint],
methodPerEndpointBuilder: MethodPerEndpointBuilder[ServicePerEndpoint, MethodPerEndpoint]
): MethodPerEndpoint = {
val service: ServicePerEndpoint = buildFinagleServicePerEndpoint(
serviceIdentifier = serviceIdentifier,
clientId = clientId,
dest = dest,
label = label,
statsReceiver = statsReceiver,
idempotency = idempotency,
timeoutPerRequest = timeoutPerRequest,
timeoutTotal = timeoutTotal,
connectTimeout = connectTimeout,
acquisitionTimeout = acquisitionTimeout,
protocolFactoryOverride = protocolFactoryOverride
)
ThriftMux.Client.methodPerEndpoint(service)
}
/**
* Build a Finagle Thrift service per endpoint client.
*
* @note [[buildFinagleMethodPerEndpoint]] should be preferred over the service per endpoint variant
*
* @param serviceIdentifier Service ID used to S2S Auth
* @param clientId Client ID
* @param dest Destination as a Wily path e.g. "/s/sample/sample"
* @param label Label of the client
* @param statsReceiver Stats
* @param idempotency Idempotency semantics of the client
* @param timeoutPerRequest Thrift client timeout per request. The Finagle default is
* unbounded which is almost never optimal.
* @param timeoutTotal Thrift client total timeout. The Finagle default is
* unbounded which is almost never optimal.
* If the client is set as idempotent, which adds a
* [[com.twitter.finagle.client.BackupRequestFilter]],
* be sure to leave enough room for the backup request. A
* reasonable (albeit usually too large) starting point is to
* make the total timeout 2x relative to the per request timeout.
* If the client is set as non-idempotent, the total timeout and
* the per request timeout should be the same, as there will be
* no backup requests.
* @param connectTimeout Thrift client transport connect timeout. The Finagle default
* of one second is reasonable but we lower this to match
* acquisitionTimeout for consistency.
* @param acquisitionTimeout Thrift client session acquisition timeout. The Finagle default
* is unbounded which is almost never optimal.
* @param protocolFactoryOverride Override the default protocol factory
* e.g. [[org.apache.thrift.protocol.TCompactProtocol.Factory]]
*
* @return service per endpoint Finagle Thrift Client
*/
def buildFinagleServicePerEndpoint[ServicePerEndpoint <: Filterable[ServicePerEndpoint]](
serviceIdentifier: ServiceIdentifier,
clientId: ClientId,
dest: String,
label: String,
statsReceiver: StatsReceiver,
idempotency: Idempotency,
timeoutPerRequest: Duration,
timeoutTotal: Duration,
connectTimeout: Duration = 500.milliseconds,
acquisitionTimeout: Duration = 500.milliseconds,
protocolFactoryOverride: Option[TProtocolFactory] = None,
)(
implicit servicePerEndpointBuilder: ServicePerEndpointBuilder[ServicePerEndpoint]
): ServicePerEndpoint = {
val thriftMux: ThriftMux.Client = ThriftMux.client
.withMutualTls(serviceIdentifier)
.withClientId(clientId)
.withLabel(label)
.withStatsReceiver(statsReceiver)
.withTransport.connectTimeout(connectTimeout)
.withSession.acquisitionTimeout(acquisitionTimeout)
val protocolThriftMux: ThriftMux.Client = protocolFactoryOverride
.map { protocolFactory =>
thriftMux.withProtocolFactory(protocolFactory)
}.getOrElse(thriftMux)
val methodBuilder: MethodBuilder = protocolThriftMux
.methodBuilder(dest)
.withTimeoutPerRequest(timeoutPerRequest)
.withTimeoutTotal(timeoutTotal)
val idempotencyMethodBuilder: MethodBuilder = idempotency match {
case NonIdempotent => methodBuilder.nonIdempotent
case Idempotent(maxExtraLoad) => methodBuilder.idempotent(maxExtraLoad = maxExtraLoad)
}
idempotencyMethodBuilder.servicePerEndpoint[ServicePerEndpoint]
}
}