Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/hxcoro/ds/pipelines/IPipeReader.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package hxcoro.ds.pipelines;

import haxe.io.ArrayBufferView;

interface IPipeReader {
@:coroutine function waitForRead():Bool;
function tryRead(out:Out<ArrayBufferView>):Bool;
function tryReadAtLeast(count:Int, out:Out<ArrayBufferView>):Bool;
function advance(consumed:Int, observed:Int):Void;
Comment thread
Simn marked this conversation as resolved.
}
10 changes: 10 additions & 0 deletions src/hxcoro/ds/pipelines/IPipeWriter.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package hxcoro.ds.pipelines;

import haxe.io.ArrayBufferView;

interface IPipeWriter {
function getBuffer(minimumSize:Int=0):ArrayBufferView;
function advance(count:Int):Void;
@:coroutine function flush():Void;
function close():Void;
}
28 changes: 28 additions & 0 deletions src/hxcoro/ds/pipelines/Pipe.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package hxcoro.ds.pipelines;

import hxcoro.ds.pipelines.pipe.State;
import hxcoro.ds.pipelines.pipe.PipeReader;
import hxcoro.ds.pipelines.pipe.PipeWriter;

typedef PipeOptions = {
var ?writerPauseThreshold:Int;
var ?writerResumeThreshold:Int;
}

class Pipe {
public final reader : IPipeReader;
public final writer : IPipeWriter;

function new(reader, writer) {
this.reader = reader;
this.writer = writer;
}

public static function create(options:Null<PipeOptions> = null) {
final state = new State(options?.writerPauseThreshold ?? 1024, options?.writerResumeThreshold ?? 512);
final reader = new PipeReader(state);
final writer = new PipeWriter(state);

return new Pipe(reader, writer);
}
}
12 changes: 12 additions & 0 deletions src/hxcoro/ds/pipelines/PipeExtensions.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package hxcoro.ds.pipelines;

import haxe.io.Bytes;

class PipeExtensions {
@:coroutine public static function write(writer:IPipeWriter, bytes:Bytes) {
final buffer = writer.getBuffer(bytes.length);
buffer.blit(buffer.byteOffset, bytes, 0, bytes.length);
Comment thread
Aidan63 marked this conversation as resolved.
Outdated
writer.advance(bytes.length);
writer.flush();
}
}
146 changes: 146 additions & 0 deletions src/hxcoro/ds/pipelines/pipe/PipeReader.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package hxcoro.ds.pipelines.pipe;

import haxe.Unit;
import haxe.Exception;
import haxe.io.ArrayBufferView;
import haxe.exceptions.ArgumentException;
import hxcoro.ds.Out;

using hxcoro.util.Convenience;

class PipeReader implements IPipeReader {
final state : State;
final readOut : Out<ArrayBufferView>;
var outstanding : Null<ArrayBufferView>;
var fullyObserved : Bool;

public function new(state : State) {
this.state = state;
this.readOut = new Out();
this.outstanding = null;
this.fullyObserved = false;
}

@:coroutine public function waitForRead() {
// I'm not sure if we should have the following checks or not,
// it seems to make sense in some cases but not others.

// // There is remaining data and the user has not specified any of it as "observed"
// if (outstanding != null && fullyObserved == false) {
// return true;
// }

return state.channel.reader.waitForRead();
}

public function tryRead(out:Out<ArrayBufferView>):Bool {
if (out == null) {
throw new ArgumentException("out", "Out parameter must not be null");
}

if (state.channel.reader.tryRead(readOut)) {
final chunk = readOut.get();

fullyObserved = false;

out.set(
if (outstanding == null) {
outstanding = chunk;
} else {
final newTotalSize = outstanding.byteLength + chunk.byteLength;
final newView = new ArrayBufferView(newTotalSize);

newView.buffer.blit(0, outstanding.buffer, outstanding.byteOffset, outstanding.byteLength);
newView.buffer.blit(outstanding.byteLength, chunk.buffer, chunk.byteOffset, chunk.byteLength);

outstanding = newView;
});

return true;
}

return if (outstanding != null && fullyObserved == false) {
fullyObserved = false;

out.set(outstanding);

true;
} else {
false;
}
}

public function tryReadAtLeast(bytes:Int, out:Out<ArrayBufferView>):Bool {
if (out == null) {
throw new ArgumentException("out", "Out parameter must not be null");
}
if (bytes <= 0) {
throw new ArgumentException("bytes", "Bytes must be greater than zero");
}

if (state.channel.reader.tryRead(readOut)) {
final chunk = readOut.get();

fullyObserved = false;

out.set(
if (outstanding == null) {
outstanding = chunk;
} else {
final newTotalSize = outstanding.byteLength + chunk.byteLength;
final newView = new ArrayBufferView(newTotalSize);

newView.buffer.blit(0, outstanding.buffer, outstanding.byteOffset, outstanding.byteLength);
newView.buffer.blit(outstanding.byteLength, chunk.buffer, chunk.byteOffset, chunk.byteLength);

outstanding = newView;
});

return outstanding.byteLength >= bytes;
}

return if (outstanding != null && fullyObserved == false && outstanding.byteLength >= bytes) {
fullyObserved = false;

out.set(outstanding);

true;
} else {
false;
}
}

public function advance(consumed:Int, observed:Int) {
if (outstanding == null) {
throw new Exception("No data has been read");
}
if (consumed < 0) {
throw new ArgumentException("consumed", "Consumed must not be negative");
}
if (consumed > outstanding.byteLength) {
throw new ArgumentException("consumed", "Consumed must not be greater than the read data");
}
if (observed < 0) {
throw new ArgumentException("observed", "Observed must not be negative");
}
if (consumed + observed > outstanding.byteLength) {
throw new ArgumentException("observed", "Observed must not be greater than the read data");
}

if (consumed == outstanding.byteLength) {
outstanding = null;
} else {
outstanding = outstanding.sub(consumed);
fullyObserved = outstanding.byteLength == observed;
}

state.count.add(-consumed);

state.lock.acquire();
if (state.count.load() <= state.writerResumeThreshold) {
state.suspendedWriter?.succeedAsync(Unit);
state.suspendedWriter = null;
}
state.lock.release();
}
}
84 changes: 84 additions & 0 deletions src/hxcoro/ds/pipelines/pipe/PipeWriter.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package hxcoro.ds.pipelines.pipe;

