Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions src/main/java/hudson/remoting/FastPipedInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.lang.ref.Cleaner;
import java.lang.ref.WeakReference;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* This class is equivalent to {@link PipedInputStream}. In the
Expand All @@ -39,6 +42,10 @@
*/
public class FastPipedInputStream extends InputStream {

private static final Logger LOGGER = Logger.getLogger(FastPipedInputStream.class.getName());

private static final Cleaner CLEANER = Cleaner.create();

final byte[] buffer;
/**
* Once closed, this is set to the stack trace of who closed it.
Expand All @@ -58,6 +65,7 @@ public class FastPipedInputStream extends InputStream {
*/
public FastPipedInputStream() {
this.buffer = new byte[0x10000];
CLEANER.register(this, new CleanupChecker(this.buffer));
}

/**
Expand All @@ -79,6 +87,7 @@ public FastPipedInputStream(FastPipedOutputStream source, int bufferSize) throws
connect(source);
}
this.buffer = new byte[bufferSize];
CLEANER.register(this, new CleanupChecker(this.buffer));
}

private void checkSource() throws IOException {
Expand Down Expand Up @@ -130,12 +139,6 @@ public void connect(FastPipedOutputStream source) throws IOException {
source.sink = new WeakReference<>(this);
}

@Override
protected void finalize() throws Throwable {
super.finalize();
close();
}

@Override
public void mark(int readLimit) {}

Expand Down Expand Up @@ -210,4 +213,22 @@ static final class ClosedBy extends Throwable {
super("The pipe was closed at...", error);
}
}

private static final class CleanupChecker implements Runnable {
private final byte[] buffer;

CleanupChecker(byte[] buffer) {
this.buffer = buffer;
}

@Override
public void run() {
if (buffer != null) {
synchronized (buffer) {
LOGGER.log(Level.WARNING, "FastPipedInputStream was not closed before being released");
buffer.notifyAll();
}
}
}
}
}
39 changes: 33 additions & 6 deletions src/main/java/hudson/remoting/FastPipedOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.PipedOutputStream;
import java.lang.ref.Cleaner;
import java.lang.ref.WeakReference;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* This class is equivalent to {@link PipedOutputStream}. In the
Expand All @@ -41,6 +44,10 @@
*/
public class FastPipedOutputStream extends OutputStream implements ErrorPropagatingOutputStream {

private static final Logger LOGGER = Logger.getLogger(FastPipedOutputStream.class.getName());

private static final Cleaner CLEANER = Cleaner.create();

WeakReference<FastPipedInputStream> sink;

private final Throwable allocatedAt = new Throwable();
Expand All @@ -50,6 +57,7 @@ public class FastPipedOutputStream extends OutputStream implements ErrorPropagat
*/
public FastPipedOutputStream() {
super();
CLEANER.register(this, new CleanupChecker(null));
}

/**
Expand All @@ -59,6 +67,7 @@ public FastPipedOutputStream() {
*/
public FastPipedOutputStream(FastPipedInputStream sink) throws IOException {
connect(sink);
CLEANER.register(this, new CleanupChecker(this.sink));
}

/**
Expand Down Expand Up @@ -114,12 +123,6 @@ public void connect(FastPipedInputStream sink) throws IOException {
sink.source = new WeakReference<>(this);
}

@Override
protected void finalize() throws Throwable {
super.finalize();
close();
}

@Override
@SuppressFBWarnings(
value = "NN_NAKED_NOTIFY",
Expand Down Expand Up @@ -202,4 +205,28 @@ public void write(@NonNull byte[] b, int off, int len) throws IOException {
}

static final int TIMEOUT = Integer.getInteger(FastPipedOutputStream.class.getName() + ".timeout", 10 * 1000);

private static final class CleanupChecker implements Runnable {
private final WeakReference<FastPipedInputStream> sink;

CleanupChecker(WeakReference<FastPipedInputStream> sink) {
this.sink = sink;
}

@Override
public void run() {
if (sink != null) {
FastPipedInputStream s = sink.get();
if (s != null) {
synchronized (s.buffer) {
if (s.closed == null) {
LOGGER.log(Level.WARNING, "FastPipedOutputStream was not closed before being released");
s.closed = new FastPipedInputStream.ClosedBy(null);
s.buffer.notifyAll();
}
}
}
}
}
}
}
69 changes: 56 additions & 13 deletions src/main/java/hudson/remoting/ProxyOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.lang.ref.Cleaner;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -36,6 +37,11 @@
* {@link OutputStream} on a remote machine.
*/
final class ProxyOutputStream extends OutputStream implements ErrorPropagatingOutputStream {

private static final Logger LOGGER = Logger.getLogger(ProxyOutputStream.class.getName());

private static final Cleaner CLEANER = Cleaner.create();

private Channel channel;
private int oid;

Expand All @@ -52,13 +58,17 @@ final class ProxyOutputStream extends OutputStream implements ErrorPropagatingOu
*/
private Throwable error;

private final CleanupState cleanupState = new CleanupState();

/**
* Creates unconnected {@link ProxyOutputStream}.
* The returned stream accepts data right away, and
* when it's {@link #connect(Channel,int) connected} later,
* the data will be sent at once to the remote stream.
*/
public ProxyOutputStream() {}
public ProxyOutputStream() {
CLEANER.register(this, new CleanupChecker(cleanupState));
}

/**
* Creates an already connected {@link ProxyOutputStream}.
Expand All @@ -68,6 +78,7 @@ public ProxyOutputStream() {}
*/
public ProxyOutputStream(@NonNull Channel channel, int oid) throws IOException {
connect(channel, oid);
CLEANER.register(this, new CleanupChecker(cleanupState));
}

/**
Expand All @@ -82,6 +93,7 @@ synchronized void connect(@NonNull Channel channel, int oid) throws IOException
}
this.channel = channel;
this.oid = oid;
cleanupState.set(channel, oid);

window = channel.getPipeWindow(oid);

Expand Down Expand Up @@ -183,17 +195,7 @@ private void doClose(Throwable error) throws IOException {
channel.send(new EOF(channel.newIoId(), oid, error));
channel = null;
oid = -1;
}

@Override
protected void finalize() throws Throwable {
super.finalize();
// if we haven't done so, release the exported object on the remote side.
// if the object is auto-unexported, the export entry could have already been removed.
if (channel != null && oid != -1) {
channel.send(new Unexport(channel.newIoId(), oid));
oid = -1;
}
cleanupState.clear();
}

/**
Expand Down Expand Up @@ -464,5 +466,46 @@ public String toString() {
private static final long serialVersionUID = 1L;
}

private static final Logger LOGGER = Logger.getLogger(ProxyOutputStream.class.getName());
/**
* Holds cleanup state that can be accessed by the Cleaner without preventing garbage collection.
*/
private static final class CleanupState {
private Channel channel;
private int oid = -1;

synchronized void set(Channel channel, int oid) {
this.channel = channel;
this.oid = oid;
}

synchronized void clear() {
this.channel = null;
this.oid = -1;
}

synchronized void cleanup() {
if (channel != null && oid != -1) {
try {
channel.send(new Unexport(channel.newIoId(), oid));
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Failed to unexport ProxyOutputStream", e);
}
channel = null;
oid = -1;
}
}
}

private static final class CleanupChecker implements Runnable {
private final CleanupState state;

CleanupChecker(CleanupState state) {
this.state = state;
}

@Override
public void run() {
state.cleanup();
}
}
}
Loading
Loading