Implementation and tests of GC, GcJob, S3 files

This commit is contained in:
Winston Li
2017-02-17 11:22:11 +00:00
parent 8a8d308365
commit ee61d72e2e
72 changed files with 1668 additions and 84 deletions

View File

@@ -5,6 +5,8 @@ import org.eclipse.jgit.transport.ServiceMayNotContinueException;
import uk.ac.ic.wlgitbridge.bridge.db.DBStore;
import uk.ac.ic.wlgitbridge.bridge.db.ProjectState;
import uk.ac.ic.wlgitbridge.bridge.db.sqlite.SqliteDBStore;
import uk.ac.ic.wlgitbridge.bridge.gc.GcJob;
import uk.ac.ic.wlgitbridge.bridge.gc.GcJobImpl;
import uk.ac.ic.wlgitbridge.bridge.lock.LockGuard;
import uk.ac.ic.wlgitbridge.bridge.lock.ProjectLock;
import uk.ac.ic.wlgitbridge.bridge.repo.FSGitRepoStore;
@@ -145,6 +147,7 @@ public class Bridge {
private final DBStore dbStore;
private final SwapStore swapStore;
private final SwapJob swapJob;
private final GcJob gcJob;
private final SnapshotAPI snapshotAPI;
private final ResourceCache resourceCache;
@@ -183,6 +186,7 @@ public class Bridge {
dbStore,
swapStore
),
new GcJobImpl(repoStore, lock),
new NetSnapshotAPI(),
new UrlResourceCache(dbStore)
);
@@ -197,6 +201,7 @@ public class Bridge {
* @param dbStore the {@link DBStore} to use
* @param swapStore the {@link SwapStore} to use
* @param swapJob the {@link SwapJob} to use
* @param gcJob
* @param snapshotAPI the {@link SnapshotAPI} to use
* @param resourceCache the {@link ResourceCache} to use
*/
@@ -206,6 +211,7 @@ public class Bridge {
DBStore dbStore,
SwapStore swapStore,
SwapJob swapJob,
GcJob gcJob,
SnapshotAPI snapshotAPI,
ResourceCache resourceCache
) {
@@ -216,6 +222,7 @@ public class Bridge {
this.snapshotAPI = snapshotAPI;
this.resourceCache = resourceCache;
this.swapJob = swapJob;
this.gcJob = gcJob;
postbackManager = new PostbackManager();
Runtime.getRuntime().addShutdownHook(new Thread(this::doShutdown));
repoStore.purgeNonexistentProjects(dbStore.getProjectNames());
@@ -234,6 +241,8 @@ public class Bridge {
Log.info("Shutdown received.");
Log.info("Stopping SwapJob");
swapJob.stop();
Log.info("Stopping GcJob");
gcJob.stop();
Log.info("Waiting for projects");
lock.lockAll();
Log.info("Bye");
@@ -243,8 +252,9 @@ public class Bridge {
* Starts the swap job, which will begin checking whether projects should be
* swapped with a configurable frequency.
*/
public void startSwapJob() {
public void startBackgroundJobs() {
swapJob.start();
gcJob.start();
}
/**
@@ -348,8 +358,12 @@ public class Bridge {
*
* 1. Queries the project state for the given project name.
* a. NOT_PRESENT = We've never seen it before, and the row for the
* project doesn't even exist.
* b. PRESENT = The
* project doesn't even exist. The project definitely
* exists because
* {@link #projectExists(Credential, String)} would
* have had to return true to get here.
* b. PRESENT = The project is on disk.
* c. SWAPPED = The project is in the {@link SwapStore}
*
* If the project has never been cloned, it is git init'd. If the project
* is in swap, it is restored to disk. Otherwise, the project was already

View File

@@ -0,0 +1,34 @@
package uk.ac.ic.wlgitbridge.bridge.gc;
import com.google.api.client.auth.oauth2.Credential;
import uk.ac.ic.wlgitbridge.bridge.Bridge;
import uk.ac.ic.wlgitbridge.bridge.repo.ProjectRepo;
import uk.ac.ic.wlgitbridge.data.filestore.RawDirectory;
import java.util.concurrent.CompletableFuture;
/**
* Is started by the bridge. Every time a project is updated, we queue it for
* GC which executes every hour or so.
*
* We don't queue it into a more immediate Executor because there is no way to
* know if a call to {@link Bridge#updateProject(Credential, ProjectRepo)},
* which releases the lock, is going to call
* {@link Bridge#push(Credential, String, RawDirectory, RawDirectory, String)}.
*
* We don't want the GC to run in between an update and a push.
*/
public interface GcJob {
void start();
void stop();
void onPreGc(Runnable preGc);
void onPostGc(Runnable postGc);
void queueForGc(String projectName);
CompletableFuture<Void> waitForRun();
}

View File

@@ -0,0 +1,141 @@
package uk.ac.ic.wlgitbridge.bridge.gc;
import uk.ac.ic.wlgitbridge.bridge.lock.LockGuard;
import uk.ac.ic.wlgitbridge.bridge.lock.ProjectLock;
import uk.ac.ic.wlgitbridge.bridge.repo.ProjectRepo;
import uk.ac.ic.wlgitbridge.bridge.repo.RepoStore;
import uk.ac.ic.wlgitbridge.util.Log;
import uk.ac.ic.wlgitbridge.util.TimerUtils;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Implementation of {@link GcJob} using its own Timer and a synchronized
* queue.
*/
public class GcJobImpl implements GcJob {
private final RepoStore repoStore;
private final ProjectLock locks;
private final long intervalMs;
private final Timer timer;
private final Set<String> gcQueue;
/**
* Hooks in case they are needed, e.g. for testing.
*/
private AtomicReference<Runnable> preGc;
private AtomicReference<Runnable> postGc;
/* We need to iterate over and empty it after every run */
private final Lock jobWaitersLock;
private final List<CompletableFuture<Void>> jobWaiters;
public GcJobImpl(RepoStore repoStore, ProjectLock locks, long intervalMs) {
this.repoStore = repoStore;
this.locks = locks;
this.intervalMs = intervalMs;
timer = new Timer();
gcQueue = Collections.newSetFromMap(new ConcurrentHashMap<>());
preGc = new AtomicReference<>(() -> {});
postGc = new AtomicReference<>(() -> {});
jobWaitersLock = new ReentrantLock();
jobWaiters = new ArrayList<>();
}
public GcJobImpl(RepoStore repoStore, ProjectLock locks) {
this(
repoStore,
locks,
TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS)
);
}
@Override
public void start() {
Log.info("Starting GC job to run every [{}] ms", intervalMs);
timer.scheduleAtFixedRate(
TimerUtils.makeTimerTask(this::doGC),
intervalMs,
intervalMs
);
}
@Override
public void stop() {
Log.info("Stopping GC job");
timer.cancel();
}
@Override
public void onPreGc(Runnable preGc) {
this.preGc.set(preGc);
}
@Override
public void onPostGc(Runnable postGc) {
this.postGc.set(postGc);
}
/**
* Needs to be callable from any thread.
* @param projectName
*/
@Override
public void queueForGc(String projectName) {
gcQueue.add(projectName);
}
@Override
public CompletableFuture<Void> waitForRun() {
CompletableFuture<Void> ret = new CompletableFuture<>();
jobWaitersLock.lock();
try {
jobWaiters.add(ret);
} finally {
jobWaitersLock.unlock();
}
return ret;
}
private void doGC() {
Log.info("GC job running");
int numGcs = 0;
preGc.get().run();
for (
Iterator<String> it = gcQueue.iterator();
it.hasNext();
it.remove(), ++numGcs
) {
String proj = it.next();
Log.info("[{}] Running GC job on project", proj);
try (LockGuard __ = locks.lockGuard(proj)) {
try {
ProjectRepo repo = repoStore.getExistingRepo(proj);
repo.runGC();
repo.deleteIncomingPacks();
} catch (IOException e) {
Log.info("[{}] Failed to GC project", proj);
}
}
}
Log.info("GC job finished, num gcs: {}", numGcs);
jobWaitersLock.lock();
try {
jobWaiters.forEach(w -> w.complete(null));
} finally {
jobWaitersLock.unlock();
}
postGc.get().run();
}
}

View File

@@ -45,6 +45,13 @@ public class FSGitRepoStore implements RepoStore {
return rootDirectory;
}
@Override
public ProjectRepo getExistingRepo(String project) throws IOException {
GitProjectRepo ret = new GitProjectRepo(project);
ret.useExistingRepository(this);
return ret;
}
/* TODO: Perhaps we should just delete bad directories on the fly. */
@Override
public void purgeNonexistentProjects(

View File

@@ -1,6 +1,7 @@
package uk.ac.ic.wlgitbridge.bridge.repo;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.api.ResetCommand;
import org.eclipse.jgit.api.errors.GitAPIException;
@@ -10,7 +11,6 @@ import org.eclipse.jgit.storage.file.FileRepositoryBuilder;
import uk.ac.ic.wlgitbridge.data.filestore.GitDirectoryContents;
import uk.ac.ic.wlgitbridge.data.filestore.RawFile;
import uk.ac.ic.wlgitbridge.git.exception.GitUserException;
import uk.ac.ic.wlgitbridge.git.exception.SizeLimitExceededException;
import uk.ac.ic.wlgitbridge.git.util.RepositoryObjectTreeWalker;
import uk.ac.ic.wlgitbridge.util.Log;
import uk.ac.ic.wlgitbridge.util.Project;
@@ -18,10 +18,25 @@ import uk.ac.ic.wlgitbridge.util.Util;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.*;
/**
* Created by winston on 20/08/2016.
* Class representing a Git repository.
*
* It stores the projectName and repo separately because the hooks need to be
* able to construct one of these without knowing whether the repo exists yet.
*
* It can then be passed to the Bridge, which will either
* {@link #initRepo(RepoStore)} for a never-seen-before repo, or
* {@link #useExistingRepository(RepoStore)} for an existing repo.
*
* Make sure to acquire the project lock before calling methods here.
*/
public class GitProjectRepo implements ProjectRepo {
@@ -81,6 +96,105 @@ public class GitProjectRepo implements ProjectRepo {
}
}
@Override
public void runGC() throws IOException {
Preconditions.checkState(
repository.isPresent(),
"Repo is not present"
);
File dir = getProjectDir();
Preconditions.checkState(dir.isDirectory());
Log.info("[{}] Running git gc", projectName);
Process proc = new ProcessBuilder(
"git", "gc"
).directory(dir).start();
int exitCode;
try {
exitCode = proc.waitFor();
Log.info("Exit: {}", exitCode);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (exitCode != 0) {
Log.warn("[{}] Git gc failed", dir.getAbsolutePath());
Log.warn(IOUtils.toString(
proc.getInputStream(),
StandardCharsets.UTF_8
));
Log.warn(IOUtils.toString(
proc.getErrorStream(),
StandardCharsets.UTF_8
));
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
throw new IOException("git gc error");
}
Log.info("[{}] git gc successful", projectName);
}
@Override
public void deleteIncomingPacks() throws IOException {
Log.info(
"[{}] Checking for garbage `incoming` files",
projectName
);
Files.walkFileTree(getDotGitDir().toPath(), new FileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(
Path dir,
BasicFileAttributes attrs
) throws IOException {
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult visitFile(
Path file,
BasicFileAttributes attrs
) throws IOException {
File file_ = file.toFile();
String name = file_.getName();
if (name.startsWith("incoming_") && name.endsWith(".pack")) {
Log.info("Deleting garbage `incoming` file: {}", file_);
Preconditions.checkState(file_.delete());
}
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult visitFileFailed(
Path file,
IOException exc
) throws IOException {
Preconditions.checkNotNull(file);
Preconditions.checkNotNull(exc);
Log.warn("Failed to visit file: " + file, exc);
return FileVisitResult.TERMINATE;
}
@Override
public FileVisitResult postVisitDirectory(
Path dir,
IOException exc
) throws IOException {
Preconditions.checkNotNull(dir);
if (exc != null) {
return FileVisitResult.TERMINATE;
}
return FileVisitResult.CONTINUE;
}
});
}
@Override
public File getProjectDir() {
return getJGitRepository().getDirectory().getParentFile();
}
public void resetHard() throws IOException {
Git git = new Git(getJGitRepository());
try {
@@ -94,7 +208,7 @@ public class GitProjectRepo implements ProjectRepo {
return repository.get();
}
public File getDirectory() {
public File getDotGitDir() {
return getJGitRepository().getWorkTree();
}

View File

@@ -3,8 +3,8 @@ package uk.ac.ic.wlgitbridge.bridge.repo;
import uk.ac.ic.wlgitbridge.data.filestore.GitDirectoryContents;
import uk.ac.ic.wlgitbridge.data.filestore.RawFile;
import uk.ac.ic.wlgitbridge.git.exception.GitUserException;
import uk.ac.ic.wlgitbridge.git.exception.SizeLimitExceededException;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
@@ -31,4 +31,10 @@ public interface ProjectRepo {
GitDirectoryContents gitDirectoryContents
) throws IOException;
void runGC() throws IOException;
void deleteIncomingPacks() throws IOException;
File getProjectDir();
}

View File

@@ -17,6 +17,8 @@ public interface RepoStore {
File getRootDirectory();
ProjectRepo getExistingRepo(String project) throws IOException;
void purgeNonexistentProjects(
Collection<String> existingProjectNames
);

View File

@@ -7,6 +7,7 @@ import uk.ac.ic.wlgitbridge.bridge.lock.ProjectLock;
import uk.ac.ic.wlgitbridge.bridge.repo.RepoStore;
import uk.ac.ic.wlgitbridge.bridge.swap.store.SwapStore;
import uk.ac.ic.wlgitbridge.util.Log;
import uk.ac.ic.wlgitbridge.util.TimerUtils;
import java.io.IOException;
import java.io.InputStream;
@@ -81,7 +82,7 @@ public class SwapJobImpl implements SwapJob {
@Override
public void start() {
timer.schedule(
uk.ac.ic.wlgitbridge.util.Timer.makeTimerTask(this::doSwap),
TimerUtils.makeTimerTask(this::doSwap),
0
);
}
@@ -98,7 +99,7 @@ public class SwapJobImpl implements SwapJob {
Log.warn("Exception thrown during swap job", t);
}
timer.schedule(
uk.ac.ic.wlgitbridge.util.Timer.makeTimerTask(this::doSwap),
TimerUtils.makeTimerTask(this::doSwap),
interval.toMillis()
);
}
@@ -161,10 +162,11 @@ public class SwapJobImpl implements SwapJob {
Log.info("Evicting project: {}", projName);
try (LockGuard __ = lock.lockGuard(projName)) {
long[] sizePtr = new long[1];
InputStream bzipped = repoStore.bzip2Project(projName, sizePtr);
swapStore.upload(projName, bzipped, sizePtr[0]);
dbStore.setLastAccessedTime(projName, null);
repoStore.remove(projName);
try (InputStream blob = repoStore.bzip2Project(projName, sizePtr)) {
swapStore.upload(projName, blob, sizePtr[0]);
dbStore.setLastAccessedTime(projName, null);
repoStore.remove(projName);
}
}
Log.info("Evicted project: {}", projName);
}
@@ -183,15 +185,17 @@ public class SwapJobImpl implements SwapJob {
@Override
public void restore(String projName) throws IOException {
try (LockGuard __ = lock.lockGuard(projName)) {
repoStore.unbzip2Project(
projName,
swapStore.openDownloadStream(projName)
);
swapStore.remove(projName);
dbStore.setLastAccessedTime(
projName,
Timestamp.valueOf(LocalDateTime.now())
);
try (InputStream s3File = swapStore.openDownloadStream(projName)) {
repoStore.unbzip2Project(
projName,
s3File
);
swapStore.remove(projName);
dbStore.setLastAccessedTime(
projName,
Timestamp.valueOf(LocalDateTime.now())
);
}
}
}

View File

@@ -83,7 +83,7 @@ public class GitBridgeServer {
try {
bridge.checkDB();
jettyServer.start();
bridge.startSwapJob();
bridge.startBackgroundJobs();
Log.info(Util.getServiceName() + "-Git Bridge server started");
Log.info("Listening on port: " + port);
Log.info("Bridged to: " + apiBaseURL);

View File

@@ -1,5 +1,6 @@
package uk.ac.ic.wlgitbridge.snapshot.servermock.util;
import com.google.common.collect.ImmutableSet;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.api.errors.GitAPIException;
import org.eclipse.jgit.api.errors.NoHeadException;
@@ -10,9 +11,9 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Created by Winston on 11/01/15.
@@ -50,6 +51,26 @@ public class FileUtil {
dir2,
dir2.resolve(".git")
);
return filesAreEqual(dir1, dir2, dir1Contents, dir2Contents);
}
public static boolean directoryDeepEquals(File dir, File dir_) {
return directoryDeepEquals(dir.toPath(), dir_.toPath());
}
public static boolean directoryDeepEquals(Path path, Path path_) {
List<Set<String>> contents = Stream.of(path, path_).map(p ->
getAllFilesRecursively(
p, p, Collections.emptySet(), true
)
).collect(Collectors.toList());
return filesAreEqual(path, path_, contents.get(0), contents.get(1));
}
private static boolean filesAreEqual(
Path dir1, Path dir2,
Set<String> dir1Contents, Set<String> dir2Contents
) {
boolean filesEqual = dir1Contents.equals(dir2Contents);
if (!filesEqual) {
System.out.println(
@@ -106,14 +127,18 @@ public class FileUtil {
Path dir,
Path excluded
) {
return getAllRecursivelyInDirectoryApartFrom(dir, excluded, true);
return getAllRecursivelyInDirectoryApartFrom(
dir, excluded, true
);
}
public static Set<String> getOnlyFilesRecursivelyInDirectoryApartFrom(
Path dir,
Path excluded
) {
return getAllRecursivelyInDirectoryApartFrom(dir, excluded, false);
return getAllRecursivelyInDirectoryApartFrom(
dir, excluded, false
);
}
private static Set<String> getAllRecursivelyInDirectoryApartFrom(
@@ -124,30 +149,40 @@ public class FileUtil {
if (!dir.toFile().isDirectory()) {
throw new IllegalArgumentException("need a directory");
}
return getAllFilesRecursively(dir, dir, excluded, directories);
return getAllFilesRecursively(
dir, dir, ImmutableSet.of(excluded.toFile()), directories
);
}
private static final Set<String> ExcludedNames = ImmutableSet.of(
".DS_Store"
);
static Set<String> getAllFilesRecursively(
Path baseDir,
Path dir,
Path excluded,
Set<File> excluded,
boolean directories
) {
Set<String> files = new HashSet<String>();
for (File file : dir.toFile().listFiles()) {
if (!file.equals(excluded.toFile())) {
boolean isDirectory = file.isDirectory();
if (directories || !isDirectory) {
files.add(baseDir.relativize(file.toPath()).toString());
}
if (isDirectory) {
files.addAll(getAllFilesRecursively(
baseDir,
file.toPath(),
excluded,
directories
));
}
if (excluded.contains(file)) {
continue;
}
if (ExcludedNames.contains(file.getName())) {
continue;
}
boolean isDirectory = file.isDirectory();
if (directories || !isDirectory) {
files.add(baseDir.relativize(file.toPath()).toString());
}
if (isDirectory) {
files.addAll(getAllFilesRecursively(
baseDir,
file.toPath(),
excluded,
directories
));
}
}
return files;

View File

@@ -8,14 +8,19 @@ import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.output.ByteArrayOutputStream;
import java.io.*;
import java.nio.file.Path;
import java.nio.file.Paths;
/**
* Created by winston on 23/08/2016.
* Tar utilities.
*
* The resource returned by zip and tar are treated as unowned.
*
* The resource given to unzip is treated as unowned.
*
* Caller is responsible for all resources.
*/
public class Tar {
@@ -31,23 +36,27 @@ public class Tar {
File fileOrDir,
long[] sizePtr
) throws IOException {
ByteArrayOutputStream target = new ByteArrayOutputStream();
File tmp = File.createTempFile(fileOrDir.getName(), ".tar.bz2");
tmp.deleteOnExit();
OutputStream target = new FileOutputStream(tmp);
/* Closes target */
try (OutputStream bzip2 = new BZip2CompressorOutputStream(target)) {
tarTo(fileOrDir, bzip2);
}
if (sizePtr != null) {
sizePtr[0] = target.size();
sizePtr[0] = tmp.length();
}
return target.toInputStream();
return new FileInputStream(tmp);
}
public static void unzip(
InputStream tarbz2,
File parentDir
) throws IOException {
try (InputStream tar = new BZip2CompressorInputStream(tarbz2)) {
untar(tar, parentDir);
}
/* BZip2CompressorInputStream does not need closing
Closing it would close tarbz2 which we should not do */
InputStream tar = new BZip2CompressorInputStream(tarbz2);
untar(tar, parentDir);
}
}
@@ -55,9 +64,12 @@ public class Tar {
private Tar() {}
public static InputStream tar(File fileOrDir) throws IOException {
ByteArrayOutputStream target = new ByteArrayOutputStream();
tarTo(fileOrDir, target);
return target.toInputStream();
File tmp = File.createTempFile(fileOrDir.getName(), ".tar");
tmp.deleteOnExit();
try (FileOutputStream target = new FileOutputStream(tmp)) {
tarTo(fileOrDir, target);
return new FileInputStream(tmp);
}
}
public static void tarTo(

View File

@@ -5,13 +5,17 @@ import java.util.TimerTask;
/**
* Created by winston on 23/08/2016.
*/
public class Timer {
public class TimerUtils {
public static TimerTask makeTimerTask(Runnable lamb) {
return new TimerTask() {
@Override
public void run() {
lamb.run();
try {
lamb.run();
} catch (Throwable t) {
Log.warn("Error on timer", t);
}
}
};
}