diff --git a/src/main/java/edu/vanderbilt/accre/laurelin/Root.java b/src/main/java/edu/vanderbilt/accre/laurelin/Root.java index 3d515ed9..e209f85a 100644 --- a/src/main/java/edu/vanderbilt/accre/laurelin/Root.java +++ b/src/main/java/edu/vanderbilt/accre/laurelin/Root.java @@ -253,10 +253,7 @@ public TTreeDataSourceV2Reader(DataSourceOptions options, CacheFactory basketCac logger.trace("construct ttreedatasourcev2reader"); this.sparkContext = sparkContext; try { - this.paths = new LinkedList(); - for (String path: options.paths()) { - this.paths.addAll(IOFactory.expandPathToList(path)); - } + this.paths = new LinkedList(IOFactory.expandPathsToList(options.paths())); // FIXME - More than one file, please currFile = TFile.getFromFile(fileCache.getROOTFile(this.paths.get(0))); treeName = options.get("tree").orElse("Events"); diff --git a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/HadoopFile.java b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/HadoopFile.java index 63fa4275..7a042912 100644 --- a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/HadoopFile.java +++ b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/HadoopFile.java @@ -11,11 +11,13 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.util.Arrays; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; @@ -71,6 +73,25 @@ public long getLimit() throws IOException { return limit; } + public static List expandPathsToList(String[] paths) throws IOException { + LinkedList out = new LinkedList(); + Configuration conf = new Configuration(); + URI uri = URI.create(paths[0]); + FileSystem fileSystem = FileSystem.get(uri, conf); + Path[] hpaths = new Path[paths.length]; 
+ for(int i = 0; i < paths.length; ++i) { hpaths[i] = new Path(paths[i]); } + FileStatus[] statuses = fileSystem.listStatus(hpaths); + for (int i = 0; i < statuses.length; ++i) { + String strpath = statuses[i].getPath().toString(); + if((statuses[i].isFile() || statuses[i].isSymlink()) && strpath.endsWith(".root")) { + out.add(strpath); + } else if(statuses[i].isDirectory()) { + out.addAll(HadoopFile.expandPathToList(strpath)); + } + } + return out; + } + public static List expandPathToList(String path) throws IOException { Configuration conf = new Configuration(); URI uri = URI.create(path); diff --git a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/IOFactory.java b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/IOFactory.java index 9ca52c1f..14fe77d9 100644 --- a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/IOFactory.java +++ b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/IOFactory.java @@ -36,6 +36,14 @@ public static FileInterface openForRead(String path) throws IOException { return ret; } + public static List expandPathsToList(String[] paths) throws IOException { + if (Pattern.matches(hadoopPattern, paths[0])) { + return HadoopFile.expandPathsToList(paths); + } else { + return NIOFile.expandPathsToList(paths); + } + } + public static List expandPathToList(String path) throws IOException { if (Pattern.matches(hadoopPattern, path)) { return HadoopFile.expandPathToList(path); diff --git a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/NIOFile.java b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/NIOFile.java index 6fa6ea44..2478b7e7 100644 --- a/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/NIOFile.java +++ b/src/main/java/edu/vanderbilt/accre/laurelin/root_proxy/NIOFile.java @@ -15,11 +15,13 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.Future; import java.util.stream.Collectors; import 
java.util.stream.Stream; + public class NIOFile implements FileInterface { private RandomAccessFile fh; private FileChannel channel; @@ -68,6 +70,14 @@ public long getLimit() throws IOException { return fh.length(); } + public static List expandPathsToList(String[] paths) throws IOException { + LinkedList out = new LinkedList(); + for(int i = 0; i < paths.length; ++i) { + out.addAll(NIOFile.expandPathToList(paths[i])); + } + return out; + } + public static List expandPathToList(String path) throws IOException { File tmp = FileSystems.getDefault().getPath(path).toFile(); if (!tmp.isDirectory()) { diff --git a/src/test/java/edu/vanderbilt/accre/root_proxy/IOTest.java b/src/test/java/edu/vanderbilt/accre/root_proxy/IOTest.java index 6b9018dc..7c976166 100644 --- a/src/test/java/edu/vanderbilt/accre/root_proxy/IOTest.java +++ b/src/test/java/edu/vanderbilt/accre/root_proxy/IOTest.java @@ -13,6 +13,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; +import java.util.LinkedList; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -171,4 +172,22 @@ public void searchDirectory_hadoop() throws IOException { assertEquals(3, files.size()); } + @Test + public void searchDirectory_nio_paths() throws IOException { + String[] paths = new String[1]; + paths[0] = "testdata/recursive"; + List files = IOFactory.expandPathsToList(paths); + assertEquals(3, files.size()); + } + + @Test + public void searchDirectory_hadoop_paths() throws IOException { + Path currentRelativePath = Paths.get(""); + String s = currentRelativePath.toAbsolutePath().toString(); + String[] paths = new String[1]; + paths[0] = "file:///" + s + "/testdata/recursive"; + List files = IOFactory.expandPathsToList(paths); + assertEquals(3, files.size()); + } + }