import logging
import pickle
from collections import defaultdict, deque
from functools import lru_cache
from pathlib import Path
from typing import List, Mapping
import jsonref
from flatten_dict import flatten
from spoonbill.common import ARRAY, JOINABLE, JOINABLE_SEPARATOR, PREVIEW_ROWS, SEPARATOR, TABLE_THRESHOLD
from spoonbill.i18n import LOCALE, _
from spoonbill.rowdata import Rows
from spoonbill.spec import Table, add_child_table
from spoonbill.utils import (
PYTHON_TO_JSON_TYPE,
RepeatFilter,
add_paths_to_schema,
common_prefix,
extract_type,
generate_table_name,
get_matching_tables,
resolve_file_uri,
validate_type,
)
# Module-level logger registered under the name "spoonbill".
LOGGER = logging.getLogger("spoonbill")
# Attach the project's RepeatFilter (imported from spoonbill.utils); presumably it
# suppresses repeated log records -- confirm against its implementation.
LOGGER.addFilter(RepeatFilter())
[docs]
class DataPreprocessor:
"""
Data analyzer.
Processes the given schema and, based on it, extracts information from an iterable dataset.
:param schema: The dataset's schema
:param root_tables: The paths which should become root tables
:param combined_tables: The paths which should become tables that combine data from different locations
:param tables: Use these table objects instead of parsing the schema
:param table_threshold: The maximum array length before it is recommended to split out a child table
:param total_items: The total number of objects processed
:param language: Language to use for the human-readable headings
:param multiple_values: Stored on the instance as ``self.multiple_values``
:param pkg_type: Stored on the instance as ``self.pkg_type``
:param with_preview: Whether preview rows are collected while processing items
"""
def __init__(
    self,
    schema: Mapping,
    root_tables: Mapping[str, List],
    combined_tables: Mapping[str, List] = None,
    tables: Mapping[str, Table] = None,
    table_threshold=TABLE_THRESHOLD,
    total_items=0,
    language=LOCALE,
    multiple_values=False,
    pkg_type=None,
    with_preview=True,
):
    """
    Store the configuration and, when no tables were supplied, build them from the schema.
    """
    # Schema and table layout supplied by the caller.
    self.schema = schema
    self.root_tables = root_tables
    self.combined_tables = combined_tables if combined_tables else {}
    self.tables = tables if tables else {}

    # Processing options.
    self.table_threshold = table_threshold
    self.multiple_values = multiple_values
    self.language = language
    self.with_preview = with_preview

    # Mutable state updated while parsing the schema and processing items.
    self.total_items = total_items
    self.current_table = None
    self.names_counter = defaultdict(int)

    # An empty (or missing) ``tables`` argument means the schema has to be parsed.
    if not self.tables:
        self.parse_schema()

    self.pkg_type = pkg_type
def __getitem__(self, table):
    """
    Look up a table by name in ``self.tables``.

    :param table: The table name
    :return: The matching entry of ``self.tables``
    :raises KeyError: If no table is registered under that name (for a plain dict)
    """
    registered = self.tables
    return registered[table]
[docs]
def name_check(self, parent_key, key):
    """
    Return ``key``, shortened and numbered if its generated table name was already seen.

    The table name generated from the current table, ``parent_key`` and ``key`` is counted in
    ``self.names_counter``. On the first occurrence ``key`` is returned unchanged; on later
    occurrences the first four characters of ``key`` are followed by the number of previous
    occurrences.

    :param parent_key: The key of the parent object
    :param key: The key to check
    :return: The original or the de-duplicated key
    """
    table_name = generate_table_name(self.current_table.name, parent_key, key)
    counter = self.names_counter
    counter[table_name] += 1
    occurrences = counter[table_name]
    if occurrences <= 1:
        return key
    return key[:4] + str(occurrences - 1)
