RemoteBlobStore.java

package doss.net;

import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import doss.Blob;
import doss.BlobStore;
import doss.BlobTx;
import doss.NoSuchBlobException;
import doss.NoSuchBlobTxException;
import doss.Writable;
import doss.core.ManagedTransaction;
import doss.core.Transaction;
import doss.core.Writables;

public class RemoteBlobStore implements BlobStore {
    private final DossService.Client client;
    private final TTransport transport;

    RemoteBlobStore(Socket socket) throws IOException,
            TTransportException {
        transport = new TSocket(socket);
        TProtocol protocol = new TBinaryProtocol(transport);
        client = new DossService.Client(protocol);
    }

    public static RemoteBlobStore open(Socket socket) throws IOException {
        try {
            return new RemoteBlobStore(socket);
        } catch (TTransportException e) {
            throw new IOException(e);
        }
    }

    @Override
    public synchronized Blob get(long blobId) throws NoSuchBlobException,
            IOException {
        try {
            return new RemoteBlob(client, client.stat(blobId));
        } catch (RemoteNoSuchBlobException e) {
            throw new NoSuchBlobException(e.getBlobId());
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Blob getLegacy(Path legacyPath) throws NoSuchBlobException,
            IOException {
        try {
            return new RemoteBlob(client, client.statLegacy(legacyPath
                    .toString()));
        } catch (RemoteNoSuchBlobException e) {
            throw new NoSuchBlobException(e.getBlobId());
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public BlobTx begin() {
        try {
            return new RemoteBlobTx(client.beginTx());
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public BlobTx resume(long txId) throws NoSuchBlobTxException {
        // TODO: check if tx exists first?
        return new RemoteBlobTx(txId);
    }

    @Override
    public void close() {
        transport.close();
    }

    public static int transfer(ByteBuffer src, ByteBuffer dest) {
        int n = Math.min(dest.remaining(), src.remaining());
        if (n > 0) {
            ByteBuffer tmp = src.duplicate();
            tmp.limit(src.position() + n);
            dest.put(tmp);
            src.position(src.position() + n);
        }
        return n;
    }

    private class RemoteBlobTx extends ManagedTransaction implements BlobTx {
        private final long id;

        RemoteBlobTx(long id) {
            this.id = id;
        }

        @Override
        public long id() {
            return id;
        }

        @Override
        public Blob put(Writable output) throws IOException {
            try {
                final long putHandle = client.beginPut(id);
                output.writeTo(new WritableByteChannel() {

                    @Override
                    public boolean isOpen() {
                        return true;
                    }

                    @Override
                    public void close() throws IOException {
                    }

                    @Override
                    public int write(ByteBuffer b) throws IOException {
                        int nbytes = b.remaining();
                        try {
                            client.write(putHandle, b);
                            b.position(b.limit());
                        } catch (TException e) {
                            throw new RuntimeException(e);
                        }
                        return nbytes;
                    }
                });
                return get(client.finishPut(putHandle));
            } catch (TException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public Blob put(Path source) throws IOException {
            return put(Writables.wrap(source));
        }

        @Override
        public Blob put(byte[] bytes) throws IOException {
            return put(Writables.wrap(bytes));
        }

        @Override
        protected Transaction getCallbacks() {
            return new Transaction() {

                @Override
                public void rollback() throws IOException {
                    try {
                        client.rollbackTx(id);
                    } catch (TException e) {
                        throw new RuntimeException(e);
                    }
                }

                @Override
                public void prepare() throws IOException {
                    try {
                        client.prepareTx(id);
                    } catch (TException e) {
                        throw new RuntimeException(e);
                    }
                }

                @Override
                public void commit() throws IOException {
                    try {
                        client.commitTx(id);
                    } catch (TException e) {
                        throw new RuntimeException(e);
                    }
                }

                @Override
                public void close() throws IOException {
                }
            };
        }

    }
}