diff --git a/src/java/com/twitter/search/ingester/pipeline/twitter/RetrieveCardBatchedStage.java b/src/java/com/twitter/search/ingester/pipeline/twitter/RetrieveCardBatchedStage.java index 705c211c5..7c956d6fc 100644 --- a/src/java/com/twitter/search/ingester/pipeline/twitter/RetrieveCardBatchedStage.java +++ b/src/java/com/twitter/search/ingester/pipeline/twitter/RetrieveCardBatchedStage.java @@ -134,7 +134,7 @@ public class RetrieveCardBatchedStage extends TwitterBaseStage })); } - private Future> batchRetrieveURLs(Set keys) { + private Future> batchRetrieveURLs(List keys) { retrieveCardsTimer.start(); totalTweets.increment(keys.size()); @@ -145,7 +145,7 @@ public class RetrieveCardBatchedStage extends TwitterBaseStage GetTweetsRequest request = new GetTweetsRequest() .setOptions(options) - .setTweet_ids(new ArrayList<>(keys)); + .setTweet_ids(keys); return tweetyPieService.get_tweets(request) .onFailure(throwable -> { diff --git a/src/java/com/twitter/search/ingester/pipeline/util/BatchingClient.java b/src/java/com/twitter/search/ingester/pipeline/util/BatchingClient.java index 222c6f544..abcf82076 100644 --- a/src/java/com/twitter/search/ingester/pipeline/util/BatchingClient.java +++ b/src/java/com/twitter/search/ingester/pipeline/util/BatchingClient.java @@ -2,7 +2,7 @@ package com.twitter.search.ingester.pipeline.util; import java.util.HashSet; import java.util.Map; -import java.util.Set; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import com.google.common.collect.Sets; @@ -20,7 +20,7 @@ public class BatchingClient { /** * Issue a request to the underlying store which supports batches of requests. */ - Future> batchGet(Set requests); + Future> batchGet(List requests); } /** @@ -54,7 +54,7 @@ public class BatchingClient { } private void maybeBatchCall(RQ request) { - Set frozenRequests; + List frozenRequests; synchronized (unsentRequests) { unsentRequests.add(request); if (unsentRequests.size() < batchSize) { @@ -63,27 +63,25 @@ public class BatchingClient { // Make a copy of requests so we can modify it inside executeBatchCall without additional // synchronization. - frozenRequests = new HashSet<>(unsentRequests); + frozenRequests = Arrays.asList((RQ[]) unsentRequests.toArray()); unsentRequests.clear(); } executeBatchCall(frozenRequests); } - private void executeBatchCall(Set requests) { + private void executeBatchCall(List requests) { batchClient.batchGet(requests) .onSuccess(responseMap -> { - for (Map.Entry entry : responseMap.entrySet()) { - Promise promise = promises.remove(entry.getKey()); - if (promise != null) { - promise.become(Future.value(entry.getValue())); - } - } - - Set outstandingRequests = Sets.difference(requests, responseMap.keySet()); - for (RQ request : outstandingRequests) { + for (RQ request : requests) { Promise promise = promises.remove(request); - if (promise != null) { + if (promise == null) { + continue; + } + RP response = responseMap.get(request); + if (response != null) { + promise.become(Future.value(response)); + } else { promise.become(Future.exception(new ResponseNotReturnedException(request))); } } diff --git a/src/java/com/twitter/search/ingester/pipeline/util/ResponseNotReturnedException.java b/src/java/com/twitter/search/ingester/pipeline/util/ResponseNotReturnedException.java index ad58148cf..0fe0eeb40 100644 --- a/src/java/com/twitter/search/ingester/pipeline/util/ResponseNotReturnedException.java +++ b/src/java/com/twitter/search/ingester/pipeline/util/ResponseNotReturnedException.java @@ -2,6 +2,6 @@ package com.twitter.search.ingester.pipeline.util; public class ResponseNotReturnedException extends Exception { ResponseNotReturnedException(Object request) { - super("Response not returned in batch for request: " + request); + super("Response not returned in batch for request: " + request, null, false, false); } }