package edu.vanderbilt.accre.laurelin.root_proxy;

import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.HashMap;
import java.util.List;
+ import java.util.Map;
+ import java.util.Map.Entry;
import java.util.regex.Pattern;

+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.LocatedFileStatus;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.RemoteIterator;
+ import org.apache.spark.deploy.SparkHadoopUtil;
+ import org.apache.spark.sql.SparkSession;
+
+ import scala.collection.JavaConverters;
+ import scala.collection.Seq;
+
public class IOFactory {
-     static final String hadoopPattern = "^[a-zA-Z]+:/.*";
+     static final String hadoopPattern = "^[a-zA-Z]+:.*";

    public static FileInterface openForRead(String path) throws IOException {
        /**
@@ -36,6 +53,157 @@ public static FileInterface openForRead(String path) throws IOException {
        return ret;
    }

+     /**
+      * Perform glob expansion on a list of paths, then recursively expand any
+      * directories in the resulting list.
+      *
+      * @param paths Paths to be expanded
+      * @return Fully expanded list of ROOT file paths
+      * @throws IOException If any globs don't resolve or paths don't exist
+      */
+     public static List<Path> resolvePathList(List<String> paths) throws IOException {
+         Configuration hadoopConf;
+         try {
+             hadoopConf = SparkSession.active().sparkContext().hadoopConfiguration();
+         } catch (IllegalStateException e) {
+             hadoopConf = new Configuration();
+         }
+
+         List<Path> globResolved = new ArrayList<Path>(paths.size());
+         // First perform any globbing
+         for (String path : paths) {
+             if (isGlob(path)) {
+                 globResolved.addAll(resolveGlob(path));
+             } else {
+                 globResolved.add(new Path(path));
+             }
+         }
+
+         /*
+          * Now, with globs turned into concrete paths, we want to walk through
+          * the list and check the type of each file:
+          *
+          * 1) If a file, add that file directly to our list of input paths
+          * 2) If a directory, recursively add every file ending in .root
+          *
+          * There is a problem, however. Each file lookup is synchronous, and if
+          * the filesystem is remote (e.g. reading xrootd across the WAN), each
+          * stat() can take upwards of 100msec, which adds up quickly if the
+          * user passes in a list of 10k files to process.
+          *
+          * As an optimization, instead of requesting the status of each path
+          * directly, request the directory listing of each path's parent
+          * directory to discover the type of each entry. This way, the number
+          * of FS calls scales with the number of parent directories and not
+          * the number of paths.
+          *
+          * It should also be noted that the hadoop-xrootd connector unrolls
+          * the multi-arg form of listStatus into individual calls, so that
+          * form doesn't help here.
+          */
+
+         // Loop over all the paths and keep the unique parents of them all
+         // TODO: Is repeatedly instantiating FileSystem objects slow over WAN?
+         Map<Path, List<FileStatus>> parentDirectories = new HashMap<Path, List<FileStatus>>();
+         Map<Path, Path> childToParentMap = new HashMap<Path, Path>();
+         Map<Path, Path> qualifiedChildToParentMap = new HashMap<Path, Path>();
+         for (Path path : globResolved) {
+             Path parent = path.getParent();
+             parentDirectories.put(parent, null);
+             childToParentMap.put(path, parent);
+             FileSystem fs = parent.getFileSystem(hadoopConf);
+             Path qualifiedChild = path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+             qualifiedChildToParentMap.put(qualifiedChild, parent);
+         }
+
+         // Retrieve the listing for all the parent dirs
+         Map<Path, List<FileStatus>> parentToStatusMap = new HashMap<Path, List<FileStatus>>();
+         Map<Path, FileStatus> qualifiedListingToStatusMap = new HashMap<Path, FileStatus>();
+         for (Path parent : parentDirectories.keySet()) {
+             FileSystem fs = parent.getFileSystem(hadoopConf);
+             FileStatus[] listing = fs.listStatus(parent);
+             parentToStatusMap.put(parent, Arrays.asList(listing));
+             for (FileStatus s : listing) {
+                 assert qualifiedListingToStatusMap.containsKey(s.getPath()) == false;
+                 qualifiedListingToStatusMap.put(s.getPath(), s);
+             }
+         }
+
+         assert qualifiedListingToStatusMap.size() >= globResolved.size() : "qualifiedlisting < globresolved";
+
+         /*
+          * At this point, we have a list of post-globbing URIs and lists of
+          * FileStatus for every parent of those URIs. Use this to make a map of
+          * globbed path -> FileStatus
+          */
+         Map<Path, FileStatus> clientRequestedPathToStatusMap = new HashMap<Path, FileStatus>();
+         for (Entry<Path, Path> e : qualifiedChildToParentMap.entrySet()) {
+             if (!qualifiedListingToStatusMap.containsKey(e.getKey())) {
+                 throw new IOException("Path not found: " + e.getKey());
+             }
+             FileStatus status = qualifiedListingToStatusMap.get(e.getKey());
+             clientRequestedPathToStatusMap.put(e.getKey(), status);
+         }
+
+         // Walk the statuses to sort between files and directories
+         List<Path> ret = new ArrayList<Path>(globResolved.size());
+         for (FileStatus status : clientRequestedPathToStatusMap.values()) {
+             Path path = status.getPath();
+             if (status.isDirectory()) {
+                 // We were given a directory, add everything recursively
+                 FileSystem fs = status.getPath().getFileSystem(hadoopConf);
+                 RemoteIterator<LocatedFileStatus> fileList = fs.listFiles(status.getPath(), true);
+                 while (fileList.hasNext()) {
+                     LocatedFileStatus file = fileList.next();
+                     if (file.isFile() && (file.getPath().getName().endsWith(".root"))) {
+                         ret.add(file.getPath());
+                     }
+                 }
+             } else if (status.isFile()) {
+                 ret.add(status.getPath());
+             } else {
+                 throw new IOException("File '" + path + "' is an unknown type");
+             }
+         }
+
+         return ret;
+     }
+
+     /**
+      * Perform glob expansion on a path
+      * @param path Glob to expand
+      * @return List of paths that match the given glob
+      * @throws IOException If nothing matches the given glob
+      */
+     private static List<Path> resolveGlob(String path) throws IOException {
+         Configuration hadoopConf;
+         try {
+             hadoopConf = SparkSession.active().sparkContext().hadoopConfiguration();
+         } catch (IllegalStateException e) {
+             hadoopConf = new Configuration();
+         }
+
+         Path hdfsPath = new Path(path);
+         FileSystem fs = hdfsPath.getFileSystem(hadoopConf);
+         Path qualified = hdfsPath.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+         Seq<Path> globPath = SparkHadoopUtil.get().globPathIfNecessary(fs, qualified);
+         if (globPath.isEmpty()) {
+             throw new IOException("Path does not exist: " + qualified);
+         }
+         // TODO: Is this stable across Scala versions?
+         List<Path> ret = JavaConverters.seqAsJavaListConverter(globPath).asJava();
+         return ret;
+     }
+
+     /**
+      * See if the given path has any glob metacharacters
+      * @param path Input path
+      * @return True if the path looks like a glob. False otherwise.
+      */
+     private static boolean isGlob(String path) {
+         return path.matches(".*[{}\\[\\]*?].*");
+     }
+
    public static List<String> expandPathToList(String path) throws IOException {
        if (Pattern.matches(hadoopPattern, path)) {
            return HadoopFile.expandPathToList(path);
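
As a rough usage sketch (not part of the patch; the caller, input URIs, and downstream handling are hypothetical), the new resolvePathList() could be exercised like this, relying on either an active SparkSession or the default-Configuration fallback:

    // Hypothetical caller: expand user-supplied globs and directories into concrete ROOT files
    List<String> requested = Arrays.asList(
            "hdfs:///store/data/*.root",   // glob, expanded by resolveGlob()
            "file:///data/ntuples");       // directory, walked recursively for *.root files
    List<Path> files = IOFactory.resolvePathList(requested);
    for (Path p : files) {
        FileInterface file = IOFactory.openForRead(p.toString());
        // ... hand the FileInterface to the reader ...
    }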
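
If the TODO about JavaConverters stability across Scala versions ever becomes an issue, one possible alternative (a sketch only, not what this patch implements; the helper name is made up) is to stay on the Hadoop API and use FileSystem.globStatus(), which returns a plain FileStatus[]. Note the semantics differ slightly from globPathIfNecessary(): globStatus() always interprets glob syntax in the path.

    // Sketch: glob expansion without the scala.collection.Seq round-trip
    private static List<Path> resolveGlobHadoopOnly(String path, Configuration hadoopConf) throws IOException {
        Path hdfsPath = new Path(path);
        FileSystem fs = hdfsPath.getFileSystem(hadoopConf);
        Path qualified = hdfsPath.makeQualified(fs.getUri(), fs.getWorkingDirectory());
        FileStatus[] matches = fs.globStatus(qualified);  // null or empty when nothing matches
        if ((matches == null) || (matches.length == 0)) {
            throw new IOException("Path does not exist: " + qualified);
        }
        List<Path> ret = new ArrayList<Path>(matches.length);
        for (FileStatus match : matches) {
            ret.add(match.getPath());
        }
        return ret;
    }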