[docs]
def guess_type(self, item):
    """
    Guess the type of a value from its Python type.

    :param item: The value to inspect
    :return: ``JOINABLE`` for a non-empty list whose first element is neither a dict nor a
        list; otherwise a one-element list holding the ``PYTHON_TO_JSON_TYPE`` entry for the
        value's Python type name (``None`` if there is no entry)
    """
    if isinstance(item, list) and item:
        first = item[0]
        if not isinstance(first, (dict, list)):
            return JOINABLE
    python_name = type(item).__name__
    return [PYTHON_TO_JSON_TYPE.get(python_name)]
[docs]
def init_tables(self, tables, is_combined=False):
    """
    Create a root ``Table`` for every name/path pair and register it in ``self.tables``.

    :param tables: Mapping of table name to path(s)
    :param is_combined: Passed through to each created ``Table``
    """
    for table_name, table_path in tables.items():
        self.tables[table_name] = Table(
            table_name,
            table_path,
            is_root=True,
            is_combined=is_combined,
            parent="",
        )
[docs]
def is_base_table(self):
    """
    Tell whether the current table is a root table or a combined table.

    :return: ``is_root`` of the current table if it is truthy, otherwise its ``is_combined``
    """
    table = self.current_table
    return table.is_root or table.is_combined
[docs]
def load_schema(self):
    """
    Resolve the schema and initialize the root and combined tables.

    A schema given as a string or ``Path`` is first loaded with ``resolve_file_uri``. The root
    tables are then initialized, the schema is passed through ``jsonref.JsonRef.replace_refs``
    unless it already is a ``JsonRef``, and finally the combined tables (if any) are
    initialized.
    """
    schema_is_location = isinstance(self.schema, (str, Path))
    if schema_is_location:
        self.schema = resolve_file_uri(self.schema)

    self.init_tables(self.root_tables)

    already_resolved = isinstance(self.schema, jsonref.JsonRef)
    if not already_resolved:
        self.schema = jsonref.JsonRef.replace_refs(self.schema)

    if self.combined_tables:
        self.init_tables(self.combined_tables, is_combined=True)
[docs]
def prepare_tables(self):
    """
    Initialize the root tables and, when any are configured, the combined tables.
    """
    self.init_tables(self.root_tables)
    if not self.combined_tables:
        return
    self.init_tables(self.combined_tables, is_combined=True)
[docs]
def parse_schema(self):
    """
    Extract information from the schema.

    Loads the schema (which also initializes the root and combined tables), then walks its
    ``properties`` iteratively. For every non-deprecated property whose path matches a table:

    - the property's type set is recorded in the table's ``types``;
    - objects are queued for further traversal;
    - arrays whose items are objects or arrays are queued too, and a child table is created
      first when the array's path is not one of the table's own paths;
    - arrays of other item types become a single joinable column;
    - every other property becomes a regular column (for combined tables the column path is
      rebuilt from the parent key and the key only).
    """
    # NOTE(review): nesting reconstructed from a whitespace-stripped listing -- verify
    # against the original file.
    self.load_schema()
    # self.prepare_tables()
    # presumably add_paths_to_schema annotates the schema with the "$title" / "$path"
    # entries used below -- confirm against spoonbill.utils
    proxy = add_paths_to_schema(self.schema)
    # Work items are (path, parent_key, parent, prop); pop() takes the most recently
    # appended one, so the deque acts as a stack.
    to_analyze = deque([("", "", {}, proxy)])

    # TODO: check if recursion is better for field ordering
    while to_analyze:
        path, parent_key, parent, prop = to_analyze.pop()
        if prop.get("deprecated"):
            continue
        # TODO: handle oneOf anyOf allOf
        properties = prop.get("properties", {})
        if properties:
            for key, item in properties.items():
                # Skip the annotation keys, which are not schema properties.
                if key in ("$title", "$path"):
                    continue
                if item.get("deprecated"):
                    continue
                # The deprecation flag may also sit on the reference object itself.
                if hasattr(item, "__reference__") and item.__reference__.get("deprecated"):
                    continue

                typeset = extract_type(item)
                pointer = self.join_path(path, key)
                self.current_table = self.get_table(pointer)
                # Paths that match no table are ignored.
                if not self.current_table:
                    continue

                self.current_table.types[pointer] = typeset
                if "object" in typeset:
                    to_analyze.append((pointer, key, properties, item))
                elif "array" in typeset:
                    items = item["items"]
                    items_type = extract_type(items)
                    if set(items_type) & {"array", "object"}:
                        if pointer not in self.current_table.path:
                            # found child array, need to create child table
                            key = self.name_check(parent_key, key)
                            self._add_table(add_child_table(self.current_table, pointer, parent_key, key), pointer)
                        to_analyze.append((pointer, key, properties, items))
                    else:
                        # This means we are in an array of strings, so this becomes a single joinable column
                        typeset = ARRAY.format(items_type)
                        self.current_table.types[pointer] = JOINABLE
                        self.current_table.add_column(pointer, typeset, pointer, header=item["$title"])
                else:
                    # Combined tables use a path built from the parent key and key only.
                    if self.current_table.is_combined:
                        pointer = SEPARATOR + self.join_path(parent_key, key)
                    self.current_table.add_column(pointer, typeset, pointer, header=item["$title"])
        else:
            # TODO: not sure what to do here
            continue