import haxe.Exception;
import haxe.io.Bytes;
import haxe.io.BytesBuffer;
Comment thread
Aidan63 marked this conversation as resolved.
Outdated
import haxe.io.ArrayBufferView;
import haxe.exceptions.ArgumentException;

class PipeWriter implements IPipeWriter {
final state : State;

var current : Null<ArrayBufferView>;
var pending : Array<ArrayBufferView>;

public function new(state:State) {
this.state = state;
this.current = null;
this.pending = [];
}

public function getBuffer(minimumSize:Int = 0):ArrayBufferView {
if (minimumSize < 0) {
throw new ArgumentException("minimumSize", "Buffer size must be non negative");
}

if (current != null) {
throw new Exception("");
}
Comment thread
Aidan63 marked this conversation as resolved.

final actualSize = if (minimumSize == 0) 1024 else minimumSize;

return current = new ArrayBufferView(actualSize);
}

public function advance(count:Int) {
if (count < 0) {
throw new ArgumentException("count", "Count must be non negative");
}

switch current {
case null:
throw new Exception("");
case _:
Comment thread
Aidan63 marked this conversation as resolved.
Outdated
if (count == 0) {
Comment thread
Aidan63 marked this conversation as resolved.
Outdated
return;
}

@:nullSafety(Off) pending.push(current.sub(0, count));

Comment thread
Aidan63 marked this conversation as resolved.
Outdated
current = null;
}
}

@:coroutine public function flush():Void {
for (chunk in pending) {
state.count.add(chunk.byteLength);
state.channel.writer.write(chunk);
}

pending.resize(0);

state.lock.acquire();

if (state.writerPauseThreshold > 0 && state.count.load() >= state.writerPauseThreshold) {
suspendCancellable(cont -> {
state.suspendedWriter = cont;

state.lock.release();

_ -> {
state.lock.acquire();
state.suspendedWriter = null;
state.lock.release();
}
});
} else {
state.lock.release();
}
}

public function close() {
state.channel.writer.close();
}
}
26 changes: 26 additions & 0 deletions src/hxcoro/ds/pipelines/pipe/State.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package hxcoro.ds.pipelines.pipe;

import haxe.atomic.AtomicInt;
import haxe.io.ArrayBufferView;
import haxe.Unit;
import haxe.coro.IContinuation;
import hxcoro.ds.channels.Channel;
import sys.thread.Mutex;

class State {
public var suspendedWriter : Null<IContinuation<Unit>>;
public final channel : Channel<ArrayBufferView>;
public final count : AtomicInt;
public final writerPauseThreshold : Int;
public final writerResumeThreshold : Int;
public final lock : Mutex;

public function new(writerPauseThreshold, writerResumeThreshold) {
Comment thread
Simn marked this conversation as resolved.
Outdated
this.writerPauseThreshold = writerPauseThreshold;
this.writerResumeThreshold = writerResumeThreshold;
this.suspendedWriter = null;
this.channel = Channel.createUnbounded({});
this.count = new AtomicInt(0);
this.lock = new Mutex();
}
}
Loading
Loading