Skip to content

Commit e5ba9fd

Browse files
Merge pull request #1114 from byroot/response-buffer-clear
2 parents c603440 + 852de11 commit e5ba9fd

4 files changed

Lines changed: 67 additions & 49 deletions

File tree

.rubocop_todo.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Metrics/MethodLength:
2323
Exclude:
2424
- 'lib/dalli/pipelined_getter.rb'
2525
- 'lib/dalli/protocol/base.rb'
26+
- 'lib/dalli/socket.rb'
2627

2728
# Offense count: 1
2829
# Configuration parameters: CountComments, CountAsOne.

lib/dalli/protocol/connection_manager.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,8 @@ def flushed_write(bytes)
193193

194194
# Non-blocking read. Here to support the operation
195195
# of the get_multi operation
196-
def read_available
197-
@sock.read_available
196+
def read_available(...)
197+
@sock.read_available(...)
198198
end
199199

200200
def max_allowed_failures

lib/dalli/protocol/response_buffer.rb

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,34 @@ def initialize(io_source, response_processor)
2323
end
2424

2525
def read
26-
@buffer << @io_source.read_available
26+
remaining_bytes = @buffer.bytesize - @offset
27+
if remaining_bytes.zero?
28+
@offset = 0
29+
@buffer = @io_source.read_available(@buffer)
30+
else
31+
if remaining_bytes > COMPACT_THRESHOLD && remaining_bytes < @buffer.bytesize
32+
# NB: we're freezing the old buffer before slicing so Ruby doesn't
33+
# have to allocate a third hidden string to be the Copy-on-Write onwer.
34+
@buffer = @buffer.freeze.byteslice(@offset..)
35+
@offset = 0
36+
end
37+
@buffer << @io_source.read_available
38+
end
2739
end
2840

2941
# Attempts to process a single response from the buffer,
3042
# advancing the offset past the consumed bytes.
3143
def process_single_getk_response
3244
bytes, status, cas, key, value = @response_processor.getk_response_from_buffer(@buffer, @offset)
3345
@offset += bytes
34-
compact_if_needed
3546
[status, cas, key, value]
3647
end
3748

3849
# Resets the internal buffer to an empty state,
3950
# so that we're ready to read pipelined responses
4051
def reset
41-
@buffer = ''.b
52+
@buffer&.clear
53+
@buffer ||= ''.b
4254
@offset = 0
4355
end
4456

@@ -54,25 +66,14 @@ def ensure_ready
5466

5567
# Clear the internal response buffer
5668
def clear
69+
@buffer&.clear
5770
@buffer = nil
5871
@offset = 0
5972
end
6073

6174
def in_progress?
6275
!@buffer.nil?
6376
end
64-
65-
private
66-
67-
# Only compact when we've consumed a significant portion of the buffer.
68-
# This avoids per-response string allocation while preventing unbounded
69-
# memory growth for large pipelines.
70-
def compact_if_needed
71-
return unless @offset > COMPACT_THRESHOLD && @offset > @buffer.bytesize / 2
72-
73-
@buffer = @buffer.byteslice(@offset..)
74-
@offset = 0
75-
end
7677
end
7778
end
7879
end

lib/dalli/socket.rb

Lines changed: 48 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,48 +12,64 @@ module Socket
1212
# Common methods for all socket implementations.
1313
##
1414
module InstanceMethods
15-
def readfull(count)
16-
value = String.new(capacity: count + 1)
17-
loop do
18-
result = read_nonblock(count - value.bytesize, exception: false)
19-
value << result if append_to_buffer?(result)
20-
break if value.bytesize == count
15+
def read_available(reusable_buffer = nil)
16+
if reusable_buffer
17+
value = read_nonblock(8196, reusable_buffer, exception: false)
18+
case value
19+
when :wait_writable, :wait_readable
20+
return reusable_buffer.clear
21+
when nil
22+
raise Errno::ECONNRESET, "Connection reset: #{logged_options.inspect}"
23+
end
24+
else
25+
value = ''.b
2126
end
22-
value
23-
end
2427

25-
def read_available
26-
value = +''
28+
buffer = ''.b
2729
loop do
28-
result = read_nonblock(8196, exception: false)
29-
break if WAIT_RCS.include?(result)
30-
raise Errno::ECONNRESET, "Connection reset: #{logged_options.inspect}" unless result
31-
32-
value << result
30+
result = read_nonblock(8196, buffer, exception: false)
31+
case result
32+
when :wait_writable, :wait_readable
33+
buffer.clear
34+
return value
35+
when nil
36+
raise Errno::ECONNRESET, "Connection reset: #{logged_options.inspect}"
37+
else
38+
value << result
39+
end
3340
end
34-
value
35-
end
36-
37-
WAIT_RCS = %i[wait_writable wait_readable].freeze
38-
39-
def append_to_buffer?(result)
40-
raise Timeout::Error, "IO timeout: #{logged_options.inspect}" if nonblock_timed_out?(result)
41-
raise Errno::ECONNRESET, "Connection reset: #{logged_options.inspect}" unless result
42-
43-
!WAIT_RCS.include?(result)
44-
end
45-
46-
def nonblock_timed_out?(result)
47-
return true if result == :wait_readable && !wait_readable(options[:socket_timeout])
48-
49-
# TODO: Do we actually need this? Looks to be only used in read_nonblock
50-
result == :wait_writable && !wait_writable(options[:socket_timeout])
5141
end
5242

5343
FILTERED_OUT_OPTIONS = %i[username password].freeze
5444
def logged_options
5545
options.except(*FILTERED_OUT_OPTIONS)
5646
end
47+
48+
# JRuby doesn't support IO#timeout=, so use custom readfull implementation
49+
# CRuby 3.3+ has IO#timeout= which makes IO#read work with timeouts
50+
if RUBY_ENGINE == 'jruby'
51+
# rubocop:disable Metrics/AbcSize
52+
def readfull(count)
53+
value = String.new(capacity: count + 1)
54+
55+
until value.bytesize == count
56+
result = read_nonblock(count - value.bytesize, exception: false)
57+
case result
58+
when :wait_readable
59+
wait_readable(options[:socket_timeout]) or raise Timeout::Error, "IO timeout: #{logged_options.inspect}"
60+
when :wait_writable
61+
wait_writable(options[:socket_timeout]) or raise Timeout::Error, "IO timeout: #{logged_options.inspect}"
62+
when nil
63+
raise Errno::ECONNRESET, "Connection reset: #{logged_options.inspect}"
64+
else
65+
value << result
66+
end
67+
end
68+
69+
value
70+
end
71+
# rubocop:enable Metrics/AbcSize
72+
end
5773
end
5874

5975
##

0 commit comments

Comments
 (0)