From 9120dc2defbfbbdba7e56234d661ca1f128e0fda Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Tue, 26 Nov 2019 18:26:03 -0600 Subject: [PATCH 1/4] faster file list ingestion for flat lists --- src/main/java/edu/vanderbilt/accre/laurelin/Root.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/main/java/edu/vanderbilt/accre/laurelin/Root.java b/src/main/java/edu/vanderbilt/accre/laurelin/Root.java index 3d515ed9..45afcc01 100644 --- a/src/main/java/edu/vanderbilt/accre/laurelin/Root.java +++ b/src/main/java/edu/vanderbilt/accre/laurelin/Root.java @@ -253,9 +253,14 @@ public TTreeDataSourceV2Reader(DataSourceOptions options, CacheFactory basketCac logger.trace("construct ttreedatasourcev2reader"); this.sparkContext = sparkContext; try { - this.paths = new LinkedList(); - for (String path: options.paths()) { - this.paths.addAll(IOFactory.expandPathToList(path)); + this.paths = new LinkedList(); + String[] in_paths = options.paths(); + for (int i = 0; i < in_paths.length; ++i) { + if(in_paths[i].endsWith(".root") ) { + this.paths.add(in_paths[i]); + } else { + this.paths.addAll(IOFactory.expandPathToList(in_paths[i])); + } } // FIXME - More than one file, please currFile = TFile.getFromFile(fileCache.getROOTFile(this.paths.get(0))); From 80635de5678c5f898c911e2578ab93def6fed8da Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Thu, 5 Dec 2019 13:51:24 -0600 Subject: [PATCH 2/4] use more hadoopy goodness --- .../edu/vanderbilt/accre/laurelin/Root.java | 11 ++-------- .../accre/laurelin/root_proxy/HadoopFile.java | 20 +++++++++++++++++++ .../accre/laurelin/root_proxy/IOFactory.java | 8 ++++++++ .../accre/laurelin/root_proxy/NIOFile.java | 9 +++++++++ 4 files changed, 39 insertions(+), 9 deletions(-) diff --git a/src/main/java/edu/vanderbilt/accre/laurelin/Root.java b/src/main/java/edu/vanderbilt/accre/laurelin/Root.java index 45afcc01..daf0f372 100644 --- a/src/main/java/edu/vanderbilt/accre/laurelin/Root.java +++ b/src/main/java/edu/vanderbilt/accre/laurelin/Root.java @@ -253,15 +253,8 @@ public TTreeDataSourceV2Reader(DataSourceOptions options, CacheFactory basketCac logger.trace("construct ttreedatasourcev2reader"); this.sparkContext = sparkContext; try { - this.paths = new LinkedList(); - String[] in_paths = options.paths(); - for (int i = 0; i < in_paths.length; ++i) { - if(in_paths[i].endsWith(".root") ) { - this.paths.add(in_paths[i]); - } else { - this.paths.addAll(IOFactory.expandPathToList(in_paths[i])); - } - } + this.paths = new LinkedList(); + this.paths.addAll(IOFactory.expandPathsToList(options.paths())); // FIXME - More than one file, please currFile = TFile.getFromFile(fileCache.getROOTFile(this.paths.get(0))); treeName = options.get("tree").orElse("Events"); diff --git a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/HadoopFile.java b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/HadoopFile.java index 63fa4275..71f7795f 100644 --- a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/HadoopFile.java +++ b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/HadoopFile.java @@ -11,11 +11,13 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.util.Arrays; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; @@ -71,6 +73,24 @@ public long getLimit() throws IOException { return limit; } + public static List expandPathsToList(String[] paths) throws IOException { + ArrayList out =new ArrayList(); + Configuration conf = new Configuration(); + FileSystem fileSystem = FileSystem.get(conf); + Path[] hpaths = new Path[paths.length]; + for(int i = 0; i < paths.length; ++i) { hpaths[i] = new Path(paths[i]); } + FileStatus[] statoos = fileSystem.listStatus(hpaths); + for (int i = 0; i < statoos.length; ++i) { + String strpath = statoos[i].getPath().getName(); + if((statoos[i].isFile() || statoos[i].isSymlink()) && strpath.endsWith(".root")) { + out.add(strpath); + } else if(statoos[i].isDirectory()) { + out.addAll(HadoopFile.expandPathToList(strpath)); + } + } + return out; + } + public static List expandPathToList(String path) throws IOException { Configuration conf = new Configuration(); URI uri = URI.create(path); diff --git a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/IOFactory.java b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/IOFactory.java index 9ca52c1f..14fe77d9 100644 --- a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/IOFactory.java +++ b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/IOFactory.java @@ -36,6 +36,14 @@ public static FileInterface openForRead(String path) throws IOException { return ret; } + public static List expandPathsToList(String[] paths) throws IOException { + if (Pattern.matches(hadoopPattern, paths[0])) { + return HadoopFile.expandPathsToList(paths); + } else { + return NIOFile.expandPathsToList(paths); + } + } + public static List expandPathToList(String path) throws IOException { if (Pattern.matches(hadoopPattern, path)) { return HadoopFile.expandPathToList(path); diff --git a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/NIOFile.java b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/NIOFile.java index 6fa6ea44..7b4503e2 100644 --- a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/NIOFile.java +++ b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/NIOFile.java @@ -20,6 +20,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; + public class NIOFile implements FileInterface { private RandomAccessFile fh; private FileChannel channel; @@ -68,6 +69,14 @@ public long getLimit() throws IOException { return fh.length(); } + public static List expandPathsToList(String[] paths) throws IOException { + ArrayList out = new ArrayList(); + for(int i = 0; i < paths.length; ++i) { + out.addAll(NIOFile.expandPathToList(paths[i])); + } + return out; + } + public static List expandPathToList(String path) throws IOException { File tmp = FileSystems.getDefault().getPath(path).toFile(); if (!tmp.isDirectory()) { From 904f1a2323d4cda91cd528baa4c72e4a9b0e901d Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Thu, 5 Dec 2019 14:02:59 -0600 Subject: [PATCH 3/4] use LinkedList all the way through to avoid necessary loops when getting multiple paths --- src/main/java/edu/vanderbilt/accre/laurelin/Root.java | 3 +-- .../edu/vanderbilt/accre/laurelin/root_proxy/HadoopFile.java | 2 +- .../java/edu/vanderbilt/accre/laurelin/root_proxy/NIOFile.java | 3 ++- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/edu/vanderbilt/accre/laurelin/Root.java b/src/main/java/edu/vanderbilt/accre/laurelin/Root.java index daf0f372..e209f85a 100644 --- a/src/main/java/edu/vanderbilt/accre/laurelin/Root.java +++ b/src/main/java/edu/vanderbilt/accre/laurelin/Root.java @@ -253,8 +253,7 @@ public TTreeDataSourceV2Reader(DataSourceOptions options, CacheFactory basketCac logger.trace("construct ttreedatasourcev2reader"); this.sparkContext = sparkContext; try { - this.paths = new LinkedList(); - this.paths.addAll(IOFactory.expandPathsToList(options.paths())); + this.paths = (LinkedList)IOFactory.expandPathsToList(options.paths()); // FIXME - More than one file, please currFile = TFile.getFromFile(fileCache.getROOTFile(this.paths.get(0))); treeName = options.get("tree").orElse("Events"); diff --git a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/HadoopFile.java b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/HadoopFile.java index 71f7795f..64b9d22f 100644 --- a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/HadoopFile.java +++ b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/HadoopFile.java @@ -74,7 +74,7 @@ public long getLimit() throws IOException { } public static List expandPathsToList(String[] paths) throws IOException { - ArrayList out =new ArrayList(); + LinkedList out = new LinkedList(); Configuration conf = new Configuration(); FileSystem fileSystem = FileSystem.get(conf); Path[] hpaths = new Path[paths.length]; diff --git a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/NIOFile.java b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/NIOFile.java index 7b4503e2..2478b7e7 100644 --- a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/NIOFile.java +++ b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/NIOFile.java @@ -15,6 +15,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.Future; import java.util.stream.Collectors; @@ -70,7 +71,7 @@ public long getLimit() throws IOException { } public static List expandPathsToList(String[] paths) throws IOException { - ArrayList out = new ArrayList(); + LinkedList out = new LinkedList(); for(int i = 0; i < paths.length; ++i) { out.addAll(NIOFile.expandPathToList(paths[i])); } From 447af062e0538cbcbd00d2bd88fe9f4eb3e93cdb Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Thu, 5 Dec 2019 14:25:35 -0600 Subject: [PATCH 4/4] add tests --- .../accre/laurelin/root_proxy/HadoopFile.java | 5 +++-- .../vanderbilt/accre/root_proxy/IOTest.java | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/HadoopFile.java b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/HadoopFile.java index 64b9d22f..7a042912 100644 --- a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/HadoopFile.java +++ b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/HadoopFile.java @@ -76,12 +76,13 @@ public long getLimit() throws IOException { public static List expandPathsToList(String[] paths) throws IOException { LinkedList out = new LinkedList(); Configuration conf = new Configuration(); - FileSystem fileSystem = FileSystem.get(conf); + URI uri = URI.create(paths[0]); + FileSystem fileSystem = FileSystem.get(uri, conf); Path[] hpaths = new Path[paths.length]; for(int i = 0; i < paths.length; ++i) { hpaths[i] = new Path(paths[i]); } FileStatus[] statoos = fileSystem.listStatus(hpaths); for (int i = 0; i < statoos.length; ++i) { - String strpath = statoos[i].getPath().getName(); + String strpath = statoos[i].getPath().toString(); if((statoos[i].isFile() || statoos[i].isSymlink()) && strpath.endsWith(".root")) { out.add(strpath); } else if(statoos[i].isDirectory()) { diff --git a/src/test/java/edu/vanderbilt/accre/root_proxy/IOTest.java b/src/test/java/edu/vanderbilt/accre/root_proxy/IOTest.java index 6b9018dc..7c976166 100644 --- a/src/test/java/edu/vanderbilt/accre/root_proxy/IOTest.java +++ b/src/test/java/edu/vanderbilt/accre/root_proxy/IOTest.java @@ -13,6 +13,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; +import java.util.LinkedList; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -171,4 +172,22 @@ public void searchDirectory_hadoop() throws IOException { assertEquals(3, files.size()); } + @Test + public void searchDirectory_nio_paths() throws IOException { + String[] paths = new String[1]; + paths[0] = "testdata/recursive"; + List files = IOFactory.expandPathsToList(paths); + assertEquals(3, files.size()); + } + + @Test + public void searchDirectory_hadoop_paths() throws IOException { + Path currentRelativePath = Paths.get(""); + String s = currentRelativePath.toAbsolutePath().toString(); + String[] paths = new String[1]; + paths[0] = "file:///" + s + "/" + "testdata/recursive"; + List files = IOFactory.expandPathsToList(paths); + assertEquals(3, files.size()); + } + }