
Commit 3873f87

Feature prerelease docs and tutorials (#57)
* tutorial changes
* doc and tutorial changes
* doc and tutorial updates
1 parent a3d4f3e commit 3873f87

13 files changed: +1047 / -386 lines

docs/source/APIDOCS.md

Lines changed: 18 additions & 35 deletions
@@ -214,8 +214,9 @@ table_schema = spark.table("test_vehicle_data").schema
 
 print(table_schema)
 
-dataspec = (dg.DataGenerator(spark, rows=10000000, partitions=8)
-            .withSchema(table_schema))
+dataspec = (dg.DataGenerator(spark, rows=10000000, partitions=8,
+                             randomSeedMethod="hash_fieldname")
+            .withSchema(table_schema))
 
 dataspec = (dataspec
             .withColumnSpec("name", percentNulls=0.01, template=r'\\w \\w|\\w a. \\w')
@@ -280,13 +281,13 @@ manufacturers = ['Delta corp', 'Xyzzy Inc.', 'Lakehouse Ltd', 'Acme Corp', 'Emba
 lines = ['delta', 'xyzzy', 'lakehouse', 'gadget', 'droid']
 
 testDataSpec = (dg.DataGenerator(spark, name="device_data_set", rows=data_rows,
-                                 partitions=partitions_requested, randomSeedMethod='hash_fieldname',
-                                 verbose=True, debug=True)
+                                 partitions=partitions_requested,
+                                 randomSeedMethod='hash_fieldname')
                 .withIdOutput()
                 # we'll use hash of the base field to generate the ids to
                 # avoid a simple incrementing sequence
                 .withColumn("internal_device_id", LongType(), minValue=0x1000000000000,
-                            uniqueValues=device_population)
+                            uniqueValues=device_population, omit=True, baseColumnType="hash")
 
                 # note for format strings, we must use "%lx" not "%x" as the
                 # underlying value is a long
@@ -297,16 +298,7 @@ testDataSpec = (dg.DataGenerator(spark, name="device_data_set", rows=data_rows,
                 # so lets use the internal device id as the base column for these attribute
                 .withColumn("country", StringType(), values=country_codes,
                             weights=country_weights,
-                            baseColumn="internal_device_id", baseColumnType="hash")
-                .withColumn("country2a", LongType(),
-                            expr="((hash(internal_device_id) % 3847) + 3847) % 3847",
                             baseColumn="internal_device_id")
-                .withColumn("country2", IntegerType(),
-                            expr="""floor(cast( (((internal_device_id % 3847) + 3847) % 3847)
-                                   as double) )""",
-                            baseColumn="internal_device_id")
-                .withColumn("country3", StringType(), values=country_codes,
-                            baseColumn="country2")
                 .withColumn("manufacturer", StringType(), values=manufacturers,
                             baseColumn="internal_device_id")
 
@@ -324,6 +316,7 @@ testDataSpec = (dg.DataGenerator(spark, name="device_data_set", rows=data_rows,
                             values=["activation", "deactivation", "plan change",
                                     "telecoms activity", "internet activity", "device error"],
                             random=True)
+                .withColumn("event_ts", "timestamp", begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", interval="1 minute", random=True)
 
                 )
 
@@ -363,7 +356,7 @@ or billions of rows.
 For example, using the same code as before, but with different rows and partitions settings, you can generate a billion
 rows of data and write it to a Delta table in under 2 minutes (with a 12 node x 8 core cluster)
 
-```
+```python
 from pyspark.sql.types import LongType, IntegerType, StringType
 
 import dbldatagen as dg
@@ -386,33 +379,23 @@ manufacturers = ['Delta corp', 'Xyzzy Inc.', 'Lakehouse Ltd', 'Acme Corp', 'Emba
 lines = ['delta', 'xyzzy', 'lakehouse', 'gadget', 'droid']
 
 testDataSpec = (dg.DataGenerator(spark, name="device_data_set", rows=data_rows,
-                                 partitions=partitions_requested, randomSeedMethod='hash_fieldname',
-                                 verbose=True, debug=True)
+                                 partitions=partitions_requested, randomSeedMethod='hash_fieldname')
                 .withIdOutput()
-                # we'll use hash of the base field to generate the ids to avoid a
-                # simple incrementing sequence
+                # we'll use hash of the base field to generate the ids to
+                # avoid a simple incrementing sequence
                 .withColumn("internal_device_id", LongType(), minValue=0x1000000000000,
-                            unique_values=device_population)
+                            uniqueValues=device_population, omit=True, baseColumnType="hash")
 
-                # note for format strings, we must use "%lx" not "%x" as the underlying
-                # value is a long
+                # note for format strings, we must use "%lx" not "%x" as the
+                # underlying value is a long
                 .withColumn("device_id", StringType(), format="0x%013x",
                             baseColumn="internal_device_id")
 
                 # the device / user attributes will be the same for the same device id
                 # so lets use the internal device id as the base column for these attribute
                 .withColumn("country", StringType(), values=country_codes,
                             weights=country_weights,
-                            baseColumn="internal_device_id", baseColumnType="hash")
-                .withColumn("country2a", LongType(),
-                            expr="((hash(internal_device_id) % 3847) + 3847) % 3847",
                             baseColumn="internal_device_id")
-                .withColumn("country2", IntegerType(),
-                            expr="""floor(cast( (((internal_device_id % 3847) + 3847) % 3847)
-                                   as double) )""",
-                            baseColumn="internal_device_id")
-                .withColumn("country3", StringType(), values=country_codes,
-                            baseColumn="country2")
                 .withColumn("manufacturer", StringType(), values=manufacturers,
                             baseColumn="internal_device_id")
 
@@ -428,9 +411,9 @@ testDataSpec = (dg.DataGenerator(spark, name="device_data_set", rows=data_rows,
                             baseColumn=["line", "model_ser"])
                 .withColumn("event_type", StringType(),
                             values=["activation", "deactivation", "plan change",
-                                    "telecoms activity",
-                                    "internet activity", "device error"],
+                                    "telecoms activity", "internet activity", "device error"],
                             random=True)
+                .withColumn("event_ts", "timestamp", begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", interval="1 minute", random=True)
 
                 )
 
@@ -455,7 +438,7 @@ data_rows = 10000000
 
 spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)
 
-dataspec = (dg.DataGenerator(spark, rows=10000000, partitions=8)
+dataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=8, randomSeedMethod="hash_fieldname")
             .withColumn("name", percentNulls=0.01, template=r'\\w \\w|\\w a. \\w')
             .withColumn("payment_instrument_type", values=['paypal', 'visa', 'mastercard', 'amex'], random=True)
             .withColumn("payment_instrument", minValue=1000000, maxValue=10000000, template="dddd dddddd ddddd")
@@ -500,7 +483,7 @@ data_rows = 10000000
 spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)
 
 dataspec = (
-    dg.DataGenerator(spark, rows=10000000, partitions=8, randomSeedMethod="hash_fieldname", randomSeed=42)
+    dg.DataGenerator(spark, rows=data_rows, partitions=8, randomSeedMethod="hash_fieldname", randomSeed=42)
     .withColumn("name", percentNulls=0.01, template=r'\\w \\w|\\w a. \\w')
     .withColumn("payment_instrument_type", values=['paypal', 'visa', 'mastercard', 'amex'],
                 random=True)
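
To see the net effect of these APIDOCS changes in one place, here is a minimal, self-contained sketch of the device-data example as it reads after the change. It is an illustration rather than the file's exact contents: the row counts and the country_codes / country_weights lists are assumed placeholder values, and spark is assumed to be an existing SparkSession on a cluster with dbldatagen installed.

```python
# Hedged sketch (not part of the commit): trimmed-down device-data spec after this change.
from pyspark.sql.types import LongType, StringType

import dbldatagen as dg

data_rows = 1000 * 1000            # assumed row count for illustration
device_population = 100 * 1000     # assumed number of distinct devices
country_codes = ["US", "GB", "DE", "FR", "IN"]   # assumed sample values
country_weights = [5, 2, 2, 1, 3]                # assumed sample weights

testDataSpec = (dg.DataGenerator(spark, name="device_data_set", rows=data_rows,
                                 partitions=8, randomSeedMethod="hash_fieldname")
                .withIdOutput()
                # internal_device_id is omitted from the output; it only seeds the other columns
                .withColumn("internal_device_id", LongType(), minValue=0x1000000000000,
                            uniqueValues=device_population, omit=True, baseColumnType="hash")
                .withColumn("device_id", StringType(), format="0x%013x",
                            baseColumn="internal_device_id")
                .withColumn("country", StringType(), values=country_codes,
                            weights=country_weights, baseColumn="internal_device_id")
                .withColumn("event_type", StringType(),
                            values=["activation", "deactivation", "plan change",
                                    "telecoms activity", "internet activity", "device error"],
                            random=True)
                # the new event timestamp column introduced by this change
                .withColumn("event_ts", "timestamp", begin="2020-01-01 01:00:00",
                            end="2020-12-31 23:59:00", interval="1 minute", random=True)
                )

dfTestData = testDataSpec.build()
dfTestData.show(5)
```

The points carried by the diff are the `randomSeedMethod="hash_fieldname"` seeding, the omitted hash-based `internal_device_id`, and the new `event_ts` timestamp column.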

docs/source/extending_text_generation.rst

Lines changed: 9 additions & 6 deletions
@@ -41,7 +41,8 @@ extended syntax.
                  'sesame','Jelly','beans',
                  'pie','bar','Ice','oat' ]
 
-   fakerDataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested)
+   fakerDataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested,
+                                     randomSeedMethod="hash_fieldname")
                     .withColumn("name", percentNulls=0.1, text=FakerText("name") )
                     .withColumn("payment_instrument", text=FakerText("credit_card_number" ))
                     .withColumn("email", text=FakerText("ascii_company_email") )
@@ -71,7 +72,7 @@ For more information, see :data:`~dbldatagen.text_generator_plugins.PyfuncText`
 
 .. note::
 
-   The perform of text generation using external libraries or Python functions is substantially slower than the base
+   The performance of text generation using external libraries or Python functions may be substantially slower than the base
    text generation capabilities. However it should be sufficient for generation of tables of up to
    100 million rows on a medium sized cluster.
 
@@ -98,9 +99,10 @@ The following code shows use of a custom Python function to generate text:
    # the data generation function
    text_generator = (lambda context, value: context.prefix + str(value))
 
-   pluginDataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested)
-                     .withColumn("text", text=PyfuncText(text_generator, initFn=initPluginContext))
-                     )
+   pluginDataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested,
+                                      randomSeedMethod="hash_fieldname")
+                     .withColumn("text", text=PyfuncText(text_generator, initFn=initPluginContext))
+                     )
    dfPlugin = pluginDataspec.build()
   dfPlugin.show()
 
@@ -164,7 +166,8 @@ IP addresses and credit card numbers.
    cc_generator = (lambda context, v : context.faker.credit_card_number())
    email_generator = (lambda context, v : context.faker.ascii_company_email())
 
-   fakerDataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested)
+   fakerDataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested,
+                                     randomSeedMethod="hash_fieldname")
                     .withColumn("name",
                                 percentNulls=0.1,
                                 text=PyfuncText(name_generator , initFn=initFaker))
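
As a point of reference for the plugin changes above, the following is a hedged, self-contained version of the PyfuncText example. The body of initPluginContext is an assumption (the excerpt only shows it being passed as initFn), and spark is assumed to be an existing SparkSession.

```python
# Hedged sketch (not part of the commit): custom Python text generation via PyfuncText.
import dbldatagen as dg
from dbldatagen.text_generator_plugins import PyfuncText

data_rows = 100 * 1000
partitions_requested = 8

def initPluginContext(context):
    # attach any state the per-row lambda needs; a fixed prefix here (assumed body)
    context.prefix = "text-"

# the per-row generation function receives the shared context and the base value
text_generator = (lambda context, value: context.prefix + str(value))

pluginDataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested,
                                   randomSeedMethod="hash_fieldname")
                  .withColumn("text", text=PyfuncText(text_generator, initFn=initPluginContext))
                  )

dfPlugin = pluginDataspec.build()
dfPlugin.show()
```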

docs/source/generating_cdc_data.rst

Lines changed: 162 additions & 0 deletions
@@ -0,0 +1,162 @@
.. Test Data Generator documentation master file, created by
   sphinx-quickstart on Sun Jun 21 10:54:30 2020.
   You can adapt this file completely to your liking, but it should at least
   contain the root `toctree` directive.

Generating Change Data Capture data
===================================

This section explores some of the features for generating CDC-style data - that is, the ability to
generate a base data set and then apply changes such as updates to existing rows and
new rows that will be inserted into the existing data.

See the section on repeatable data generation for the concepts that underpin the data generation.

Overview
--------
We'll generate a customer table, and write out the data.

Then we generate changes for the table and show merging them in.

To start, we'll specify some locations for our data:

.. code-block:: python

   BASE_PATH = '/tmp/dbldatagen/cdc/'
   dbutils.fs.mkdirs(BASE_PATH)

   customers1_location = BASE_PATH + "customers1"

Let's generate 10 million customer-style records.

We'll add a timestamp for when the row was generated and a memo field to mark what operation added it.

.. code-block:: python

   import dbldatagen as dg
   import pyspark.sql.functions as F

   spark.catalog.clearCache()
   shuffle_partitions_requested = 8
   partitions_requested = 32
   data_rows = 10 * 1000 * 1000

   spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)
   spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)

   uniqueCustomers = 10 * 1000000

   dataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested)
               .withColumn("customer_id", "long", uniqueValues=uniqueCustomers)
               .withColumn("name", percentNulls=0.01, template=r'\\w \\w|\\w a. \\w')
               .withColumn("alias", percentNulls=0.01, template=r'\\w \\w|\\w a. \\w')
               .withColumn("payment_instrument_type", values=['paypal', 'Visa', 'Mastercard', 'American Express', 'discover', 'branded visa', 'branded mastercard'], random=True, distribution="normal")
               .withColumn("int_payment_instrument", "long", minValue=100000000000000, maxValue=999999999999999, baseColumn="customer_id", baseColumnType="hash", omit=True)
               .withColumn("payment_instrument", expr="format_number(int_payment_instrument, '#### ###### #####')", baseColumn="int_payment_instrument")
               .withColumn("email", template=r'\\w.\\w@\\w.com|\\w-\\w@\\w')
               .withColumn("email2", template=r'\\w.\\w@\\w.com')
               .withColumn("ip_address", template=r'\\n.\\n.\\n.\\n')
               .withColumn("md5_payment_instrument",
                           expr="md5(concat(payment_instrument_type, ':', payment_instrument))",
                           baseColumn=['payment_instrument_type', 'payment_instrument'])
               .withColumn("customer_notes", text=dg.ILText(words=(1,8)))
               .withColumn("created_ts", "timestamp", expr="now()")
               .withColumn("modified_ts", "timestamp", expr="now()")
               .withColumn("memo", expr="'original data'")
               )
   df1 = dataspec.build()

   # write table

   df1.write.format("delta").save(customers1_location)

Creating a table definition
^^^^^^^^^^^^^^^^^^^^^^^^^^^

We can use the features of the data generator to script SQL definitions for table creation and merge
statements.

Let's create a table definition around our data. As we generate a SQL statement with an explicit location,
the table is implicitly ``external`` and will not overwrite our data.

.. code-block:: python

   customers1_location = BASE_PATH + "customers1"
   tableDefn = dataspec.scriptTable(name="customers1", location=customers1_location)

   spark.sql(tableDefn)

Now let's explore the table layout:

.. code-block:: sql

   %sql
   -- lets check our table

   select * from customers1

Creating Changes
^^^^^^^^^^^^^^^^

Let's generate some changes.

Here we want to generate a set of new rows, which we guarantee to be new by using customer ids greater than the maximum
existing customer id.

We will also generate a set of updates by sampling from the existing data and adding some modifications.

.. code-block:: python

   import dbldatagen as dg
   import pyspark.sql.functions as F

   start_of_new_ids = df1.select(F.max('customer_id')+1).collect()[0][0]

   print(start_of_new_ids)

   df1_inserts = (dataspec.clone()
                  .option("startingId", start_of_new_ids)
                  .withRowCount(10 * 1000)
                  .build()
                  .withColumn("memo", F.lit("insert"))
                  .withColumn("customer_id", F.expr(f"customer_id + {start_of_new_ids}"))
                  )

   df1_updates = (df1.sample(False, 0.1)
                  .limit(50 * 1000)
                  .withColumn("alias", F.lit('modified alias'))
                  .withColumn("modified_ts", F.expr('current_timestamp()'))
                  .withColumn("memo", F.lit("update")))

   df_changes = df1_inserts.union(df1_updates)

   display(df_changes)

Merging in the changes
^^^^^^^^^^^^^^^^^^^^^^

We can script the merge statement in the data generator.

The ``updateColumns`` argument specifies which columns should be updated.
The corresponding ``updateColumnExprs`` argument provides SQL expressions as overrides for the
columns being updated. These do not have to be provided - in which case the
values of the columns from the source table will be used.

.. code-block:: python

   df_changes.dropDuplicates(["customer_id"]).createOrReplaceTempView("customers1_changes")
   sqlStmt = dataspec.scriptMerge(tgtName="customers1", srcName="customers1_changes",
                                  joinExpr="src.customer_id=tgt.customer_id",
                                  updateColumns=["alias", "memo", "modified_ts"],
                                  updateColumnExprs=[("memo", "'updated on merge'"),
                                                     ("modified_ts", "now()")
                                                    ])

   print(sqlStmt)

   spark.sql(sqlStmt)

That's all that's required to perform merges with the data generation framework.
Note that these merge script statements can also be used as part of a streaming merge implementation.
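
The closing note mentions streaming merges; a rough sketch of how the scripted merge might be wired into a Structured Streaming foreachBatch handler follows. The streaming source df_stream_changes and the checkpoint path are assumptions, and the snippet reuses dataspec and BASE_PATH from the page above (Spark 3.1+ is assumed for DataFrame.sparkSession).

```python
# Hedged sketch (not part of the commit): applying the scripted merge per micro-batch.
def merge_changes(batch_df, batch_id):
    # register the micro-batch and reuse the scripted merge statement against it
    batch_df.dropDuplicates(["customer_id"]).createOrReplaceTempView("customers1_changes")
    sqlStmt = dataspec.scriptMerge(tgtName="customers1", srcName="customers1_changes",
                                   joinExpr="src.customer_id=tgt.customer_id",
                                   updateColumns=["alias", "memo", "modified_ts"],
                                   updateColumnExprs=[("memo", "'updated on merge'"),
                                                      ("modified_ts", "now()")])
    batch_df.sparkSession.sql(sqlStmt)

# df_stream_changes is an assumed streaming DataFrame with the customers1_changes schema
(df_stream_changes.writeStream
    .foreachBatch(merge_changes)
    .option("checkpointLocation", BASE_PATH + "checkpoints/customers1_merge")
    .start())
```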

docs/source/index.rst

Lines changed: 3 additions & 1 deletion
@@ -27,8 +27,10 @@ to Scala or R based Spark applications also.
    Using data distributions <DISTRIBUTIONS>
    Options for column specification <options_and_features>
    Generating repeatable data <repeatable_data_generation>
-   Extending text generation <extending_text_generation>
    Using streaming data <using_streaming_data>
+   Generating Change Data Capture (CDC) data <generating_cdc_data>
+   Multi table data <multi_table_data>
+   Extending text generation <extending_text_generation>
    Troubleshooting data generation <troubleshooting>
 
 .. toctree::

docs/source/multi_table_data.rst

Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
.. Test Data Generator documentation master file, created by
   sphinx-quickstart on Sun Jun 21 10:54:30 2020.
   You can adapt this file completely to your liking, but it should at least
   contain the root `toctree` directive.

Multi table data and change data capture
========================================

See the section on repeatable data generation for the concepts that underpin the data generation.

One common scenario is the need to generate multiple tables
with consistent primary and foreign keys to model join or merge scenarios.

By generating tables with repeatable data, we can generate multiple versions of the same data for different tables and
ensure that we have referential integrity across the tables.

Examples to be added.
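
The page above defers concrete examples, but as a rough illustration of the idea: reusing the same key column specification in two generators keeps the foreign-key domain of one table aligned with the primary-key domain of the other. In the sketch below, table names, row counts, and the join check are assumptions; spark is an existing SparkSession, and it is assumed that an identical long column spec with uniqueValues=N yields the same value domain in both generators.

```python
# Hedged sketch (not part of the commit): consistent keys across two generated tables.
import dbldatagen as dg

unique_customers = 100 * 1000

customer_spec = (dg.DataGenerator(spark, rows=unique_customers, partitions=8,
                                  randomSeedMethod="hash_fieldname")
                 .withColumn("customer_id", "long", uniqueValues=unique_customers)
                 .withColumn("name", percentNulls=0.01, template=r'\\w \\w|\\w a. \\w')
                 )

order_spec = (dg.DataGenerator(spark, rows=1000 * 1000, partitions=8,
                               randomSeedMethod="hash_fieldname")
              .withColumn("order_id", "long", uniqueValues=1000 * 1000)
              # same spec as the customer table's key, so the foreign-key domain matches
              .withColumn("customer_id", "long", uniqueValues=unique_customers)
              .withColumn("amount_cents", "long", minValue=100, maxValue=100000, random=True)
              )

df_customers = customer_spec.build()
df_orders = order_spec.build()

# every order should join to exactly one customer
print(df_orders.join(df_customers, "customer_id").count())
```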
