// TarContainer.java

package doss.local;

import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.commons.compress.archivers.ArchiveException;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;

import doss.Blob;
import doss.SizedWritable;
import doss.Writable;
import doss.core.Writables;

/**
 * A {@link Container} that keeps its blobs as entries of a single tar archive
 * file.
 *
 * <p>
 * Every blob is written as one tar entry whose name is the decimal blob id,
 * followed by zero padding up to the next 512-byte boundary. After each
 * {@link #put(long, Writable)} the file ends with the 1024 zero byte
 * end-of-archive marker; the next {@code put} overwrites that marker with the
 * new entry and writes a fresh marker behind it.
 *
 * <p>
 * {@link #get(long)}, {@link #put(long, Writable)} and {@link #close()} are
 * synchronized on this instance because they share the channel position and
 * the header buffer. {@link #id()} and {@link #size()} are not synchronized.
 */
public class TarContainer implements Container {

    /** Size in bytes of a tar entry header, which is also the tar block size. */
    private static final int TAR_ENTRY_HEADER_LENGTH = 512;

    /** End-of-archive marker: two zero-filled blocks. Never modified. */
    private static final byte[] FOOTER_BYTES = new byte[2 * TAR_ENTRY_HEADER_LENGTH];

    private final Path path;
    private final long id;
    private final SeekableByteChannel channel;

    /** Scratch buffer reused for reading and writing entry headers. */
    private final ByteBuffer headerBuffer = ByteBuffer
            .allocate(TAR_ENTRY_HEADER_LENGTH);

    /**
     * Opens the archive at {@code path} for reading and writing, creating the
     * file if it does not exist yet.
     *
     * @param id
     *            identifier reported by {@link #id()}
     * @param path
     *            location of the tar file backing this container
     * @throws IOException
     *             if the file cannot be opened or created
     * @throws ArchiveException
     *             declared for callers; not thrown by the current body
     */
    TarContainer(long id, Path path) throws IOException, ArchiveException {
        this.path = path;
        this.id = id;
        channel = Files.newByteChannel(path, READ, WRITE, CREATE);
    }

    /**
     * Closes the underlying channel.
     *
     * @throws RuntimeException
     *             wrapping the {@link IOException} if closing fails
     */
    @Override
    public synchronized void close() {
        try {
            channel.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads the tar entry header stored at {@code offset} and returns a blob
     * for the data that follows it.
     *
     * @param offset
     *            file offset of the entry header, as returned by
     *            {@link #put(long, Writable)}
     * @return a blob positioned on the entry data directly after the header
     * @throws IOException
     *             if the file ends before a complete header could be read, or
     *             the channel fails
     */
    @Override
    public synchronized Blob get(long offset) throws IOException {
        channel.position(offset);
        headerBuffer.clear();
        // A single read() may return fewer bytes than requested, and the
        // buffer still holds the previous header, so a short read must not
        // be parsed as if it were a complete header.
        while (headerBuffer.hasRemaining()) {
            if (channel.read(headerBuffer) < 0) {
                throw new IOException("Unexpected end of " + path
                        + " while reading tar entry header at offset "
                        + offset + ": got " + headerBuffer.position()
                        + " of " + TAR_ENTRY_HEADER_LENGTH + " bytes");
            }
        }
        TarArchiveEntry entry = new TarArchiveEntry(headerBuffer.array());
        return new TarBlob(path, offset + TAR_ENTRY_HEADER_LENGTH, entry);
    }

    /**
     * Appends {@code data} to the archive as a new entry named after
     * {@code id}, then rewrites the end-of-archive marker.
     *
     * @param id
     *            blob id, used as the tar entry name
     * @param data
     *            content to store
     * @return file offset of the new entry's header; pass it to
     *         {@link #get(long)} to read the blob back
     * @throws IOException
     *             if writing to the channel fails
     */
    @Override
    public synchronized long put(long id, Writable data) throws IOException {
        SizedWritable output = Writables.toSized(data);

        // Overwrite the trailing end-of-archive marker when the file already
        // holds records. A file no larger than the marker holds no record,
        // so writing starts at the beginning. The position is always set
        // explicitly because get() moves it.
        long size = channel.size();
        long offset = size > FOOTER_BYTES.length
                ? size - FOOTER_BYTES.length
                : 0L;
        channel.position(offset);

        writeRecordHeader(id, output);
        output.writeTo(channel);
        writeRecordPadding();
        writeArchiveFooter();
        return offset;
    }

    /**
     * Pads the current record with zero bytes up to the next 512-byte
     * boundary. Writes nothing when the channel is already on a boundary.
     */
    private void writeRecordPadding() throws IOException {
        int remainder = (int) (channel.position() % TAR_ENTRY_HEADER_LENGTH);
        if (remainder != 0) {
            // A freshly allocated buffer is zero-filled.
            writeFully(ByteBuffer.allocate(TAR_ENTRY_HEADER_LENGTH - remainder));
        }
    }

    /**
     * Write the 1024 zero byte end of archive marker.
     */
    private void writeArchiveFooter() throws IOException {
        // wrap() already yields position 0 and limit 1024; flipping it would
        // set the limit to 0 and nothing would be written.
        writeFully(ByteBuffer.wrap(FOOTER_BYTES));
    }

    /**
     * Writes the 512-byte tar header for an entry named after {@code id} with
     * the size reported by {@code output}.
     */
    private void writeRecordHeader(long id, SizedWritable output)
            throws IOException {
        TarArchiveEntry entry = new TarArchiveEntry(String.valueOf(id));
        entry.setSize(output.size());
        headerBuffer.clear();
        entry.writeEntryHeader(headerBuffer.array());
        writeFully(headerBuffer);
    }

    /**
     * Writes every remaining byte of {@code buffer} to the channel, repeating
     * the write if the channel accepts only part of the buffer.
     */
    private void writeFully(ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }

    /**
     * @return the identifier given to the constructor
     */
    @Override
    public long id() {
        return id;
    }

    /**
     * @return the current size in bytes of the backing file, as reported by
     *         the channel
     * @throws IOException
     *             if the channel cannot report its size
     */
    @Override
    public long size() throws IOException {
        return channel.size();
    }

}