Connection.java
package doss.net;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.thrift.TException;
import org.apache.thrift.server.ServerContext;
import doss.Blob;
import doss.BlobStore;
import doss.BlobTx;
import doss.NoSuchBlobException;
import doss.NoSuchBlobTxException;
/**
* Server's view of a DOSS connection.
*/
class Connection implements DossService.Iface, ServerContext {
private final BlobStore blobStore;
private final Map<Long, Upload> uploads = new HashMap<>();
private long nextUploadId = 0;
Connection(BlobStore blobStore) {
this.blobStore = blobStore;
}
@Override
public StatResponse stat(long blobId) throws TException {
try {
return statResponse(blobStore.get(blobId));
} catch (NoSuchBlobException e) {
throw new RemoteNoSuchBlobException().setBlobId(blobId);
} catch (IOException e) {
throw buildIOException(blobId, e);
}
}
private StatResponse statResponse(Blob blob)
throws IOException {
return new StatResponse()
.setBlobId(blob.id())
.setCreatedMillis(blob.created().toMillis())
.setSize(blob.size());
}
@Override
public StatResponse statLegacy(String legacyPath)
throws RemoteNoSuchBlobException, RemoteIOException, TException {
try {
return statResponse(blobStore.getLegacy(Paths.get(legacyPath)));
} catch (NoSuchBlobException e) {
throw new RemoteNoSuchBlobException();
} catch (IOException e) {
throw buildIOException(-1, e);
}
}
@Override
public ByteBuffer read(long blobId, long offset, int length)
throws TException {
try {
ByteBuffer b = ByteBuffer.allocate(length);
Blob blob = blobStore.get(blobId);
if (offset > blob.size() || offset < 0) {
throw new IOException("out of bounds read " + offset + " > "
+ blob.size());
}
try (SeekableByteChannel channel = blob.openChannel()) {
channel.position(offset);
channel.read(b);
}
b.flip();
if (b.remaining() < length) {
throw new IOException("read requested " + length
+ " bytes but only received " + b.remaining());
}
return b;
} catch (NoSuchBlobException e) {
throw new RemoteNoSuchBlobException().setBlobId(blobId);
} catch (IOException e) {
throw buildIOException(blobId, e);
}
}
@Override
public long beginTx() throws TException {
return blobStore.begin().id();
}
private RemoteIOException buildIOException(long blobId, Exception e) {
return new RemoteIOException()
.setBlobId(blobId)
.setType(e.getClass().getName())
.setMesssage(e.getMessage());
}
@Override
public void commitTx(long txId) throws TException {
try {
blobStore.resume(txId).commit();
} catch (NoSuchBlobTxException | IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void rollbackTx(long txId) throws TException {
try {
abortUploads();
blobStore.resume(txId).rollback();
} catch (NoSuchBlobTxException | IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void prepareTx(long txId) throws TException {
try {
blobStore.resume(txId).prepare();
} catch (NoSuchBlobTxException | IOException e) {
throw new RuntimeException(e);
}
}
/**
* Called when the client disconnects, cleanup any state.
*/
public void disconnect() {
System.out.println("Disconnected " + this);
abortUploads();
}
private void abortUploads() {
// cleanup any unfinished uploads
for (Upload upload : uploads.values()) {
upload.finish();
}
}
@Override
public long beginPut(long txId) throws TException {
BlobTx tx = blobStore.resume(txId);
long id = nextUploadId++;
uploads.put(id, new Upload(tx));
return id;
}
@Override
public void write(long handle, ByteBuffer data) throws TException {
uploads.get(handle).write(data);
}
@Override
public long finishPut(long handle) throws TException {
return uploads.remove(handle).finish();
}
@Override
public String digest(long blobId, String algorithm) throws RemoteNoSuchBlobException, RemoteIOException, TException {
try {
return blobStore.get(blobId).digest(algorithm);
} catch (NoSuchBlobException e) {
throw new RemoteNoSuchBlobException().setBlobId(blobId);
} catch (IOException | NoSuchAlgorithmException e) {
throw buildIOException(blobId, e);
}
}
@Override
public List<String> verify(long blobId) throws RemoteNoSuchBlobException, RemoteIOException, TException {
try {
return blobStore.get(blobId).verify();
} catch (NoSuchBlobException e) {
throw new RemoteNoSuchBlobException().setBlobId(blobId);
} catch (IOException e) {
throw buildIOException(blobId, e);
}
}
}