[docs]
def add_column(self, pointer, typeset):
    """
    Add a column to the current table, using ``pointer`` as both the first and third argument.

    :param pointer: The column's path
    :param typeset: The column's type(s)
    """
    table = self.current_table
    table.add_column(pointer, typeset, pointer)
def _add_table(self, table, pointer):
    """
    Register ``table`` under its name, make it the current table and reset the lookup cache.

    :param table: The table to register
    :param pointer: Not used by this method
    """
    name = table.name
    self.tables[name] = table
    self.current_table = table
    # Cached get_table() results may be stale now that a table was added.
    self.get_table.cache_clear()
[docs]
def add_additional_table(self, pointer, abs_pointer, parent_key, key, item):
    """
    Create a child table for an array found at ``pointer`` and pre-populate its columns.

    :param pointer: Path of the array
    :param abs_pointer: Absolute path of the array, used to build each column's ``abs_path``
    :param parent_key: The key of the parent object
    :param key: The key of the array
    :param item: The array's elements
    """
    LOGGER.debug(_("Detected additional table: %s") % pointer)
    self.current_table.types[pointer] = ["array"]
    child = add_child_table(self.current_table, pointer, parent_key, key)
    self._add_table(child, pointer)

    # add columns beforehand because it might be required
    # to recalculate and reorder headers when enlarging array
    # there must be a better way but it should work for now
    for element in item:
        flattened = flatten(element, reducer="path")
        for relative_path, value in flattened.items():
            column_path = self.join_path(pointer, relative_path)
            if column_path in self.current_table:
                continue
            self.current_table.add_column(
                column_path,
                self.guess_type(value),
                column_path,
                abs_path=self.join_path(abs_pointer, relative_path),
                header=column_path,
            )
[docs]
# NOTE(review): lru_cache on a method keys on ``self`` and keeps instances alive for the
# lifetime of the cache; _add_table relies on ``cache_clear`` to invalidate it.
@lru_cache(maxsize=None)
def get_table(self, path):
    """
    Get the table that best matches the given path.

    :param path: A path
    :return: The first candidate returned by ``get_matching_tables``, or ``None`` if there
        are no candidates
    """
    matches = get_matching_tables(self.tables, path)
    return matches[0] if matches else None
