twitter-team 617c8c787d Open-sourcing Unified User Actions
Unified User Action (UUA) is a centralized, real-time stream of user actions on Twitter, consumed by various product, ML, and marketing teams. UUA ensures that all internal teams consume uniform user-action data quickly and accurately.
2023-04-14 16:45:37 -05:00

152 lines
4.0 KiB
Plaintext

import os
import itertools
import subprocess
import math
# Name reused for the job, the task, the main JVM process, and the default
# package of a Profile.
SERVICE_NAME = 'uua-enricher'

# Per-instance resource request. GB is not defined in this file -- presumably
# it is injected by the Aurora config loader; verify if this file is ever
# evaluated outside of it.
CPU_NUM = 3
RAM_SIZE = 8 * GB
DISK_SIZE = 3 * GB

# JVM heap passed to Java11(heap=...): 6 GB of the 8 GB RAM request.
HEAP_SIZE = 6 * GB
# Per-environment settings. An instance is bound to the job template as
# `profile`, and its fields are read through {{profile.<field>}} mustache
# references elsewhere in this file.
class Profile(Struct):
    # Package name handed to Packer.install via {{profile.package}}.
    package = Default(String, SERVICE_NAME)
    # Extra command-line flags appended to the end of the main process arguments.
    cmdline_flags = Default(String, '')
    # Value passed to the JVM as -Dlog_level.
    log_level = Default(String, 'INFO')
    # Number of service instances; the job template reads {{profile.instances}}.
    instances = Default(Integer, 10)
    # Kafka bootstrap servers passed as -kafka.bootstrap.servers.
    kafka_bootstrap_servers = Default(String, '/s/kafka/bluebird-1:kafka-tls')
# Single resource request, referenced by both the main JVM process and the
# enclosing task.
resources = Resources(cpu=CPU_NUM, ram=RAM_SIZE, disk=DISK_SIZE)
# Installs the service package named by the bound profile; the version is
# whatever Workflows.package_version() returns.
install = Packer.install(
    name='{{profile.package}}',
    version=Workflows.package_version(),
)
# Installs the 'latest' version of the 'async-profiler' package published
# under the 'csl-perf' role.
async_profiler_install = Packer.install(
    role='csl-perf',
    name='async-profiler',
    version='latest',
)
# Writes the JAAS login configuration that the main JVM is pointed at via
# -Djava.security.auth.login.config=jaas_config/jaas.conf.
#
# The redirect truncates ('>') instead of appending ('>>') so the step is
# idempotent: if this process ever runs twice against the same working
# directory, the file still contains exactly one KafkaClient entry instead of
# a duplicated one (which JAAS config parsing is expected to reject).
#
# The \\" sequences reach the shell as \" and therefore end up as literal
# double quotes inside the echoed text.
setup_jaas_config = Process(
    name = 'setup_jaas_config',
    cmdline = '''
mkdir -p jaas_config
echo "KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
principal=\\"discode@TWITTER.BIZ\\"
useKeyTab=true
storeKey=true
keyTab=\\"/var/lib/tss/keys/fluffy/keytabs/client/discode.keytab\\"
doNotPrompt=true;
};" > jaas_config/jaas.conf
'''
)
# The service JVM itself. Flags are kept one per list entry and joined with a
# single space, so a forgotten separator cannot silently fuse two flags.
main = JVMProcess(
    name=SERVICE_NAME,
    jvm=Java11(
        heap=HEAP_SIZE,
        extra_jvm_flags=' '.join([
            '-Djava.net.preferIPv4Stack=true',
            '-XX:+UseNUMA',
            # NOTE(review): AggressiveOpts is deprecated as of JDK 11 --
            # confirm it is still wanted on Java11.
            '-XX:+AggressiveOpts',
            '-XX:+PerfDisableSharedMem',  # http://www.evanjones.ca/jvm-mmap-pause.html
            '-Dlog_level={{profile.log_level}}',
            '-Dlog.access.output=access.log',
            '-Dlog.service.output={{name}}.log',
            # File produced by the setup_jaas_config process.
            '-Djava.security.auth.login.config=jaas_config/jaas.conf',
        ]),
    ),
    arguments=' '.join([
        '-jar {{name}}-bin.jar',
        '-admin.port=:{{thermos.ports[health]}}',
        '-kafka.bootstrap.servers={{profile.kafka_bootstrap_servers}}',
        '-kafka.application.id={{name}}.{{environment}}',
        '-kafka.application.num.instances={{instances}}',  # Used for static partitioning
        '-kafka.application.server={{mesos.instance}}.{{name}}.{{environment}}.{{role}}.service.{{cluster}}.twitter.com:80',
        '-com.twitter.finatra.kafkastreams.config.principal={{role}}',
        '-thrift.client.id={{name}}.{{environment}}',
        '-service.identifier="{{role}}:{{name}}:{{environment}}:{{cluster}}"',
        '-local.cache.ttl.seconds=86400',
        '-local.cache.max.size=400000000',
        # Profile-supplied extras always come last.
        '{{profile.cmdline_flags}}',
    ]),
    resources=resources,
)
# Stats process configured with the 'metrics' library and the 'admin' port
# name (the job's Announcer portmap maps 'admin' onto 'health').
stats = Stats(library='metrics', port='admin')
# Service definition shared by every environment. Environment, tier, cluster,
# and the bound profile are supplied later, when the template is specialised.
job_template = Service(
    name=SERVICE_NAME,
    role='discode',
    contact='disco-data-eng@twitter.com',
    # Instance count comes from the bound Profile.
    instances='{{profile.instances}}',
    constraints={'rack': 'limit:1', 'host': 'limit:1'},
    # Both the 'aurora' and 'admin' port names are mapped onto 'health'.
    announce=Announcer(
        primary_port='health',
        portmap={'aurora': 'health', 'admin': 'health'},
    ),
    task=Task(
        name=SERVICE_NAME,
        resources=resources,
        processes=[async_profiler_install, install, setup_jaas_config, main, stats],
        # Installs and JAAS setup run strictly before the JVM; stats is not
        # part of this ordering chain.
        constraints=order(async_profiler_install, install, setup_jaas_config, main),
    ),
    health_check_config=HealthCheckConfig(
        initial_interval_secs=100,
        interval_secs=60,
        timeout_secs=60,
        max_consecutive_failures=4,
    ),
    update_config=UpdateConfig(
        batch_size=50,
        watch_secs=90,
        max_per_shard_failures=3,
        max_total_failures=0,
        rollback_on_failure=False,
    ),
)
# Production runs entirely on the Profile defaults.
PRODUCTION = Profile()

# Staging installs the '-staging' package and uses the custdevel Kafka cluster.
STAGING = Profile(
    package='{0}-staging'.format(SERVICE_NAME),
    cmdline_flags='',
    kafka_bootstrap_servers='/s/kafka/custdevel:kafka-tls',
)

# Devel is derived from staging, overriding only the log level.
DEVEL = STAGING(log_level='DEBUG')
# One specialisation of the template per environment, each bound to its
# matching profile. Only prod sets a tier.
prod_job = job_template(environment='prod', tier='preferred').bind(profile=PRODUCTION)
staging_job = job_template(environment='staging').bind(profile=STAGING)
devel_job = job_template(environment='devel').bind(profile=DEVEL)
# Final job list: for each cluster, the prod, staging, and devel jobs are
# added in that order. All three additions belong inside the loop body, since
# each one is specialised with the current `cluster`.
jobs = []
for cluster in ['atla', 'pdxa']:
    jobs.extend([
        prod_job(cluster=cluster),
        staging_job(cluster=cluster),
        devel_job(cluster=cluster),
    ])