This commit is contained in:
maik101010 2023-05-22 17:38:53 -05:00 committed by GitHub
commit 2a8d8f8857
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -106,8 +106,6 @@ public class FeatureUpdateController implements FeatureUpdateService.ServiceIfac
@Override @Override
public Future<FeatureUpdateResponse> process(FeatureUpdateRequest featureUpdate) { public Future<FeatureUpdateResponse> process(FeatureUpdateRequest featureUpdate) {
long requestStartTimeMillis = clock.nowMillis();
// Export overall and per-client request rate stats // Export overall and per-client request rate stats
final String requestClientId; final String requestClientId;
if (featureUpdate.getRequestClientId() != null if (featureUpdate.getRequestClientId() != null
@ -130,7 +128,7 @@ public class FeatureUpdateController implements FeatureUpdateService.ServiceIfac
} }
ThriftIndexingEvent event = featureUpdate.getEvent(); ThriftIndexingEvent event = featureUpdate.getEvent();
return writeToKafka(event, requestStartTimeMillis) return writeToKafka(event)
.map(responsesList -> { .map(responsesList -> {
stats.clientResponse(requestClientId, FeatureUpdateResponseCode.SUCCESS); stats.clientResponse(requestClientId, FeatureUpdateResponseCode.SUCCESS);
// only when both Realtime & RealtimeCG succeed, then it will return a success flag // only when both Realtime & RealtimeCG succeed, then it will return a success flag
@ -162,8 +160,7 @@ public class FeatureUpdateController implements FeatureUpdateService.ServiceIfac
* The FeatureUpdateResponse is more like an ACK message, and the upstream (feature update ingester) * The FeatureUpdateResponse is more like an ACK message, and the upstream (feature update ingester)
* will not be affected much even if it failed (as long as the kafka message is written) * will not be affected much even if it failed (as long as the kafka message is written)
*/ */
private Future<List<BoxedUnit>> writeToKafka(ThriftIndexingEvent event, private Future<List<BoxedUnit>> writeToKafka(ThriftIndexingEvent event) {
long requestStartTimeMillis) {
return Futures.collect(Lists.newArrayList( return Futures.collect(Lists.newArrayList(
writeToKafkaInternal(event, WRITE_TO_KAFKA_DECIDER_KEY, droppedKafkaUpdateEvents, writeToKafkaInternal(event, WRITE_TO_KAFKA_DECIDER_KEY, droppedKafkaUpdateEvents,
kafkaUpdateEventsTopicName, -1, kafkaProducer), kafkaUpdateEventsTopicName, -1, kafkaProducer),