[docs]
def add_preview_row(self, rows, item_id, parent_key):
    """
    Append a new row to both preview lists of the current table.

    Nothing happens when previews are disabled or the table already has ``PREVIEW_ROWS``
    rows. This is important to do, because other code uses an index of -1 to access and
    update the current row.

    :param rows: The Rows object
    :param item_id: Object id
    :param parent_key: Not used by this method
    """
    table = self.current_table
    if not self.with_preview or not table.total_rows < PREVIEW_ROWS:
        return
    targets = (table.preview_rows, table.preview_rows_combined)
    for preview in targets:
        # Each preview list gets its own freshly built row.
        preview.append(rows.new_row(table, item_id).as_dict())
[docs]
def inc_table_rows(self, item, rows, parent_key, record):
    """
    Count one row on the current table, and add a preview row, per element of ``item``.

    :param item: A list (one row per element) or any other value (a single row)
    :param rows: The Rows object passed on to ``add_preview_row``
    :param parent_key: The key of the parent object
    :param record: The enclosing object; its ``"id"`` (default ``""``) identifies the row
    """
    if isinstance(item, list):
        elements = item
    else:
        elements = [item]
    for _unused in elements:
        self.current_table.inc()
        self.add_preview_row(rows, record.get("id", ""), parent_key)
[docs]
def is_new_row(self, pointer):
    """
    Tell whether ``pointer`` starts a new row in the current table.

    :param pointer: Path to an item
    :return: ``True`` if the pointer is one of the current table's paths and is not ``/buyer``
    """
    # strict match like /parties, /tender
    in_table_path = pointer in self.current_table.path
    if not in_table_path:
        return False
    return pointer != "/buyer"
[docs]
def join_path(self, *args):
    """
    Join path segments with ``SEPARATOR``.

    :param args: The path segments (strings)
    :return: The joined path
    """
    segments = args
    return SEPARATOR.join(segments)
[docs]
def get_paths_for_combined_table(self, parent_key, key):
    """
    Build the path used for a column of a combined table.

    :param parent_key: The key of the parent object
    :param key: The key of the item
    :return: A 2-tuple holding the same path twice: ``SEPARATOR`` followed by the joined
        ``parent_key`` and ``key``
    """
    combined_path = SEPARATOR + self.join_path(parent_key, key)
    return combined_path, combined_path
[docs]
def is_type_matched(self, pointer, item, item_type):
    """
    Check a value against its expected type, logging an error on mismatch.

    :param pointer: Path to the item, used in the log message
    :param item: The value to check
    :param item_type: The expected type; a falsy value or ``JOINABLE`` skips validation
    :return: ``False`` if ``validate_type`` rejects the value, otherwise ``True``
    """
    # TODO: this validation should probably be smarter with arrays
    if not item_type:
        return True
    if item_type == JOINABLE:
        return True
    if validate_type(item_type, item):
        return True
    LOGGER.error("Mismatched type on %s expected %s", pointer, item_type)
    return False
[docs]
def add_joinable_column(self, abs_pointer, pointer):
    """
    Register ``pointer`` as an additional joinable column of the current table.

    :param abs_pointer: Absolute path of the column, passed as ``abs_path``
    :param pointer: Path of the column, also used as its header
    """
    table_name = self.current_table.name
    LOGGER.debug(_("Detected additional column: %s in %s table") % (abs_pointer, table_name))
    self.current_table.types[pointer] = JOINABLE
    self.current_table.add_column(
        pointer,
        JOINABLE,
        pointer,
        additional=True,
        abs_path=abs_pointer,
        header=pointer,
    )
[docs]
def handle_array_expanded(self, pointer, item, abs_path, key):
    """
    Split the array at ``pointer`` out of the current table once it reaches the threshold.

    :param pointer: Path of the array
    :param item: The array; it is split when its length is at least ``self.table_threshold``
    :param abs_path: Not used by this method
    :param key: Not used by this method
    """
    if len(item) >= self.table_threshold:
        self.current_table.split(pointer)
