Skip to content

Commit 1fc57b9

Browse files
authored
fix: update gRPC ReadObject retry to avoid double retry (#2765)
Prevent adding an unavailable error to the queue if the retrying loop is still active. Also, cleanup unused retry config for gRPC ReadObject.
1 parent 6829eb4 commit 1fc57b9

File tree

2 files changed

+19
-10
lines changed

2 files changed

+19
-10
lines changed

‎google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.concurrent.TimeoutException;
4848
import java.util.concurrent.atomic.AtomicLong;
4949
import org.checkerframework.checker.nullness.qual.NonNull;
50+
import org.checkerframework.checker.nullness.qual.Nullable;
5051

5152
final class GapicUnbufferedReadableByteChannel
5253
implements UnbufferedReadableByteChannel, ScatteringByteChannel {
@@ -258,6 +259,11 @@ ApiFuture<Object> getResult() {
258259

259260
private void ensureStreamOpen() {
260261
if (readObjectObserver == null) {
262+
java.lang.Object peek = queue.peek();
263+
if (peek instanceof Throwable || peek == EOF_MARKER) {
264+
// If our queue has an error or EOF, do not send another request
265+
return;
266+
}
261267
readObjectObserver =
262268
Retrying.run(
263269
retryingDeps,
@@ -326,13 +332,15 @@ protected void onResponseImpl(ReadObjectResponse response) {
326332

327333
@Override
328334
protected void onErrorImpl(Throwable t) {
329-
open.setException(t);
330-
if (!alg.shouldRetry(t, null)) {
331-
result.setException(StorageException.coalesce(t));
332-
}
333335
if (t instanceof CancellationException) {
334336
cancellation.set(t);
335337
}
338+
if (!open.isDone()) {
339+
open.setException(t);
340+
if (!alg.shouldRetry(t, null)) {
341+
result.setException(StorageException.coalesce(t));
342+
}
343+
}
336344
try {
337345
queue.offer(t);
338346
} catch (InterruptedException e) {
@@ -369,6 +377,11 @@ public boolean nonEmpty() {
369377
return !queue.isEmpty();
370378
}
371379

380+
@Nullable
381+
public T peek() {
382+
return queue.peek();
383+
}
384+
372385
@NonNull
373386
public T poll() throws InterruptedException {
374387
return queue.take();

‎google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -723,8 +723,7 @@ public GrpcBlobReadChannel reader(String bucket, String blob, BlobSourceOption..
723723
public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) {
724724
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
725725
ReadObjectRequest request = getReadObjectRequest(blob, opts);
726-
Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(request));
727-
GrpcCallContext grpcCallContext = Retrying.newCallContext().withRetryableCodes(codes);
726+
GrpcCallContext grpcCallContext = Retrying.newCallContext();
728727

729728
return new GrpcBlobReadChannel(
730729
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
@@ -1708,10 +1707,7 @@ private UnbufferedReadableByteChannelSession<Object> unbufferedReadSession(
17081707

17091708
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
17101709
ReadObjectRequest readObjectRequest = getReadObjectRequest(blob, opts);
1711-
Set<StatusCode.Code> codes =
1712-
resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(readObjectRequest));
1713-
GrpcCallContext grpcCallContext =
1714-
opts.grpcMetadataMapper().apply(Retrying.newCallContext().withRetryableCodes(codes));
1710+
GrpcCallContext grpcCallContext = opts.grpcMetadataMapper().apply(Retrying.newCallContext());
17151711
return ResumableMedia.gapic()
17161712
.read()
17171713
.byteChannel(

0 commit comments

Comments
 (0)