diff --git a/build.gradle b/build.gradle
index c6b1b422623bd7f8a10d6c0e8e3278d0085e08e5..0e45b160d9c5b420698b844f9cd6b31aaf0a2045 100644
--- a/build.gradle
+++ b/build.gradle
@@ -76,6 +76,9 @@ dependencies {
     // https://mvnrepository.com/artifact/com.ontotext.graphdb/graphdb-free-runtime
     implementation group: 'com.ontotext.graphdb', name: 'graphdb-free-runtime', version: '9.5.0'
 
+    // https://mvnrepository.com/artifact/org.eclipse.rdf4j/rdf4j-model
+    implementation group: 'org.eclipse.rdf4j', name: 'rdf4j-model', version: '3.6.0'
+
     // HDT clone and maven install the hdt library from hdt-java git repository
     implementation group: 'org.rdfhdt', name: 'hdt-java-core', version: '2.1.3-SNAPSHOT'
 
diff --git a/src/main/java/nl/munlock/graphdb/App.java b/src/main/java/nl/munlock/graphdb/App.java
index b8b46be69ec8a3fe6d461150f630e6501bef4707..bfe509a523c07c6285b510b16dbe27cacbbbc8c6 100644
--- a/src/main/java/nl/munlock/graphdb/App.java
+++ b/src/main/java/nl/munlock/graphdb/App.java
@@ -31,13 +31,14 @@ public class App {
             CommandOptions commandOptions = new CommandOptions(args);
             CreateRemoteRepository.main(commandOptions);
 
-            HashMap<String, String> rdfFiles = Download.findRDFFiles(commandOptions);
-            Load.rdf(commandOptions, rdfFiles);
+            // Change to HDT...
+            HashMap<String, String> hdtFiles = Download.findHDTFiles(commandOptions);
+            Load.hdt(commandOptions, hdtFiles);
         } else if (arguments.contains("-hdt")) {
             logger.info("HDT parsing");
             // Check if hdt is in args
             CommandOptionsHDT commandOptions = new CommandOptionsHDT(args);
-            HashMap<String, String> rdfFiles = Download.findRDFFiles(commandOptions.folder);
+            HashMap<String, String> rdfFiles = Download.findHDTFiles(commandOptions.folder);
             if (rdfFiles.size() > 0) {
                 nl.munlock.hdt.Create.start(commandOptions, rdfFiles);
             } else {
diff --git a/src/main/java/nl/munlock/graphdb/options/CommandOptionsHDT.java b/src/main/java/nl/munlock/graphdb/options/CommandOptionsHDT.java
index adb5a5130f957cce475781daeeb354cf3d52dfb9..ec0a932f58eadd4bd2c4e3f7b4d82a01ca3e130a 100644
--- a/src/main/java/nl/munlock/graphdb/options/CommandOptionsHDT.java
+++ b/src/main/java/nl/munlock/graphdb/options/CommandOptionsHDT.java
@@ -18,6 +18,9 @@ public class CommandOptionsHDT {
     @Parameter(names = {"-hdt"}, description = "Running in HDT mode", required = true)
     public boolean hdt;
 
+//    @Parameter(names = {"-force"}, description = "Force rebuilding of HDT files")
+//    public boolean force;
+
     public String[] args;
 
     public CommandOptionsHDT(String args[]) {
diff --git a/src/main/java/nl/munlock/hdt/Create.java b/src/main/java/nl/munlock/hdt/Create.java
index 7aa1583012ece2f471aa9cc57273265e1d3d90dd..5fa3ec664003994aa4907e5897e4c718502ef98b 100644
--- a/src/main/java/nl/munlock/hdt/Create.java
+++ b/src/main/java/nl/munlock/hdt/Create.java
@@ -62,12 +62,13 @@ public class Create {
             if (new File(outputFile).exists() && new File(outputFile).length() > 10) {
                 outputFiles.add(outputFile);
             } else {
+                // Obtain file
+                downloadFile(connection, new File(file));
+                // Start RDF validation
                 boolean validating = true;
                 while (validating) {
-                    logger.info("Validating " + localFile);
                     try {
-                        downloadFile(connection, new File(file));
-
+                        logger.info("Validating " + localFile);
                         InputStream inputStream = new FileInputStream(localFile);
                         FileOutputStream outputStream = new FileOutputStream(outputFile);
 
diff --git a/src/main/java/nl/munlock/irods/Download.java b/src/main/java/nl/munlock/irods/Download.java
index c0ddf946398a8d9ce5045e36d14eb4364fec4698..2ad0a79609e76395eb6fe1d97dbefff2ec47a7f0 100644
--- a/src/main/java/nl/munlock/irods/Download.java
+++ b/src/main/java/nl/munlock/irods/Download.java
@@ -17,6 +17,8 @@ import org.irods.jargon.core.pub.DataTransferOperations;
 import org.irods.jargon.core.pub.IRODSGenQueryExecutor;
 import org.irods.jargon.core.pub.io.IRODSFile;
 import org.irods.jargon.core.query.*;
+import org.irods.jargon.core.transfer.TransferStatusCallbackListener;
+import org.rdfhdt.hdt.listener.ProgressListener;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -28,7 +30,7 @@ import java.util.concurrent.TimeUnit;
 public class Download {
     private static Logger logger = Logger.getLogger(Download.class);
 
-    public static HashMap<String, String> findRDFFiles(CommandOptionsHDT commandOptions, String searchPath) throws GenQueryBuilderException, JargonException {
+    public static HashMap<String, String> findHDTFiles(CommandOptionsHDT commandOptions, String searchPath) throws GenQueryBuilderException, JargonException {
         logger.info("Searching for RDF files");
 
 //        HashSet<String> loadedFiles = getLoadedFiles(commandOptions);
@@ -86,7 +88,7 @@ public class Download {
         return paths;
     }
 
-    public static HashMap<String, String> findRDFFiles(String searchPath) throws GenQueryBuilderException, JargonException {
+    public static HashMap<String, String> findHDTFiles(String searchPath) throws GenQueryBuilderException, JargonException {
         logger.info("Searching for RDF files in " + searchPath);
 
         Connection connection = new Connection();
@@ -136,7 +138,7 @@ public class Download {
         return paths;
     }
 
-    public static HashMap<String, String> findRDFFiles(CommandOptions commandOptions) throws GenQueryBuilderException, JargonException {
+    public static HashMap<String, String> findHDTFiles(CommandOptions commandOptions) throws GenQueryBuilderException, JargonException {
         logger.info("Searching for RDF files");
 
         HashSet<String> loadedFiles = getLoadedFiles(commandOptions);
@@ -191,6 +193,42 @@ public class Download {
         return paths;
     }
 
+    public static HashSet<String> getObservationUnits() throws GenQueryBuilderException, JargonException, JargonQueryException, InterruptedException {
+        System.err.println("Getting OU's");
+        Connection connection = new Connection();
+
+        IRODSGenQueryBuilder queryBuilder = new IRODSGenQueryBuilder(true, null);
+
+        // Generalised to landingzone folder, check later if its ENA or Project
+        queryBuilder.addConditionAsGenQueryField(RodsGenQueryEnum.COL_META_COLL_ATTR_NAME, QueryConditionOperators.EQUAL, "type");
+        queryBuilder.addConditionAsGenQueryField(RodsGenQueryEnum.COL_META_COLL_ATTR_VALUE, QueryConditionOperators.EQUAL, "ObservationUnit");
+        // Skip files found in trash
+        queryBuilder.addConditionAsGenQueryField(RodsGenQueryEnum.COL_COLL_NAME, QueryConditionOperators.NOT_LIKE, "/" + connection.irodsAccount.getZone() + "/trash/%");
+        // Get folders
+        queryBuilder.addSelectAsGenQueryValue(RodsGenQueryEnum.COL_COLL_NAME);
+
+        // Set limit?
+        IRODSGenQueryFromBuilder query = queryBuilder.exportIRODSQueryFromBuilder(100000);
+
+        IRODSGenQueryExecutor irodsGenQueryExecutor = connection.accessObjectFactory.getIRODSGenQueryExecutor(connection.irodsAccount);
+        IRODSQueryResultSet irodsQueryResultSet;
+
+        irodsQueryResultSet = irodsGenQueryExecutor.executeIRODSQuery(query, 0);
+        TimeUnit.SECONDS.sleep(0);
+
+        List<IRODSQueryResultRow> irodsQueryResultSetResults = irodsQueryResultSet.getResults();
+
+        HashSet<String> paths = new HashSet<>();
+        int count = irodsQueryResultSetResults.size();
+        for (IRODSQueryResultRow irodsQueryResultSetResult : irodsQueryResultSetResults) {
+            count = count - 1;
+            if (count % 100 == 0) logger.info("Still " + count + " left");
+            String path = irodsQueryResultSetResult.getColumn(0);
+            paths.add(path);
+        }
+        return paths;
+    }
+
     private static HashSet<String> getLoadedFiles(CommandOptions commandOptions) {
         SPARQLRepository sparqlRepository = new SPARQLRepository(commandOptions.graphdb + "/repositories/" + commandOptions.project +"_" + commandOptions.investigation);
         sparqlRepository.setUsernameAndPassword(commandOptions.username, commandOptions.password);
@@ -231,12 +269,6 @@ public class Download {
 
         File localFile = new File("." + download);
 
-        // TODO remove entry
-        if (localFile.exists()) {
-            logger.info("File already exists");
-            return;
-        }
-
         if (localFile.exists()) {
             // Perform hash check!
             // Get local HASH
diff --git a/src/main/java/nl/munlock/irods/Load.java b/src/main/java/nl/munlock/irods/Load.java
index 6512f9db8e012c17eddf5c444ac9927421570021..9554281bc5e8a1c7972600080e790ad9f0e7d29d 100644
--- a/src/main/java/nl/munlock/irods/Load.java
+++ b/src/main/java/nl/munlock/irods/Load.java
@@ -6,8 +6,15 @@ import org.apache.jena.rdf.model.ModelFactory;
 import org.apache.jena.riot.Lang;
 import org.apache.jena.riot.RDFDataMgr;
 import org.apache.log4j.Logger;
+import org.eclipse.rdf4j.model.IRI;
+import org.eclipse.rdf4j.model.Resource;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.model.Value;
 import org.eclipse.rdf4j.repository.Repository;
 import org.eclipse.rdf4j.repository.RepositoryConnection;
+import org.eclipse.rdf4j.repository.RepositoryResult;
+import org.eclipse.rdf4j.repository.event.base.NotifyingRepositoryConnectionWrapper;
+import org.eclipse.rdf4j.repository.event.base.RepositoryConnectionListenerAdapter;
 import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
 import org.eclipse.rdf4j.rio.RDFFormat;
 import org.irods.jargon.core.exception.JargonException;
@@ -18,70 +25,133 @@ import org.rdfhdt.hdtjena.HDTGraph;
 
 import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
 import static nl.munlock.irods.Download.downloadFile;
+import static org.eclipse.rdf4j.model.util.Values.iri;
 
 public class Load {
 
     static Logger logger = Logger.getLogger(Load.class);
+    private static RepositoryConnection repositoryConnection;
+    private static Repository repository;
 
-    public static void rdf(CommandOptions commandOptions, HashMap<String, String> rdfFiles) throws IOException, JargonException {
+    public static void hdt(CommandOptions commandOptions, HashMap<String, String> hdtFiles) throws IOException, JargonException {
         logger.info("Loading RDF files");
 
         String strServerUrl = commandOptions.graphdb;
         RemoteRepositoryManager remoteRepositoryManager = RemoteRepositoryManager.getInstance(strServerUrl);
-        remoteRepositoryManager.setUsernameAndPassword("admin", "root");
+        remoteRepositoryManager.setUsernameAndPassword(commandOptions.username, commandOptions.password);
         remoteRepositoryManager.init();
 
         // Get the repository from repository manager, note the repository id
         // set in configuration .ttl file
-        Repository repository = remoteRepositoryManager.getRepository(commandOptions.project +"_" + commandOptions.investigation);
+        repository = remoteRepositoryManager.getRepository(commandOptions.project + "_" + commandOptions.investigation);
 
         // Open a connection to this repository
-        RepositoryConnection repositoryConnection = repository.getConnection();
+        repositoryConnection = repository.getConnection();
+
+        // Obtain all hashes
+        IRI predicate = iri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type");
+        IRI object = iri("http://m-unlock.nl/ontology/RDFDataSet");
+        RepositoryResult<Statement> repositoryResults = repositoryConnection.getStatements(null, predicate, object);
+        HashSet<String> hashes = new HashSet<>();
+        for (Statement repositoryResult : repositoryResults) {
+            hashes.add(repositoryResult.getSubject().stringValue());
+        }
+
         // iRODS connection for inputstream
         Connection connection = new Connection();
-
-        for (String rdfFile : rdfFiles.keySet()) {
-
-            downloadFile(connection, new File(rdfFile));
-            logger.info("Converting to NT file");
-            HDT hdt = HDTManager.mapIndexedHDT("." + rdfFile);
-            HDTGraph graph = new HDTGraph(hdt);
-            Model model = ModelFactory.createModelForGraph(graph);
-            GZIPOutputStream outputStream = new GZIPOutputStream(new FileOutputStream("." + rdfFile + ".nt.gz"));
-            RDFDataMgr.write(outputStream, model, Lang.NTRIPLES) ;
-            outputStream.close();
-
-            // Open the stream
-            GZIPInputStream inputStream = new GZIPInputStream(new FileInputStream("." + rdfFile + ".nt.gz"));
-            // Sent to databaseConver
-            repositoryConnection.add(inputStream, commandOptions.base, RDFFormat.NTRIPLES);
-
-            // Add triples of loaded files
-            String statement = "<" + rdfFiles.get(rdfFile) + "> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://m-unlock.nl/ontology/RDFDataSet> .";
-            InputStream stream = new ByteArrayInputStream(statement.getBytes(StandardCharsets.UTF_8));
-            repositoryConnection.add(stream, null, RDFFormat.NTRIPLES);
-
-//            try {
-//                Rio.parse(inputStream, commandOptions.base, RDFFormat.TURTLE);
-//                inputStream.close();
-//            } catch (RDFParseException e) {
-//                logger.info("Failed " + rdfFile);
-//                continue;
-//            } catch (IOException e) {
-//                System.err.println("NO IDEA " + e.getMessage());
-//            }
-//
-//            logger.info("Loading " + rdfFile);
-
-//            inputStream = new FileInputStream("." + rdfFile);
-//            repositoryConnection.add(inputStream, commandOptions.base, RDFFormat.TURTLE);
-            // inputStream.close();
-
+        int counter = 0;
+        for (String hdtFile : hdtFiles.keySet()) {
+            counter++;
+            // Downloading the hdt file
+            downloadFile(connection, new File(hdtFile));
+            // Downloading the index file
+            downloadFile(connection, new File(hdtFile + ".index.v1-1"));
+
+            // Perform the splitter and split check...
+            // First check if .nt.gz.0 exists... if so collect all files containing <name>.nt.gz.[0-1].*
+            // TODO if < 10.000.000 no split needed perhaps rename to .nt.gz.0 and be done with it?
+            File ntTriples = new File("." + hdtFile + ".nt.gz");
+            // No conversion needed
+            HashSet<File> localTripleFileSubsets = new HashSet<>();
+            if (ntTriples.exists()) {
+                logger.info("File already split");
+                // List all files in directory and check if name ends with .nt.gz.part.*
+                for (File file : ntTriples.getParentFile().listFiles()) {
+                    if (file.getName().matches(ntTriples.getName() + ".part.*")) {
+                        localTripleFileSubsets.add(file);
+                    }
+                }
+            }
+
+            // Double check if parts are not available...
+            if (localTripleFileSubsets.size() == 0) {
+                // Perform the conversion to nt.gz
+                logger.info("Converting " + counter + " of " + hdtFiles.keySet().size() + " " + new File(hdtFile).getName() + " to NT file");
+                HDT hdt = HDTManager.mapIndexedHDT("." + hdtFile);
+                HDTGraph graph = new HDTGraph(hdt);
+                Model model = ModelFactory.createModelForGraph(graph);
+                GZIPOutputStream outputStream = new GZIPOutputStream(new FileOutputStream(ntTriples));
+                RDFDataMgr.write(outputStream, model, Lang.NTRIPLES);
+                outputStream.close();
+
+                // Split file into 10 million reads per file?
+                GZIPInputStream inputStream = new GZIPInputStream(new FileInputStream(ntTriples));
+                BufferedReader buffered = new BufferedReader(new InputStreamReader(inputStream));
+                int lineCounter = 0;
+                String content;
+                File localTripleFileSubset = new File(ntTriples + ".part." + lineCounter);
+
+                localTripleFileSubsets.add(localTripleFileSubset);
+                // First 10m stream
+                PrintWriter localTripleOutputStream = new PrintWriter(new GZIPOutputStream(new FileOutputStream(localTripleFileSubset)));
+                while ((content = buffered.readLine()) != null) {
+                    // increment line counter
+                    lineCounter++;
+                    // Split in chunks of 10.000.000
+                    if (lineCounter % 10000000 == 0) {
+                        // Close the file stream
+                        localTripleOutputStream.close();
+                        // Make new file and stream with line counter
+                        localTripleFileSubset = new File(ntTriples + ".part." + lineCounter);
+                        logger.info("New part created " + localTripleFileSubset.getName());
+                        // Add to list of output files
+                        localTripleFileSubsets.add(localTripleFileSubset);
+                        // The new output stream in gzip compression
+                        localTripleOutputStream = new PrintWriter(new GZIPOutputStream(new FileOutputStream(localTripleFileSubset)));
+                    }
+                    // Writing the content line
+                    localTripleOutputStream.println(content);
+                }
+                localTripleOutputStream.close();
+            }
+
+            // Iterate over all output subset files
+            for (File tripleFileSubset : localTripleFileSubsets) {
+                // Sent to database
+                logger.info("Loading " + tripleFileSubset.getName() + " into " + strServerUrl);
+                // Generate hash of file
+                GZIPInputStream inputStream = new GZIPInputStream(new FileInputStream(tripleFileSubset));
+                String sha256 = "sha256:"+org.apache.commons.codec.digest.DigestUtils.sha256Hex(inputStream);
+                // If hash is present we can skip the loading process
+                if (hashes.contains(sha256)) continue;
+
+                hashes.add(sha256);
+
+                // Load file into triple store
+                inputStream = new GZIPInputStream(new FileInputStream(tripleFileSubset));
+                loadZippedFile(inputStream, RDFFormat.NTRIPLES);
+                // Add hash of file to triple store
+                String statement = "<" + sha256 + "> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://m-unlock.nl/ontology/RDFDataSet> .";
+                InputStream stream = new ByteArrayInputStream(statement.getBytes(StandardCharsets.UTF_8));
+                repositoryConnection.add(stream, null, RDFFormat.NTRIPLES);
+                stream.close();
+            }
         }
         // Shutdown connection, repository and manager
@@ -89,4 +159,33 @@ public class Load {
         repository.shutDown();
         remoteRepositoryManager.shutDown();
     }
+
+    public static void loadZippedFile(InputStream in, RDFFormat format) throws IOException {
+        NotifyingRepositoryConnectionWrapper con = new NotifyingRepositoryConnectionWrapper(repository, repository.getConnection());
+        RepositoryConnectionListenerAdapter myListener =
+                new RepositoryConnectionListenerAdapter() {
+                    private long count = 0;
+
+                    @Override
+                    public void add(RepositoryConnection arg0, Resource arg1, IRI arg2,
+                                    Value arg3, Resource... arg4) {
+                        count++;
+                        if (count % 100000 == 0)
+                            logger.info("Add statement number " + count + "\t" + arg1 + " " + arg2 + " " + arg3);
+                    }
+                };
+        con.addRepositoryConnectionListener(myListener);
+        int attempt = 0;
+        while (true) {
+            attempt++;
+            try {
+                con.add(in, "", format);
+                break;
+            } catch (Exception e) {
+                logger.error(e.getMessage());
+                if (attempt > 5)
+                    throw new IOException("After 5 attempts of loading the data it failed to continue. ");
+            }
+        }
+    }
 }