[docs]
def is_array_col(self, abs_path):
    """
    Check whether ``abs_path``, with its all-digit segments removed, is in the current table.

    :param abs_path: An absolute path that may contain array indexes
    :return: The result of the ``in`` test against the current table
    """
    named_segments = (segment for segment in abs_path.split(SEPARATOR) if not segment.isdigit())
    return self.join_path(*named_segments) in self.current_table
[docs]
def clean_up_missing_arrays(self):
    """
    Run ``filter_columns`` on every table with a predicate selecting array columns with no hits.
    """

    def is_unused_array(column):
        # ``tbl`` is the table currently being filtered by the loop below.
        return tbl.is_array(column.id) and column.hits == 0

    for tbl in self.tables.values():
        tbl.filter_columns(is_unused_array)
[docs]
def process_items(self, releases, with_preview=True):
    """
    Analyze releases.

    Iterates over every release to calculate metrics and optionally generates previews for combined and split
    versions of each table. This is a generator: it yields the zero-based index of each
    release once that release has been fully traversed.

    :param releases: The releases to analyze; each must have an ``"ocid"`` key
    :param with_preview: Whether to generate previews for each table. NOTE(review): this
        argument is not read in the body; ``self.with_preview`` is used instead.
    """
    # NOTE(review): nesting reconstructed from a whitespace-stripped listing -- verify
    # against the original file.
    count = 0
    for count, release in enumerate(releases):
        # Work items are (abs_path, path, parent_key, parent, record); popleft() below
        # processes them in insertion order.
        to_analyze = deque([("", "", "", {}, release)])
        rows = Rows(ocid=release["ocid"], buyer=release.get("buyer", {}), data=defaultdict(list))
        while to_analyze:
            abs_path, path, parent_key, parent, record = to_analyze.popleft()
            # Only mapping-like records are traversed.
            if hasattr(record, "items"):
                for key, item in record.items():
                    pointer = self.join_path(path, key)
                    self.current_table = self.get_table(pointer)
                    # Paths that match no table are ignored.
                    if not self.current_table:
                        continue
                    if self.is_new_row(pointer):
                        self.inc_table_rows(item, rows, parent_key, record)
                    self.extend_table_types(pointer, item)
                    item_type = self.current_table.types.get(pointer)
                    # Values whose type does not match the expected one are skipped.
                    if not self.is_type_matched(pointer, item, item_type):
                        continue

                    if isinstance(item, dict):
                        # Nested object: traverse it later under the same table lookup path.
                        to_analyze.append(
                            (
                                self.join_path(abs_path, key),
                                pointer,
                                key,
                                record,
                                item,
                            )
                        )
                    elif item and isinstance(item, list):
                        abs_pointer = self.join_path(abs_path, key)

                        # A non-empty list of non-dict values with no known type is
                        # treated as a joinable column.
                        if not isinstance(item[0], dict) and not item_type:
                            item_type = JOINABLE
                            self.add_joinable_column(abs_pointer, pointer)
                        if item_type == JOINABLE:
                            if pointer not in self.current_table:
                                self.add_joinable_column(abs_pointer, pointer)
                            self.current_table.inc_column(abs_pointer, pointer)
                            # Previews are only filled for the first PREVIEW_ROWS releases.
                            if self.with_preview and count < PREVIEW_ROWS:
                                value = JOINABLE_SEPARATOR.join([str(i) for i in item])
                                self.current_table.set_preview_path(
                                    abs_pointer, pointer, value, self.table_threshold
                                )
                        elif self.is_base_table():
                            # Root/combined table: queue each element under the array's
                            # own path, without an index.
                            for value in item:
                                to_analyze.append(
                                    (
                                        abs_pointer,
                                        pointer,
                                        key,
                                        record,
                                        value,
                                    )
                                )
                        else:
                            parent_table = self.current_table.parent
                            if pointer not in parent_table.arrays:
                                # Array not yet registered on the parent: create an
                                # additional table for it. add_additional_table repeats
                                # the log call and the type assignment below.
                                LOGGER.debug(_("Detected additional table: %s") % pointer)
                                self.current_table.types[pointer] = ["array"]
                                parent_table = self.current_table
                                self.add_additional_table(pointer, abs_pointer, parent_key, key, item)
                                self.add_preview_row(rows, record.get("id", ""), parent_key)

                            # A truthy set_array() result triggers the threshold check.
                            if parent_table.set_array(pointer, item):
                                self.handle_array_expanded(pointer, item, abs_path, key)

                            # Queue each dict element with its index in the absolute path.
                            for i, value in enumerate(item):
                                if isinstance(value, dict):
                                    abs_pointer = self.join_path(abs_path, key, str(i))
                                    to_analyze.append(
                                        (
                                            abs_pointer,
                                            pointer,
                                            parent_key,
                                            record,
                                            value,
                                        )
                                    )
                    else:
                        # Any other value (including an empty list) is handled as a column.
                        abs_pointer = self.join_path(abs_path, key)
                        if self.current_table.is_combined:
                            pointer, abs_pointer = self.get_paths_for_combined_table(parent_key, key)
                        col = self.current_table.columns.get(pointer)
                        if col:
                            if abs_pointer not in self.current_table:
                                parent = self.current_table.parent
                                parent.add_array_column(col, pointer, abs_pointer, max=self.table_threshold)
                        else:
                            # Unknown column: add it as an additional column with a guessed type.
                            self.current_table.add_column(
                                pointer,
                                self.guess_type(item),
                                pointer,
                                additional=True,
                                abs_path=abs_pointer,
                            )
                        self.current_table.inc_column(abs_pointer, pointer)
                        if item and self.with_preview and count < PREVIEW_ROWS:
                            # Paths starting with "/buyer" are left out of the previews.
                            if not pointer.startswith("/buyer"):
                                self.current_table.set_preview_path(
                                    abs_pointer, pointer, item, self.table_threshold
                                )
        yield count
    self.clean_up_missing_arrays()
    # NOTE(review): ``count`` is the last zero-based index from enumerate(), so this is
    # one less than the number of releases whenever any were processed -- confirm intent.
    self.total_items = count
