Compare commits

...

3 Commits

Author SHA1 Message Date
maik101010
2a8d8f8857
Merge 6c8a2cd9ad into fb54d8b549 2023-05-22 17:38:53 -05:00
michaelg
6c8a2cd9ad update: removed unused variable 2023-04-01 15:48:59 +01:00
michaelg
9bc72e82b2 update: removed unused variable 2023-04-01 15:45:02 +01:00

View File

@ -106,8 +106,6 @@ public class FeatureUpdateController implements FeatureUpdateService.ServiceIfac
@Override @Override
public Future<FeatureUpdateResponse> process(FeatureUpdateRequest featureUpdate) { public Future<FeatureUpdateResponse> process(FeatureUpdateRequest featureUpdate) {
long requestStartTimeMillis = clock.nowMillis();
// Export overall and per-client request rate stats // Export overall and per-client request rate stats
final String requestClientId; final String requestClientId;
if (featureUpdate.getRequestClientId() != null if (featureUpdate.getRequestClientId() != null
@ -130,7 +128,7 @@ public class FeatureUpdateController implements FeatureUpdateService.ServiceIfac
} }
ThriftIndexingEvent event = featureUpdate.getEvent(); ThriftIndexingEvent event = featureUpdate.getEvent();
return writeToKafka(event, requestStartTimeMillis) return writeToKafka(event)
.map(responsesList -> { .map(responsesList -> {
stats.clientResponse(requestClientId, FeatureUpdateResponseCode.SUCCESS); stats.clientResponse(requestClientId, FeatureUpdateResponseCode.SUCCESS);
// only when both Realtime & RealtimeCG succeed, then it will return a success flag // only when both Realtime & RealtimeCG succeed, then it will return a success flag
@ -162,8 +160,7 @@ public class FeatureUpdateController implements FeatureUpdateService.ServiceIfac
* The FeatureUpdateResponse is more like an ACK message, and the upstream (feature update ingester) * The FeatureUpdateResponse is more like an ACK message, and the upstream (feature update ingester)
* will not be affected much even if it failed (as long as the kafka message is written) * will not be affected much even if it failed (as long as the kafka message is written)
*/ */
private Future<List<BoxedUnit>> writeToKafka(ThriftIndexingEvent event, private Future<List<BoxedUnit>> writeToKafka(ThriftIndexingEvent event) {
long requestStartTimeMillis) {
return Futures.collect(Lists.newArrayList( return Futures.collect(Lists.newArrayList(
writeToKafkaInternal(event, WRITE_TO_KAFKA_DECIDER_KEY, droppedKafkaUpdateEvents, writeToKafkaInternal(event, WRITE_TO_KAFKA_DECIDER_KEY, droppedKafkaUpdateEvents,
kafkaUpdateEventsTopicName, -1, kafkaProducer), kafkaUpdateEventsTopicName, -1, kafkaProducer),