Database.java
package doss.local;
import java.io.Closeable;
import java.nio.file.Path;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.GetGeneratedKeys;
import org.skife.jdbi.v2.sqlobject.SqlQuery;
import org.skife.jdbi.v2.sqlobject.SqlUpdate;
import org.skife.jdbi.v2.sqlobject.Transaction;
import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapper;
import org.skife.jdbi.v2.sqlobject.mixins.GetHandle;
import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import com.googlecode.flyway.core.Flyway;
abstract class Database implements Closeable, GetHandle, Transactional<Database> {
/*
* Connection meta data URL doesn't include H2 switches. We have to manually
* include them for flyway migrations
*/
static final String H2_SWITCHES = ";AUTO_SERVER=true;MVCC=true";
/**
* Opens an in-memory database for internal testing.
*/
static Database open() {
return open(new DBI("jdbc:h2:mem:testing;MVCC=true"));
}
public static Database open(String jdbcUrl) {
return open(new DBI(jdbcUrl));
}
@Override
public abstract void close();
/**
* Opens a DOSS database stored on the local filesystem.
*/
public static Database open(Path dbPath) {
return open(new DBI("jdbc:h2:file:" + dbPath + "/doss" + H2_SWITCHES));
}
public static Database open(DBI dbi) {
return dbi.open(Database.class);
}
/**
* Runs database migrations to populate or upgrade the schema.
*/
public Database migrate() {
try {
Flyway flyway = openFlyway();
flyway.setInitOnMigrate(true);
flyway.migrate();
} catch (SQLException e) {
throw new RuntimeException(e);
}
return this;
}
private Flyway openFlyway() throws SQLException {
// silence flyway's annoying default logging
Logger.getLogger("com.googlecode.flyway").setLevel(Level.SEVERE);
DatabaseMetaData md = getHandle().getConnection().getMetaData();
Flyway flyway = new Flyway();
flyway.setDataSource(md.getURL() + H2_SWITCHES, md.getUserName(), "");
flyway.setLocations("doss/migrations");
return flyway;
}
public String version() {
try {
Flyway flyway = openFlyway();
return flyway.info().current().getVersion().toString();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@SqlQuery("SELECT NEXTVAL('ID_SEQ')")
public abstract long nextId();
@SqlUpdate("INSERT INTO blobs (blob_id, container_id, offset) VALUES (:blobId, :containerId, :offset)")
public abstract void insertBlob(@Bind("blobId") long blobId, @Bind("containerId") long containerId,
@Bind("offset") long offset);
@SqlUpdate("DELETE FROM blobs WHERE blob_id = :blobId")
public abstract void deleteBlob(@Bind("blobId") long blobId);
@SqlQuery("SELECT container_id, offset FROM blobs WHERE blob_id = :blobId")
@RegisterMapper(BlobLocationMapper.class)
public abstract BlobLocation locateBlob(@Bind("blobId") long blobId);
public static class BlobLocationMapper implements ResultSetMapper<BlobLocation> {
@Override
public BlobLocation map(int index, ResultSet r, StatementContext ctx) throws SQLException {
return new BlobLocation(r.getLong("container_id"), r.getLong("offset"));
}
}
@SqlQuery("SELECT container_id FROM containers WHERE sealed = 0 AND AREA = :area")
public abstract Long findAnOpenContainer(@Bind("area") String area);
@SqlUpdate("INSERT INTO containers (area) VALUES (:area)")
@GetGeneratedKeys
public abstract long createContainer(@Bind("area") String name);
@SqlUpdate("UPDATE containers SET sealed = true WHERE container_id = :id")
public abstract long sealContainer(@Bind("id") long containerId);
@SqlQuery("SELECT blob_id FROM legacy_paths WHERE legacy_path = :legacy_path FOR UPDATE")
public abstract Long findBlobIdForLegacyPathAndLock(@Bind("legacy_path") String legacyPath);
@SqlUpdate("INSERT INTO legacy_paths (blob_id, legacy_path) VALUES (:blob_id, :legacy_path)")
public abstract long insertLegacy(@Bind("blob_id") long blobId, @Bind("legacy_path") String legacyPath);
@Transaction
public Long findOrInsertBlobIdByLegacyPath(String legacyPath) {
Long blobId = findBlobIdForLegacyPathAndLock(legacyPath);
if (blobId == null) {
blobId = nextId();
insertLegacy(blobId, legacyPath);
}
return blobId;
}
@SqlQuery("SELECT legacy_path FROM legacy_paths WHERE blob_id = :blob_id")
public abstract String locateLegacy(@Bind("blob_id") long blobId);
@SqlQuery("SELECT digest FROM digests WHERE blob_id = :blob_id AND algorithm = :algorithm")
public abstract String getDigest(@Bind("blob_id") long blobId, @Bind("algorithm") String algorithm);
@SqlUpdate("INSERT INTO digests (blob_id, algorithm, digest) VALUES(:blob_id, :algorithm, :digest)")
public abstract void insertDigest(@Bind("blob_id") long blobId, @Bind("algorithm") String algorithm,
@Bind("digest") String digest);
@SqlQuery("SELECT algorithm, digest FROM digests WHERE blob_id = :blob_id")
public abstract ResultSet getDigestsIterable(@Bind("blob_id") long blobId);
public Map<String, String> getDigests(long blobId) {
HashMap<String, String> out = new HashMap<String, String>();
for (Map<String, Object> row : getHandle().select("SELECT algorithm, digest FROM digests WHERE blob_id = ?",
blobId)) {
out.put((String) row.get("algorithm"), (String) row.get("digest"));
}
return out;
}
}