[docs]
def dump(self, path):
    """
    Pickle this object to a file.

    An ``OSError`` raised while opening or writing the file is logged and not re-raised.

    :param path: Full path to file
    """
    try:
        with open(path, "wb") as stream:
            pickle.dump(self, stream)
    except OSError as err:
        LOGGER.error(_("Failed to dump DataPreprocessor to file. Error: {}").format(err))
[docs]
@classmethod
def restore(_cls, path):
    """
    Load a pickled object from a file.

    A ``TypeError`` or ``pickle.UnpicklingError`` is logged and ``None`` is returned; other
    exceptions (for example a missing file) propagate to the caller.

    :param path: Full path to file
    :return: The unpickled object, or ``None`` if the file is not a valid pickle
    """
    # NOTE(review): unpickling can execute arbitrary code -- only restore files from a
    # trusted source.
    try:
        with open(path, "rb") as stream:
            restored = pickle.load(stream)
        return restored
    except (TypeError, pickle.UnpicklingError):
        LOGGER.error(_("Invalid pickle file. Can't restore."))
[docs]
def extend_table_types(self, pointer, item):
    """
    Record a guessed type for a path the current table does not know yet.

    Nothing is recorded when ``pointer`` already has a type or is one of the table's own
    paths. Otherwise the guessed type is stored if ``common_prefix`` returns a truthy value
    for ``pointer`` and at least one of the table's paths.

    :param pointer: Path to an item
    :param item: Item being analyzed
    """
    table = self.current_table
    if pointer in table.types or pointer in table.path:
        return
    shares_prefix = any(common_prefix(pointer, table_path) for table_path in table.path)
    if shares_prefix:
        self.current_table.types[pointer] = self.guess_type(item)