
Commit 2f1f615

#748 Implement the ability to specify copybooks directly in JAR.

1 parent 565825f

File tree

8 files changed (+149 -49 lines)

README.md (+22 -18)
@@ -15,28 +15,28 @@ Add mainframe as a source to your data engineering strategy.
 
 Among the motivations for this project, it is possible to highlight:
 
-- Lack of expertise in the Cobol ecosystem, which makes it hard to integrate mainframes into data engineering strategies
+- Lack of expertise in the Cobol ecosystem, which makes it hard to integrate mainframes into data engineering strategies.
 
-- Lack of support from the open-source community to initiatives in this field
+- Lack of support from the open-source community to initiatives in this field.
 
-- The overwhelming majority (if not all) of tools to cope with this domain are proprietary
+- The overwhelming majority (if not all) of tools to cope with this domain are proprietary.
 
-- Several institutions struggle daily to maintain their legacy mainframes, which prevents them from evolving to more modern approaches to data management
+- Several institutions struggle daily to maintain their legacy mainframes, which prevents them from evolving to more modern approaches to data management.
 
-- Mainframe data can only take part in data science activities through very expensive investments
+- Mainframe data can only take part in data science activities through very expensive investments.
 
 
 ## Features
 
-- Supports primitive types (although some are "Cobol compiler specific")
+- Supports primitive types (although some are "Cobol compiler specific").
 
-- Supports REDEFINES, OCCURS and DEPENDING ON fields (e.g. unchecked unions and variable-size arrays)
+- Supports REDEFINES, OCCURS and DEPENDING ON fields (e.g. unchecked unions and variable-size arrays).
 
-- Supports nested structures and arrays
+- Supports nested structures and arrays.
 
-- Supports HDFS as well as local file systems
+- Supports Hadoop (HDFS, S3, ...) as well as the local file system.
 
-- The COBOL copybooks parser doesn't have a Spark dependency and can be reused for integration into other data processing engines
+- The COBOL copybooks parser doesn't have a Spark dependency and can be reused for integration into other data processing engines.
 
 ## Videos

@@ -135,18 +135,20 @@ Code coverage will be generated on path:
 {project-root}/cobrix/{module}/target/scala-{scala_version}/jacoco/report/html
 ```
 
-### Reading Cobol binary files from HDFS/local and querying them
+### Reading Cobol binary files from Hadoop/local and querying them
 
 1. Create a Spark ```SQLContext```
 
 2. Start a ```sqlContext.read``` operation specifying ```za.co.absa.cobrix.spark.cobol.source``` as the format
 
-3. Inform the path to the copybook describing the files through ```... .option("copybook", "path_to_copybook_file")```. By default the copybook
-is expected to be in HDFS. You can specify that a copybook is located in the local file system by adding `file://` prefix. For example, you
-can specify a local file like this `.option("copybook", "file:///home/user/data/compybook.cpy")`. Alternatively, instead of providing a path
-to a copybook file you can provide the contents of the copybook itself by using `.option("copybook_contents", "...copybook contents...")`.
+3. Inform the path to the copybook describing the files through ```... .option("copybook", "path_to_copybook_file")```.
+   - By default, the copybook is expected to be in the default Hadoop filesystem (HDFS, S3, etc.).
+   - You can specify that a copybook is located in the local file system by adding the `file://` prefix.
+   - For example, you can specify a local file like this: `.option("copybook", "file:///home/user/data/copybook.cpy")`.
+   - Alternatively, instead of providing a path to a copybook file, you can provide the contents of the copybook itself by using `.option("copybook_contents", "...copybook contents...")`.
+   - You can also store the copybook in the JAR itself, in the resources section. In this case, use the `jar://` prefix, e.g. `.option("copybook", "jar:///copybooks/copybook.cpy")`.
 
-4. Inform the path to the HDFS directory containing the files: ```... .load("path_to_directory_containing_the_binary_files")```
+4. Inform the path to the Hadoop directory containing the files: ```... .load("s3a://path_to_directory_containing_the_binary_files")```
 
 5. Inform the query you would like to run on the Cobol Dataframe
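Putting the new `jar://` option from step 3 together with the other steps, a minimal usage sketch (the `cobol` format and option names come from this commit's tests; the bucket path and the `/copybooks/copybook.cpy` resource path are placeholders):

```scala
import org.apache.spark.sql.SparkSession

// The copybook is assumed to be bundled into the application JAR,
// e.g. under src/main/resources/copybooks/copybook.cpy (placeholder path).
val spark = SparkSession.builder().appName("cobrix-jar-copybook").getOrCreate()

val df = spark.read
  .format("cobol")                                     // za.co.absa.cobrix.spark.cobol.source
  .option("copybook", "jar:///copybooks/copybook.cpy") // resolved from JAR resources
  .load("s3a://bucket/path_to_directory_containing_the_binary_files")

df.show()
```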

@@ -208,7 +210,7 @@ val spark = SparkSession
   .master("local[2]")
   .config("duration", 2)
   .config("copybook", "path_to_the_copybook")
-  .config("path", "path_to_source_directory") // could be both, local or HDFS
+  .config("path", "path_to_source_directory") // can be either local or Hadoop (s3://, hdfs://, etc.)
   .getOrCreate()
 
 val streamingContext = new StreamingContext(spark.sparkContext, Seconds(3))
@@ -607,7 +609,7 @@ records parsing.
 
 However effective, this strategy may also suffer from excessive shuffling, since indexes may be sent to executors far from the actual data.
 
-The latter issue is overcome by extracting the preferred locations for each index directly from HDFS, and then passing those locations to
+The latter issue is overcome by extracting the preferred locations for each index directly from HDFS/S3/..., and then passing those locations to
 Spark during the creation of the RDD that distributes the indexes.
 
 When processing large collections, the overhead of collecting the locations is offset by the benefits of locality, thus, this feature is
@@ -618,6 +620,8 @@ enabled by default, but can be disabled by the configuration below:
 
 ### Workload optimization for variable-length records parsing
 
+This feature works only for HDFS, not for other Hadoop filesystems.
+
 When dealing with variable-length records, Cobrix strives to maximize locality by identifying the preferred locations in the cluster to parse
 each record, i.e. the nodes where the record resides.
pom.xml (+2 -2)

@@ -108,9 +108,9 @@
     <maven.gpg.plugin.version>1.6</maven.gpg.plugin.version>
 
     <!-- Frameworks and libraries -->
-    <scala.version>2.12.17</scala.version>
+    <scala.version>2.12.20</scala.version>
     <scala.compat.version>2.12</scala.compat.version>
-    <spark.version>3.2.3</spark.version>
+    <spark.version>3.4.4</spark.version>
     <scalatest.version>3.2.14</scalatest.version>
     <specs.version>2.4.16</specs.version>
     <guava.version>15.0</guava.version>

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/copybook/CopybookContentLoader.scala (+19 -19)

@@ -18,7 +18,8 @@ package za.co.absa.cobrix.spark.cobol.source.copybook
 
 import org.apache.hadoop.conf.Configuration
 import za.co.absa.cobrix.cobol.reader.parameters.CobolParameters
-import za.co.absa.cobrix.spark.cobol.utils.{FileNameUtils, HDFSUtils}
+import za.co.absa.cobrix.spark.cobol.utils.FsType.LocalFs
+import za.co.absa.cobrix.spark.cobol.utils.{FileNameUtils, FsType, HDFSUtils, ResourceUtils}
 
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
@@ -33,28 +34,27 @@ object CopybookContentLoader {
     (copyBookContents, copyBookPathFileName) match {
       case (Some(contents), _) => Seq(contents)
       case (None, Some(_)) =>
-        val (isLocalFS, copyBookFileName) = FileNameUtils.getCopyBookFileName(copyBookPathFileName.get)
-        Seq(
-          if (isLocalFS) {
-            loadCopybookFromLocalFS(copyBookFileName)
-          } else {
-            HDFSUtils.loadTextFileFromHadoop(hadoopConf, copyBookFileName)
-          }
-        )
-      case (None, None) => parameters.multiCopybookPath.map(
-        fileName => {
-          val (isLocalFS, copyBookFileName) = FileNameUtils.getCopyBookFileName(fileName)
-          if (isLocalFS) {
-            loadCopybookFromLocalFS(copyBookFileName)
-          } else {
-            HDFSUtils.loadTextFileFromHadoop(hadoopConf, copyBookFileName)
-          }
-        }
-      )
+        val copybookContent = loadCopybook(copyBookPathFileName.get, hadoopConf)
+        Seq(copybookContent)
+      case (None, None) =>
+        parameters.multiCopybookPath.map(fileName => loadCopybook(fileName, hadoopConf))
+    }
+  }
+
+  private def loadCopybook(pathToCopybook: String, hadoopConf: Configuration): String = {
+    val (fsType, copyBookFileName) = FileNameUtils.getCopyBookFileName(pathToCopybook)
+    fsType match {
+      case FsType.LocalFs => loadCopybookFromLocalFS(copyBookFileName)
+      case FsType.JarFs => loadCopybookFromJarResources(copyBookFileName)
+      case FsType.HadoopFs => HDFSUtils.loadTextFileFromHadoop(hadoopConf, copyBookFileName)
     }
   }
 
   private def loadCopybookFromLocalFS(copyBookLocalPath: String): String = {
     Files.readAllLines(Paths.get(copyBookLocalPath), StandardCharsets.ISO_8859_1).toArray.mkString("\n")
   }
+
+  private def loadCopybookFromJarResources(copyBookJarPath: String): String = {
+    ResourceUtils.readResourceAsString(copyBookJarPath)
+  }
 }
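`ResourceUtils.readResourceAsString` is introduced by this commit, but its source is not part of this diff. A minimal sketch of what such a helper could look like (an assumption for illustration, not the actual Cobrix implementation):

```scala
import java.io.FileNotFoundException
import scala.io.Source

// Hypothetical stand-in for za.co.absa.cobrix.spark.cobol.utils.ResourceUtils:
// reads a classpath resource into a single string.
object ResourceUtilsSketch {
  def readResourceAsString(resourcePath: String): String = {
    val stream = getClass.getResourceAsStream(resourcePath)
    if (stream == null)
      throw new FileNotFoundException(s"Resource not found: $resourcePath")
    // Copybooks are read as ISO-8859-1 elsewhere in the loader, so the same
    // charset is assumed here.
    val source = Source.fromInputStream(stream, "ISO-8859-1")
    try source.mkString
    finally source.close()
  }
}
```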

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala (+10 -4)

@@ -18,13 +18,13 @@ package za.co.absa.cobrix.spark.cobol.source.parameters
 
 import java.io.FileNotFoundException
 import java.nio.file.{Files, Paths}
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkConf
 import za.co.absa.cobrix.cobol.reader.parameters.CobolParameters
 import za.co.absa.cobrix.spark.cobol.parameters.CobolParametersParser._
-import za.co.absa.cobrix.spark.cobol.utils.FileNameUtils
+import za.co.absa.cobrix.spark.cobol.utils.ResourceUtils.getClass
+import za.co.absa.cobrix.spark.cobol.utils.{FileNameUtils, FsType}
 
 /**
   * This class provides methods for checking the Spark job options after they are parsed.
@@ -66,8 +66,8 @@ object CobolParametersValidator {
   }
 
   def validatePath(fileName: String): Unit = {
-    val (isLocalFS, copyBookFileName) = FileNameUtils.getCopyBookFileName(fileName)
-    if (isLocalFS) {
+    val (fsType, copyBookFileName) = FileNameUtils.getCopyBookFileName(fileName)
+    if (fsType == FsType.LocalFs) {
       if (!Files.exists(Paths.get(copyBookFileName))) {
         throw new FileNotFoundException(s"Copybook not found at $copyBookFileName")
       }
@@ -77,6 +77,12 @@ object CobolParametersValidator {
       if (!Files.isReadable(Paths.get(copyBookFileName))) {
        throw new IllegalArgumentException(s"The copybook path '$copyBookFileName' is not readable.")
       }
+    } else if (fsType == FsType.JarFs) {
+      if (getClass.getResourceAsStream(copyBookFileName) == null) {
+        if (!Files.exists(Paths.get(copyBookFileName))) {
+          throw new FileNotFoundException(s"Copybook not found at the JAR resource path: $copyBookFileName")
+        }
+      }
     } else {
       val fs = new Path(fileName).getFileSystem(hadoopConf)
       if (!fs.exists(new Path(copyBookFileName))) {

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileNameUtils.scala (+11 -5)

@@ -18,14 +18,20 @@ package za.co.absa.cobrix.spark.cobol.utils
 
 object FileNameUtils {
   private val LOCALFS_PREFIX = "file://"
+  private val JAR_PREFIX = "jar://"
 
-  def getCopyBookFileName(fileNameURI: String):(Boolean, String) = {
+  def getCopyBookFileName(fileNameURI: String): (FsType, String) = {
     val isLocalFS = fileNameURI.toLowerCase.startsWith(LOCALFS_PREFIX)
-    val copyBookFileName = if (isLocalFS)
-      fileNameURI.drop(LOCALFS_PREFIX.length)
+    val isJar = fileNameURI.toLowerCase.startsWith(JAR_PREFIX)
+    if (isLocalFS)
+      (FsType.LocalFs, fileNameURI.drop(LOCALFS_PREFIX.length))
+    else if (isJar) {
+      val fileCandidate = fileNameURI.drop(JAR_PREFIX.length)
+      val filePath = if (fileCandidate.startsWith("/")) fileCandidate else s"/$fileCandidate"
+      (FsType.JarFs, filePath)
+    }
     else
-      fileNameURI
-    (isLocalFS, copyBookFileName)
+      (FsType.HadoopFs, fileNameURI)
   }
 
 }
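The prefix handling above can be summarized with a few illustrative calls (expected results derived from the diff; note that `jar://` paths are normalized to start with a slash so they can be passed to `getResourceAsStream`):

```scala
import za.co.absa.cobrix.spark.cobol.utils.{FileNameUtils, FsType}

FileNameUtils.getCopyBookFileName("file:///home/user/book.cpy")
// => (FsType.LocalFs, "/home/user/book.cpy")

FileNameUtils.getCopyBookFileName("jar://test/copybook.cpy")
// => (FsType.JarFs, "/test/copybook.cpy")   -- leading slash added

FileNameUtils.getCopyBookFileName("jar:///test/copybook.cpy")
// => (FsType.JarFs, "/test/copybook.cpy")

FileNameUtils.getCopyBookFileName("hdfs:///data/book.cpy")
// => (FsType.HadoopFs, "hdfs:///data/book.cpy")
```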
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FsType.scala (new file, +25 -0)

@@ -0,0 +1,25 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.spark.cobol.utils
+
+sealed trait FsType
+
+object FsType {
+  case object LocalFs extends FsType
+  case object JarFs extends FsType
+  case object HadoopFs extends FsType
+}
spark-cobol/src/test/resources/test/copybook.cpy (new file, +21 -0)

@@ -0,0 +1,21 @@
+****************************************************************************
+*                                                                          *
+*                    Copyright 2018 ABSA Group Limited                     *
+*                                                                          *
+* Licensed under the Apache License, Version 2.0 (the "License");          *
+* you may not use this file except in compliance with the License.        *
+* You may obtain a copy of the License at                                  *
+*                                                                          *
+*    http://www.apache.org/licenses/LICENSE-2.0                            *
+*                                                                          *
+* Unless required by applicable law or agreed to in writing, software      *
+* distributed under the License is distributed on an "AS IS" BASIS,        *
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
+* See the License for the specific language governing permissions and      *
+* limitations under the License.                                           *
+*                                                                          *
+****************************************************************************
+
+ 01 R.
+   03 A PIC X(1).
+   03 B PIC X(2).
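For reference, this test copybook describes a 3-byte record with two fields: `A` (1 byte) and `B` (2 bytes). With `schema_retention_policy` set to `collapse_root`, as in the tests below, the resulting DataFrame schema would presumably look like this:

```scala
df.printSchema()
// root
//  |-- A: string (nullable = true)
//  |-- B: string (nullable = true)
```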

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test11NoCopybookErrMsg.scala (+39 -1)

@@ -17,12 +17,13 @@
 package za.co.absa.cobrix.spark.cobol.source.regression
 
 import java.nio.file.Paths
-
 import org.scalatest.funsuite.AnyFunSuite
 import org.slf4j.{Logger, LoggerFactory}
 import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
 import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
 
+import java.io.FileNotFoundException
+
 class Test11NoCopybookErrMsg extends AnyFunSuite with SparkTestBase with BinaryFileFixture {
 
   private implicit val logger: Logger = LoggerFactory.getLogger(this.getClass)
@@ -50,6 +51,32 @@ class Test11NoCopybookErrMsg extends AnyFunSuite with SparkTestBase with BinaryF
     }
   }
 
+  test("Test a file loads normally when the copybook is a JAR resource") {
+    withTempBinFile("bin_file2", ".dat", binFileContents) { tmpFileName =>
+      val df = spark
+        .read
+        .format("cobol")
+        .option("copybook", "jar:///test/copybook.cpy")
+        .option("schema_retention_policy", "collapse_root")
+        .load(tmpFileName)
+
+      assert(df.count == 1)
+    }
+  }
+
+  test("Test a file loads normally when the copybook is a JAR resource (short path)") {
+    withTempBinFile("bin_file2", ".dat", binFileContents) { tmpFileName =>
+      val df = spark
+        .read
+        .format("cobol")
+        .option("copybook", "jar://test/copybook.cpy")
+        .option("schema_retention_policy", "collapse_root")
+        .load(tmpFileName)
+
+      assert(df.count == 1)
+    }
+  }
+
   test("Test the error message logged when no copybook is provided") {
     val ex = intercept[IllegalStateException] {
       spark
@@ -132,4 +159,15 @@ class Test11NoCopybookErrMsg extends AnyFunSuite with SparkTestBase with BinaryF
   }
   }
 
+  test("Test the error message when the copybook is not in the JAR resources") {
+    val ex = intercept[FileNotFoundException] {
+      spark
+        .read
+        .format("cobol")
+        .option("copybook", "jar://test/copybook_non_existent.cpy")
+        .load("/tmp/doesnt/matter")
+    }
+
+    assert(ex.getMessage == "Copybook not found at the JAR resource path: /test/copybook_non_existent.cpy")
+  }
 }
