BlobStoreServer.java

package doss.net;

import java.io.Closeable;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import doss.BlobStore;
import doss.net.DossService.Iface;

/**
 * Serves a {@link BlobStore} over Thrift on a caller-supplied server socket.
 *
 * <p>Every transport handed to the processor factory gets its own
 * {@link Connection} handler. The handler is tracked in {@link #handlers} and
 * doubles as the Thrift {@link ServerContext} for that connection; it is
 * removed and disconnected when the server deletes the context.
 */
public class BlobStoreServer implements Runnable, Closeable {
    private final TServer server;

    /** Live connection handlers, keyed by the transport they were created for. */
    final Map<TTransport, Connection> handlers = new ConcurrentHashMap<>();

    /**
     * Builds a {@link TSimpleServer} around the given socket; call
     * {@link #run()} to start serving.
     *
     * @param blobStore the store each new {@link Connection} is created with
     * @param socket    the server socket wrapped in a {@link TServerSocket}
     * @throws IOException         declared for callers; not raised directly by
     *                             this constructor's own statements
     * @throws TTransportException propagated from the Thrift transport setup
     */
    public BlobStoreServer(BlobStore blobStore, ServerSocket socket)
            throws IOException,
            TTransportException {
        TServerTransport serverTransport = new TServerSocket(socket);
        server = new TSimpleServer(
                new TServer.Args(serverTransport)
                        .processorFactory(new ProcessorFactory(blobStore)));
        server.setServerEventHandler(new TServerEventHandler() {

            @Override
            public void processContext(ServerContext context,
                    TTransport inputTransport, TTransport outputTransport) {
                // Nothing to do per request.
            }

            @Override
            public void preServe() {
                // Nothing to do before serving starts.
            }

            /**
             * Forgets and disconnects the handler that served as the
             * connection's context.
             */
            @Override
            public void deleteContext(ServerContext context, TProtocol in,
                    TProtocol out) {
                // createContext returns null when no handler is registered
                // for the transport, so there may be nothing to clean up.
                if (context == null) {
                    return;
                }
                Connection conn = (Connection) context;
                // The map is keyed by transport, so remove(conn) could never
                // match an entry; drop the mapping by value instead.
                handlers.values().remove(conn);
                conn.disconnect();
            }

            /**
             * Uses the handler registered for the input transport as the
             * connection's context; null if none is registered.
             */
            @Override
            public ServerContext createContext(TProtocol in, TProtocol out) {
                return handlers.get(in.getTransport());
            }
        });
    }

    /**
     * Creates one {@link Connection} per transport and registers it in
     * {@link #handlers} so the event handler can find it later.
     */
    private class ProcessorFactory extends TProcessorFactory {
        final BlobStore blobStore;

        public ProcessorFactory(BlobStore blobStore) {
            // No shared processor: getProcessor builds one per transport.
            super(null);
            this.blobStore = blobStore;
        }

        @Override
        public TProcessor getProcessor(TTransport transport) {
            Connection handler = new Connection(blobStore);
            handlers.put(transport, handler);
            return new Processor(handler);
        }

        @Override
        public boolean isAsyncProcessor() {
            return false;
        }

    }

    /**
     * Processor that prints the stack trace of any {@link RuntimeException}
     * escaping request processing before rethrowing it.
     */
    private static class Processor extends DossService.Processor<Iface> {

        public Processor(Iface iface) {
            super(iface);
        }

        @Override
        public boolean process(TProtocol in, TProtocol out)
                throws TException {
            try {
                return super.process(in, out);
            } catch (RuntimeException e) {
                // Surface the failure on stderr, then let it propagate
                // unchanged to the caller.
                e.printStackTrace();
                throw e;
            }
        }

    }

    /** Runs the Thrift serve loop by delegating to {@link TServer#serve()}. */
    @Override
    public void run() {
        server.serve();
    }

    /** Asks the Thrift server to stop by delegating to {@link TServer#stop()}. */
    @Override
    public void close() {
        server.stop();
    }
}