LocalBlobStore.java

package doss.local;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.NotDirectoryException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import doss.Blob;
import doss.BlobStore;
import doss.BlobTx;
import doss.CorruptBlobStoreException;
import doss.NoSuchBlobException;
import doss.NoSuchBlobTxException;
import doss.Writable;
import doss.core.ManagedTransaction;
import doss.core.Transaction;
import doss.core.Writables;

public class LocalBlobStore implements BlobStore {

    final Symlinker symlinker;
    final Database db;
    final Map<Long, BlobTx> txs = new ConcurrentHashMap<>();
    final Path rootDir;
    final List<Area> areas = new ArrayList<>();
    final Area stagingArea;

    private LocalBlobStore(Path rootDir, String jdbcUrl) throws IOException {
        this.rootDir = rootDir;
        if (jdbcUrl == null) {
            db = Database.open(subdir("db"));
        } else {
            db = Database.open(jdbcUrl);
        }
        symlinker = new Symlinker(subdir("blob"));
        List<Filesystem> fslist = new ArrayList<>();
        fslist.add(new Filesystem("fs.staging", subdir("staging")));
        stagingArea = new Area(db, "area.staging", fslist, "directory");
        areas.add(stagingArea);
    }

    public static void init(Path root) throws IOException {
        Files.createDirectories(root.resolve("staging"));
        Files.createDirectories(root.resolve("db"));
        Files.createDirectories(root.resolve("blob"));
        try (Database db = Database.open(root.resolve("db"))) {
            db.migrate();
        }
    }

    /**
     * Opens a BlobStore that stores all its data and indexes on the local file
     * system.
     * 
     * @param root
     *            directory to store data and indexes in
     * @return a new BlobStore
     * @throws CorruptBlobStoreException
     *             if the blob store is missing or corrupt
     */
    public static BlobStore open(Path root) throws CorruptBlobStoreException {
        try {
            return new LocalBlobStore(root, null);
        } catch (IOException e) {
            throw new CorruptBlobStoreException(root, e);
        }
    }

    /**
     * Opens a BlobStore that stores all its data and indexes on the local file
     * system.
     * 
     * @param root
     *            directory to store data in
     * @param jdbcUrl
     *            jdbcUrl for the DOSS SQL database
     * @return a new BlobStore
     * @throws CorruptBlobStoreException
     *             if the blob store is missing or corrupt
     */
    public static BlobStore open(Path root, String jdbcUrl) throws CorruptBlobStoreException {
        try {
            return new LocalBlobStore(root, jdbcUrl);
        } catch (IOException e) {
            throw new CorruptBlobStoreException(root, e);
        }
    }

    private Path subdir(String name) throws NotDirectoryException {
        Path path = rootDir.resolve(name);
        if (!Files.isDirectory(path)) {
            throw new NotDirectoryException(path.toString());
        }
        return path;
    }

    @Override
    public void close() {
        for (Area area : areas) {
            area.close();
        }
        db.close();
    }

    @Override
    public Blob get(long blobId) throws IOException, NoSuchBlobException {
        String legacyPath = db.locateLegacy(blobId);
        if (legacyPath != null) {
            return new FileBlob(blobId, Paths.get(legacyPath));
        }
        BlobLocation location = db.locateBlob(blobId);
        if (location == null) {
            throw new NoSuchBlobException(blobId);
        }
        Container container = stagingArea.container(location.containerId());
        Blob blob = container.get(location.offset());
        return new CachedMetadataBlob(db, blob);
    }

    @Override
    public Blob getLegacy(Path legacyPath) throws NoSuchBlobException,
            IOException {
        String path = legacyPath.toAbsolutePath().toString();
        if (!Files.exists(legacyPath)) {
            throw new NoSuchFileException(path);
        }
        return get(db.findOrInsertBlobIdByLegacyPath(path));
    }

    @Override
    public BlobTx begin() {
        BlobTx tx = new Tx();
        txs.put(tx.id(), tx);
        return tx;
    }

    @Override
    public BlobTx resume(long txId) throws NoSuchBlobTxException {
        BlobTx tx = txs.get(txId);
        if (tx == null) {
            throw new NoSuchBlobTxException(txId);
        }
        return tx;
    }

    public class Tx extends ManagedTransaction implements BlobTx {

        final long id = db.nextId();
        final List<Long> addedBlobs = new ArrayList<Long>();

        // ManagedTransaction will call back into this private Transaction when
        // the transaction changes state.
        // This allows us to have transaction state transition logic controlled
        // separately to the central data management concerns of this class.
        Transaction callbacks = new Transaction() {

            @Override
            public void commit() throws IOException {
                txs.remove(id);
            }

            @Override
            public void rollback() throws IOException {
                for (Long blobId : addedBlobs) {
                    db.deleteBlob(blobId);
                    symlinker.unlink(blobId);
                }
                txs.remove(id);
            }

            @Override
            public void prepare() {
                // TODO Auto-generated method stub
            }

            @Override
            public void close() throws IllegalStateException {
            }
        };

        public void setExtension(final long blobId, final String ext)
                throws NoSuchBlobException, IOException {
            Blob blob = get(blobId);
            symlinker.updateLinkPath(blobId, ext);
        }

        @Override
        public Blob put(final Path source) throws IOException {
            return put(Writables.wrap(source));
        }

        @Override
        public Blob put(final byte[] bytes) throws IOException {
            return put(Writables.wrap(bytes));
        }

        @Override
        public long id() {
            return id;
        }

        @Override
        protected Transaction getCallbacks() {
            return callbacks;
        }

        @Override
        public Blob put(Writable output) throws IOException {
            state.assertOpen();
            db.begin();
            long blobId = db.nextId();
            Container container = stagingArea.currentContainer();
            long offset = container.put(blobId, output);
            db.insertBlob(blobId, container.id(), offset);
            if (container instanceof DirectoryContainer) {
                symlinker.link(blobId, (DirectoryContainer) container, offset);
            }
            addedBlobs.add(blobId);
            db.commit();
            return new CachedMetadataBlob(db, container.get(offset));
        }

        /**
         * Slightly dodgey addition to LocalBlobStore Tx, for importing legacy
         * files into a local DOSS. Files do not get a symlink as there are no
         * legacy jp2s.
         * 
         * @param legacyPath
         *            The full path to the legacy DOSS storage system.
         * 
         * @return The Blob id for the legacy file. File can now be retrieved
         *         just like any other DOSS stored file
         * 
         * @throws IOException
         *             when it's unhappy
         */
        public Long putLegacy(Path legacyPath) throws IOException {
            state.assertOpen();
            Long blobId = db.findOrInsertBlobIdByLegacyPath(legacyPath
                    .toString());
            addedBlobs.add(blobId);
            return blobId;
        }
    }

    public String version() {
        return db.version();
    }
}