// Upload.java
package doss.net;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import doss.Blob;
import doss.BlobTx;
import doss.Writable;
/* XXX This is a hack just to get things working prior to a redesign. The BlobStore
* put interface is overly simplistic and keeps all state on the stack. That doesn't
* work well with the stateful RPC methods. For the time being we fork off a thread for
* each upload and pass it chunks of data as they come in.
*/
/**
 * Adapts a push-style, chunk-at-a-time upload onto the pull-style
 * {@code BlobTx.put(Writable)} call.
 *
 * <p>Constructing an {@code Upload} submits a task to a shared cached thread pool.
 * That task calls {@code tx.put} with a {@link Writable} which drains the buffers
 * handed over via {@link #write(ByteBuffer)} until {@link #finish()} signals the end
 * of the data.
 *
 * <p>Buffers are passed through a {@link SynchronousQueue}, so a producer blocks until
 * the worker has taken the buffer. While blocked, the producer periodically checks
 * whether the worker task has already ended, so that a failed upload cannot leave the
 * producer waiting forever.
 */
public class Upload {
    /** How often a blocked producer re-checks whether the worker task has ended. */
    private static final long HANDOFF_POLL_MILLIS = 100;

    /** End-of-data marker. Compared by identity and never written to the channel. */
    private static final ByteBuffer SENTINEL = ByteBuffer.allocate(1);

    private static final ExecutorService pool = Executors.newCachedThreadPool();

    private final SynchronousQueue<ByteBuffer> queue = new SynchronousQueue<>();
    private final Future<Blob> result;

    /** Updated by the worker, read by callers of {@link #getBytesWritten()}. */
    private final AtomicLong bytesWritten = new AtomicLong();

    /**
     * Starts the background task that feeds handed-over buffers into {@code tx.put}.
     *
     * @param tx the transaction the blob is stored through
     */
    public Upload(final BlobTx tx) {
        result = pool.submit(new Callable<Blob>() {
            @Override
            public Blob call() throws Exception {
                return tx.put(new Writable() {
                    @Override
                    public void writeTo(WritableByteChannel channel)
                            throws IOException {
                        drainTo(channel);
                    }
                });
            }
        });
    }

    /**
     * Hands one chunk of data to the worker.
     *
     * <p>The call returns once the worker has taken the buffer, which may be before
     * the buffer's contents have been written to the channel; the caller should not
     * modify the buffer after handing it over.
     *
     * @param data the chunk to append; its remaining bytes are written
     * @throws IllegalStateException if the worker task has already ended and so can
     *         never accept the chunk; {@link #finish()} reports the underlying failure
     * @throws RuntimeException if the calling thread is interrupted while waiting
     */
    public void write(ByteBuffer data) {
        try {
            if (!handOff(data)) {
                throw new IllegalStateException(
                        "upload worker terminated before accepting data");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Signals the end of the data and waits for the worker task to complete.
     *
     * @return the id of the stored blob
     * @throws RuntimeException if the worker task failed (wrapping the
     *         {@link ExecutionException}) or the calling thread is interrupted
     */
    public long finish() {
        try {
            // If the worker has already ended nobody can receive the sentinel; the
            // result is deliberately ignored so that get() below reports the outcome.
            handOff(SENTINEL);
            return result.get().id();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the number of bytes written to the channel so far.
     *
     * @return the running total of the values returned by {@code channel.write}
     */
    public long getBytesWritten() {
        return bytesWritten.get();
    }

    /**
     * Offers a buffer to the worker, giving up once the worker task has ended.
     *
     * @return {@code true} if the worker took the buffer, {@code false} if the worker
     *         task ended first
     */
    private boolean handOff(ByteBuffer buffer) throws InterruptedException {
        while (!queue.offer(buffer, HANDOFF_POLL_MILLIS, TimeUnit.MILLISECONDS)) {
            if (result.isDone()) {
                return false;
            }
        }
        return true;
    }

    /** Writes every handed-over buffer to the channel until the sentinel arrives. */
    private void drainTo(WritableByteChannel channel) throws IOException {
        try {
            for (ByteBuffer chunk = queue.take(); chunk != SENTINEL; chunk = queue.take()) {
                // A single write() may be partial, so loop until the chunk is empty.
                while (chunk.hasRemaining()) {
                    bytesWritten.addAndGet(channel.write(chunk));
                }
            }
        } catch (InterruptedException e) {
            // Restore the flag for the pool; the failure surfaces through the Future.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}