mirror of
https://github.com/twitter/the-algorithm.git
synced 2025-01-25 02:11:19 +01:00
617c8c787d
Unified User Action (UUA) is a centralized, real-time stream of user actions on Twitter, consumed by various product, ML, and marketing teams. UUA makes sure all internal teams consume the uniformed user actions data in an accurate and fast way.
175 lines
5.3 KiB
Plaintext
175 lines
5.3 KiB
Plaintext
import os
|
|
import itertools
|
|
import subprocess
|
|
import math
|
|
|
|
SERVICE_NAME = 'uua-client-event'
|
|
|
|
CPU_NUM = 3
|
|
HEAP_SIZE = 3 * GB
|
|
RAM_SIZE = HEAP_SIZE + 1 * GB
|
|
# We make disk size larger than HEAP so that if we ever need to do a heap dump, it will fit on disk.
|
|
DISK_SIZE = HEAP_SIZE + 2 * GB
|
|
|
|
class Profile(Struct):
|
|
package = Default(String, SERVICE_NAME)
|
|
cmdline_flags = Default(String, '')
|
|
log_level = Default(String, 'INFO')
|
|
instances = Default(Integer, 1000)
|
|
kafka_bootstrap_servers = Default(String, '/s/kafka/client-events:kafka-tls')
|
|
kafka_bootstrap_servers_remote_dest = Default(String, '/s/kafka/bluebird-1:kafka-tls')
|
|
source_topic = Default(String, 'client_event')
|
|
sink_topics = Default(String, 'unified_user_actions,unified_user_actions_engagements')
|
|
decider_overlay = Default(String, '')
|
|
|
|
resources = Resources(
|
|
cpu = CPU_NUM,
|
|
ram = RAM_SIZE,
|
|
disk = DISK_SIZE
|
|
)
|
|
|
|
install = Packer.install(
|
|
name = '{{profile.package}}',
|
|
version = Workflows.package_version()
|
|
)
|
|
|
|
async_profiler_install = Packer.install(
|
|
name = 'async-profiler',
|
|
role = 'csl-perf',
|
|
version = 'latest'
|
|
)
|
|
|
|
setup_jaas_config = Process(
|
|
name = 'setup_jaas_config',
|
|
cmdline = '''
|
|
mkdir -p jaas_config
|
|
echo "KafkaClient {
|
|
com.sun.security.auth.module.Krb5LoginModule required
|
|
principal=\\"discode@TWITTER.BIZ\\"
|
|
useKeyTab=true
|
|
storeKey=true
|
|
keyTab=\\"/var/lib/tss/keys/fluffy/keytabs/client/discode.keytab\\"
|
|
doNotPrompt=true;
|
|
};" >> jaas_config/jaas.conf
|
|
'''
|
|
)
|
|
|
|
main = JVMProcess(
|
|
name = SERVICE_NAME,
|
|
jvm = Java11(
|
|
heap = HEAP_SIZE,
|
|
extra_jvm_flags =
|
|
'-Djava.net.preferIPv4Stack=true'
|
|
|
|
' -XX:MaxMetaspaceSize=536870912'
|
|
' -XX:+UseNUMA'
|
|
' -XX:+AggressiveOpts'
|
|
' -XX:+PerfDisableSharedMem' # http://www.evanjones.ca/jvm-mmap-pause.html
|
|
|
|
' -Dlog_level={{profile.log_level}}'
|
|
' -Dlog.access.output=access.log'
|
|
' -Dlog.service.output={{name}}.log'
|
|
' -Djava.security.auth.login.config=jaas_config/jaas.conf'
|
|
),
|
|
arguments =
|
|
'-jar {{name}}-bin.jar'
|
|
' -admin.port=:{{thermos.ports[health]}}'
|
|
' -kafka.bootstrap.servers={{profile.kafka_bootstrap_servers}}'
|
|
' -kafka.bootstrap.servers.remote.dest={{profile.kafka_bootstrap_servers_remote_dest}}'
|
|
' -kafka.group.id={{name}}-{{environment}}'
|
|
' -kafka.producer.client.id={{name}}-{{environment}}'
|
|
' -kafka.max.pending.requests=10000'
|
|
# CE events is about 0.4-0.6kb per message on the consumer side. A fetch size of 6~18 MB get us
|
|
# about 10k ~ 20k of messages per batch. This fits the size of our pending requests queue and
|
|
# within the limit of the max poll records.
|
|
' -kafka.consumer.fetch.max=9.megabytes'
|
|
' -kafka.consumer.fetch.min=3.megabytes'
|
|
' -kafka.max.poll.records=40000'
|
|
' -kafka.commit.interval=20.seconds'
|
|
' -kafka.producer.batch.size=4.megabytes'
|
|
' -kafka.producer.buffer.mem=64.megabytes'
|
|
' -kafka.producer.linger=100.millisecond'
|
|
' -kafka.producer.request.timeout=30.seconds'
|
|
' -kafka.producer.compression.type=lz4'
|
|
' -kafka.worker.threads=4'
|
|
' -kafka.source.topic={{profile.source_topic}}'
|
|
' -kafka.sink.topics={{profile.sink_topics}}'
|
|
' -decider.base=decider.yml'
|
|
' -decider.overlay={{profile.decider_overlay}}'
|
|
' -cluster={{cluster}}'
|
|
' {{profile.cmdline_flags}}',
|
|
resources = resources
|
|
)
|
|
|
|
stats = Stats(
|
|
library = 'metrics',
|
|
port = 'admin'
|
|
)
|
|
|
|
job_template = Service(
|
|
name = SERVICE_NAME,
|
|
role = 'discode',
|
|
instances = '{{profile.instances}}',
|
|
contact = 'disco-data-eng@twitter.com',
|
|
constraints = {'rack': 'limit:1', 'host': 'limit:1'},
|
|
announce = Announcer(
|
|
primary_port = 'health',
|
|
portmap = {'aurora': 'health', 'admin': 'health'}
|
|
),
|
|
task = Task(
|
|
resources = resources,
|
|
name = SERVICE_NAME,
|
|
processes = [async_profiler_install, install, setup_jaas_config, main, stats],
|
|
constraints = order(async_profiler_install, install, setup_jaas_config, main)
|
|
),
|
|
health_check_config = HealthCheckConfig(
|
|
initial_interval_secs = 100,
|
|
interval_secs = 60,
|
|
timeout_secs = 60,
|
|
max_consecutive_failures = 4
|
|
),
|
|
update_config = UpdateConfig(
|
|
batch_size = 1000,
|
|
watch_secs = 90,
|
|
max_per_shard_failures = 3,
|
|
max_total_failures = 0,
|
|
rollback_on_failure = False
|
|
)
|
|
)
|
|
|
|
PRODUCTION = Profile(
|
|
# go/uua-decider
|
|
decider_overlay = '/usr/local/config/overlays/discode-default/UnifiedUserActions/prod/{{cluster}}/decider_overlay.yml'
|
|
)
|
|
|
|
STAGING = Profile(
|
|
package = SERVICE_NAME+'-staging',
|
|
cmdline_flags = '',
|
|
kafka_bootstrap_servers_remote_dest = '/s/kafka/custdevel:kafka-tls',
|
|
decider_overlay = '/usr/local/config/overlays/discode-default/UnifiedUserActions/staging/{{cluster}}/decider_overlay.yml' # go/uua-decider
|
|
)
|
|
|
|
DEVEL = STAGING(
|
|
log_level = 'INFO',
|
|
)
|
|
|
|
|
|
prod_job = job_template(
|
|
tier = 'preferred',
|
|
environment = 'prod',
|
|
).bind(profile = PRODUCTION)
|
|
|
|
staging_job = job_template(
|
|
environment = 'staging'
|
|
).bind(profile = STAGING)
|
|
|
|
devel_job = job_template(
|
|
environment = 'devel'
|
|
).bind(profile = DEVEL)
|
|
|
|
jobs = []
|
|
for cluster in ['atla', 'pdxa']:
|
|
jobs.append(prod_job(cluster = cluster))
|
|
jobs.append(staging_job(cluster = cluster))
|
|
jobs.append(devel_job(cluster = cluster))
|