Skip to content

Commit 0c009a5

Browse files
Merge pull request #394 from rustprooflabs/dev
Dev to Main
2 parents 6727773 + 20f91a5 commit 0c009a5

File tree

11 files changed

+365
-272
lines changed

11 files changed

+365
-272
lines changed

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ RUN apt-get update \
2020
curl unzip \
2121
postgresql-16-pgrouting \
2222
nlohmann-json3-dev \
23+
osmium-tool \
2324
&& rm -rf /var/lib/apt/lists/*
2425

2526
RUN wget https://luarocks.org/releases/luarocks-3.9.2.tar.gz \

db/deploy/osm_pgosm_flex.sql

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ BEGIN;
55
CREATE TABLE IF NOT EXISTS {schema_name}.pgosm_flex (
66
id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY,
77
imported TIMESTAMPTZ NOT NULL DEFAULT NOW(),
8-
osm_date date NOT NULL,
8+
osm_date TIMESTAMPTZ NOT NULL,
99
region text NOT NULL,
1010
layerset TEXT NULL,
1111
srid text NOT NULL,
@@ -43,10 +43,12 @@ ALTER TABLE {schema_name}.pgosm_flex
4343
ALTER TABLE {schema_name}.pgosm_flex DROP COLUMN IF EXISTS project_url;
4444
ALTER TABLE {schema_name}.pgosm_flex DROP COLUMN IF EXISTS default_date;
4545

46+
ALTER TABLE {schema_name}.pgosm_flex ALTER COLUMN osm_date TYPE TIMESTAMPTZ;
47+
4648
COMMENT ON TABLE {schema_name}.pgosm_flex IS 'Provides meta information on the PgOSM-Flex project including version and SRID used during the import. One row per import.';
4749

4850
COMMENT ON COLUMN {schema_name}.pgosm_flex.imported IS 'Indicates when the import was ran.';
49-
COMMENT ON COLUMN {schema_name}.pgosm_flex.osm_date IS 'Indicates the date of the OpenStreetMap data loaded. Recommended to set PGOSM_DATE env var at runtime, otherwise defaults to the date PgOSM-Flex was run.';
51+
COMMENT ON COLUMN {schema_name}.pgosm_flex.osm_date IS 'Indicates the date of the OpenStreetMap data loaded. Uses timestamp from PBF file metadata when available. If metadata not available this represents --osm-date at runtime, or the date of today in timezone based on computer running import.';
5052
COMMENT ON COLUMN {schema_name}.pgosm_flex.srid IS 'SRID of imported data.';
5153
COMMENT ON COLUMN {schema_name}.pgosm_flex.pgosm_flex_version IS 'Version of PgOSM-Flex used to generate schema.';
5254
COMMENT ON COLUMN {schema_name}.pgosm_flex.osm2pgsql_version IS 'Version of osm2pgsql used to load data.';

docker/db.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
11
"""Module to interact with Postgres database.
2+
3+
Dynamic SQL is used in this module to allow customized schema names for storing
4+
data. At a glance, this is vulnerable to SQLi (SQL Injection) considering the
5+
``schema_name`` variable is technically "user input". This is not considered
6+
a concern for this project because the user inputting the ``schema_name`` value
7+
is considered a trusted user.
28
"""
39
import logging
410
import os
@@ -607,7 +613,7 @@ def run_pg_dump(export_path, skip_qgis_style):
607613
fix_pg_dump_create_public(export_path)
608614

609615

610-
def fix_pg_dump_create_public(export_path):
616+
def fix_pg_dump_create_public(export_path: str):
611617
"""Using pg_dump with `--schema=public` results in
612618
a .sql script containing `CREATE SCHEMA public;`, nearly always breaks
613619
in target DB. Replaces with `CREATE SCHEMA IF NOT EXISTS public;`
@@ -623,24 +629,34 @@ def fix_pg_dump_create_public(export_path):
623629
LOGGER.debug(result)
624630

625631

626-
def log_import_message(import_id, msg, schema_name):
632+
def log_import_message(import_id: int, msg: str, schema_name: str):
627633
"""Logs msg to database in osm.pgosm_flex for import_uuid.
628634
635+
Overwrites `osm_date` if `pbf_timestamp` is set.
636+
629637
Parameters
630638
-------------------------------
631639
import_id : int
632640
msg : str
633641
schema_name: str
634642
"""
643+
try:
644+
pbf_timestamp = os.environ['PBF_TIMESTAMP']
645+
except KeyError:
646+
pbf_timestamp = os.environ['PGOSM_DATE']
647+
635648
sql_raw = """
636649
UPDATE {schema_name}.pgosm_flex
637-
SET import_status = %(msg)s
650+
SET import_status = %(msg)s ,
651+
osm_date = COALESCE( %(pbf_timestamp)s , osm_date)
638652
WHERE id = %(import_id)s
639653
;
640654
"""
641655
sql_raw = sql_raw.format(schema_name=schema_name)
642656
with get_db_conn(conn_string=os.environ['PGOSM_CONN']) as conn:
643-
params = {'import_id': import_id, 'msg': msg}
657+
params = {'import_id': import_id,
658+
'msg': msg,
659+
'pbf_timestamp': pbf_timestamp}
644660
cur = conn.cursor()
645661
cur.execute(sql_raw, params=params)
646662

docker/geofabrik.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""This module handles the auto-file handling using Geofabrik's download service.
22
"""
33
import logging
4+
import json
45
import os
56
import shutil
67
import subprocess
@@ -67,10 +68,45 @@ def prepare_data(out_path: str) -> str:
6768
md5_file_with_date)
6869

6970
helpers.verify_checksum(md5_file, out_path)
71+
set_date_from_metadata(pbf_file=pbf_file)
7072

7173
return pbf_file
7274

7375

76+
def set_date_from_metadata(pbf_file: str):
77+
"""Use `osmium fileinfo` to set a more accurate date to represent when it was
78+
extracted from OpenStreetMap.
79+
80+
Parameters
81+
---------------------
82+
pbf_file : str
83+
Full path to the `.osm.pbf` file.
84+
"""
85+
logger = logging.getLogger('pgosm-flex')
86+
osmium_cmd = f'osmium fileinfo {pbf_file} --json'
87+
output = []
88+
returncode = helpers.run_command_via_subprocess(cmd=osmium_cmd.split(),
89+
cwd=None,
90+
output_lines=output,
91+
print_to_log=False)
92+
if returncode != 0:
93+
logger.error(f'osmium fileinfo failed. Output: {output}')
94+
95+
output_joined = json.loads(''.join(output))
96+
meta_options = output_joined['header']['option']
97+
98+
try:
99+
meta_timestamp = meta_options['timestamp']
100+
except KeyError:
101+
try:
102+
meta_timestamp = meta_options['osmosis_replication_timestamp']
103+
except KeyError:
104+
meta_timestamp = None
105+
106+
logger.info(f'PBF Meta timestamp: {meta_timestamp}')
107+
os.environ['PBF_TIMESTAMP'] = meta_timestamp
108+
109+
74110
def pbf_download_needed(pbf_file_with_date: str, md5_file_with_date: str,
75111
pgosm_date: str) -> bool:
76112
"""Decides if the PBF/MD5 files need to be downloaded.

docker/helpers.py

Lines changed: 189 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
"""Generic functions and attributes used in multiple modules of PgOSM Flex.
22
"""
33
import datetime
4+
import json
45
import logging
6+
from packaging.version import parse as parse_version
57
import subprocess
68
import os
79
import sys
@@ -25,8 +27,11 @@ def get_today() -> str:
2527
return today
2628

2729

28-
def run_command_via_subprocess(cmd: list, cwd: str, output_lines: list=[],
29-
print: bool=False) -> int:
30+
def run_command_via_subprocess(cmd: list,
31+
cwd: str,
32+
output_lines: list=[],
33+
print_to_log: bool=False
34+
) -> int:
3035
"""Wraps around subprocess.Popen() to run commands outside of Python. Prints
3136
output as it goes, returns the status code from the command.
3237
@@ -38,7 +43,7 @@ def run_command_via_subprocess(cmd: list, cwd: str, output_lines: list=[],
3843
Set the working directory, or to None.
3944
output_lines : list
4045
Pass in a list to return the output details.
41-
print : bool
46+
print_to_log : bool
4247
Default False. Set to true to also print to logger
4348
4449
Returns
@@ -58,12 +63,23 @@ def run_command_via_subprocess(cmd: list, cwd: str, output_lines: list=[],
5863
if output:
5964
ln = output.strip().decode('utf-8')
6065
output_lines.append(ln)
61-
if print:
66+
if print_to_log:
6267
logger.info(ln)
68+
69+
# Detects issue reported in https://github.com/rustprooflabs/pgosm-flex/issues/391
70+
# Status code is incorrectly returned is 0, cannot detect
71+
# problem using that method so forcing failure with custom
72+
# status code.
73+
if 'Error during diff download. Bailing out.' in ln:
74+
logger.error('Data in database is too far behind replication service.')
75+
return 999
76+
6377
else:
6478
# Only sleep when there wasn't output
6579
sleep(1)
80+
6681
status = process.poll()
82+
6783
return status
6884

6985

@@ -223,3 +239,172 @@ def unset_env_vars():
223239
os.environ.pop('PGOSM_CONN', None)
224240
os.environ.pop('PGOSM_CONN_PG', None)
225241
os.environ.pop('SCHEMA_NAME', None)
242+
243+
244+
class ImportMode():
245+
"""Determines logical variables used to control program flow.
246+
247+
WARNING: The values for `append_first_run` and `replication_update`
248+
are used to determine when to drop the local DB. Be careful with any
249+
changes to these values.
250+
"""
251+
def __init__(self, replication: bool, replication_update: bool,
252+
update: str, force: bool):
253+
"""Computes two variables, slim_no_drop and append_first_run
254+
based on inputs.
255+
256+
Parameters
257+
--------------------------
258+
replication : bool
259+
replication_update : bool
260+
update : str or None
261+
Valid options are 'create' or 'append', lining up with osm2pgsql's
262+
`--create` and `--append` modes.
263+
force : bool
264+
"""
265+
self.logger = logging.getLogger('pgosm-flex')
266+
self.replication = replication
267+
self.replication_update = replication_update
268+
269+
# The input via click should enforce this, still worth checking here
270+
valid_update_options = ['append', 'create', None]
271+
272+
if update not in valid_update_options:
273+
raise ValueError(f'Invalid option for --update. Valid options: {valid_update_options}')
274+
275+
self.update = update
276+
self.force = force
277+
278+
self.set_slim_no_drop()
279+
self.set_append_first_run()
280+
self.set_run_post_sql()
281+
282+
283+
def okay_to_run(self, prior_import: dict) -> bool:
284+
"""Determines if it is okay to run PgOSM Flex without fear of data loss.
285+
286+
This logic was along with the `--force` option to make it
287+
less likely to accidentally lose data with improper PgOSM Flex
288+
options.
289+
290+
Remember, this is free and open source software and there is
291+
no warranty!
292+
This does not imply a guarantee that you **cannot** lose data,
293+
only that we want to make it **less likely** something bad will happen.
294+
If you find a way bad things can happen that could be detected here,
295+
please open an issue:
296+
297+
https://github.com/rustprooflabs/pgosm-flex/issues/new?assignees=&labels=&projects=&template=bug_report.md&title=Data%20Safety%20Idea
298+
299+
Parameters
300+
-------------------
301+
prior_import : dict
302+
Details about the latest import from osm.pgosm_flex table.
303+
304+
An empty dictionary (len==0) indicates no prior import.
305+
Only the replication key is specifically used
306+
307+
Returns
308+
-------------------
309+
okay_to_run : bool
310+
"""
311+
self.logger.debug(f'Checking if it is okay to run...')
312+
if self.force:
313+
self.logger.warn(f'Using --force, kiss existing data goodbye')
314+
return True
315+
316+
# If no prior imports, do not require force
317+
if len(prior_import) == 0:
318+
self.logger.debug(f'No prior import found, okay to proceed.')
319+
return True
320+
321+
prior_replication = prior_import['replication']
322+
323+
# Check git version against latest.
324+
# If current version is lower than prior version from latest import, stop.
325+
prior_import_version = prior_import['pgosm_flex_version_no_hash']
326+
git_tag = get_git_info(tag_only=True)
327+
328+
if git_tag == '-- (version unknown) --':
329+
msg = 'Unable to detect PgOSM Flex version from Git.'
330+
msg += ' Not enforcing version check against prior version.'
331+
self.logger.warning(msg)
332+
elif parse_version(git_tag) < parse_version(prior_import_version):
333+
msg = f'PgOSM Flex version ({git_tag}) is lower than latest import'
334+
msg += f' tracked in the pgosm_flex table ({prior_import_version}).'
335+
msg += f' Use PgOSM Flex version {prior_import_version} or newer'
336+
self.logger.error(msg)
337+
return False
338+
else:
339+
self.logger.info(f'Prior import used PgOSM Flex: {prior_import_version}')
340+
341+
if self.replication:
342+
if not prior_replication:
343+
self.logger.error('Running w/ replication but prior import did not. Requires --force to proceed.')
344+
return False
345+
self.logger.debug('Okay to proceed with replication')
346+
return True
347+
348+
msg = 'Prior data exists in the osm schema and --force was not used.'
349+
self.logger.error(msg)
350+
return False
351+
352+
def set_append_first_run(self):
353+
"""Uses `replication_update` and `update` to determine value for
354+
`self.append_first_run`
355+
"""
356+
if self.replication_update:
357+
self.append_first_run = False
358+
else:
359+
self.append_first_run = True
360+
361+
if self.update is not None:
362+
if self.update == 'create':
363+
self.append_first_run = True
364+
else:
365+
self.append_first_run = False
366+
367+
def set_slim_no_drop(self):
368+
"""Uses `replication` and `update` to determine value for
369+
`self.slim_no_drop`
370+
"""
371+
self.slim_no_drop = False
372+
373+
if self.replication:
374+
self.slim_no_drop = True
375+
376+
if self.update is not None:
377+
self.slim_no_drop = True
378+
379+
def set_run_post_sql(self):
380+
"""Uses `update` value to determine value for
381+
`self.run_post_sql`. This value determines if the post-processing SQL
382+
should be executed.
383+
384+
Note: Not checking replication/replication_update because subsequent
385+
imports use osm2pgsql-replication, which does not attempt to run
386+
the post-processing SQL scripts.
387+
"""
388+
self.run_post_sql = True
389+
390+
if self.update is not None:
391+
if self.update == 'append':
392+
self.run_post_sql = False
393+
394+
def as_json(self) -> str:
395+
"""Packs key details as a dictionary passed through `json.dumps()`
396+
397+
Returns
398+
------------------------
399+
json_text : str
400+
Text representation of JSON object built using class attributes.
401+
"""
402+
self_as_dict = {'update': self.update,
403+
'replication': self.replication,
404+
'replication_update': self.replication_update,
405+
'append_first_run': self.append_first_run,
406+
'slim_no_drop': self.slim_no_drop,
407+
'run_post_sql': self.run_post_sql}
408+
return json.dumps(self_as_dict)
409+
410+

0 commit comments

Comments
 (0)