From e734b4a068d0eb615de60f3e3c867eeb214a4c3f Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 26 Nov 2025 20:09:36 -0300 Subject: [PATCH] hashmail: block until stream is freed Fix flaky tests. Reproducer: go test -run TestHashMailServerReturnStream -count=20 TestHashMailServerReturnStream fails because the test cancels a read stream and immediately dials RecvStream again expecting the same stream to be handed out once the server returns it. The hashmail server implemented RequestReadStream/RequestWriteStream with a non-blocking channel poll and returned "read/write stream occupied" as soon as the mailbox was busy. That raced with the deferred ReturnStream call and the reconnect often happened before the stream got pushed back, so clients received the occupancy error instead of the context cancellation they triggered. Teach RequestReadStream/RequestWriteStream to wait for the stream to become available (or the caller's context / server shutdown) with a bounded timeout. If the wait expires we still return the "... stream occupied" error, so callers that legitimately pile up can see that signal. The new streamAcquireTimeout constant documents the policy, and the blocking select removes the race, so reconnect attempts now either succeed or surface the original context error. --- hashmail_server.go | 24 ++++++++++++++++++++++-- hashmail_server_test.go | 5 ++++- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/hashmail_server.go b/hashmail_server.go index 0c03276..bbb3e46 100644 --- a/hashmail_server.go +++ b/hashmail_server.go @@ -39,6 +39,12 @@ const ( // reads for it to be considered for pruning. Otherwise, memory will grow // unbounded. streamTTL = 24 * time.Hour + + // streamAcquireTimeout determines how long we wait for a read/write + // stream to become available before reporting it as occupied. Context + // cancellation is still honoured immediately, so callers can shorten + // the wait. + streamAcquireTimeout = 250 * time.Millisecond ) // streamID is the identifier of a stream. @@ -317,7 +323,14 @@ func (s *stream) RequestReadStream(ctx context.Context) (*readStream, error) { case r := <-s.readStreamChan: s.status.streamTaken(true) return r, nil - default: + + case <-s.quit: + return nil, fmt.Errorf("stream shutting down") + + case <-ctx.Done(): + return nil, ctx.Err() + + case <-time.After(streamAcquireTimeout): return nil, fmt.Errorf("read stream occupied") } } @@ -332,7 +345,14 @@ func (s *stream) RequestWriteStream(ctx context.Context) (*writeStream, error) { case w := <-s.writeStreamChan: s.status.streamTaken(false) return w, nil - default: + + case <-s.quit: + return nil, fmt.Errorf("stream shutting down") + + case <-ctx.Done(): + return nil, ctx.Err() + + case <-time.After(streamAcquireTimeout): return nil, fmt.Errorf("write stream occupied") } } diff --git a/hashmail_server_test.go b/hashmail_server_test.go index 79909d4..a86aabf 100644 --- a/hashmail_server_test.go +++ b/hashmail_server_test.go @@ -512,7 +512,10 @@ func recvFromStream(client hashmailrpc.HashMailClient) error { }() select { - case <-time.After(time.Second): + // Wait a little longer than the server's stream-acquire timeout so we + // only trip this path when the server truly failed to hand over the + // stream (instead of beating it to the punch). + case <-time.After(2 * streamAcquireTimeout): return fmt.Errorf("timed out waiting to receive from receive " + "stream")