import logging
from collections import defaultdict, deque
from dataclasses import dataclass, field, is_dataclass, replace
from typing import List, Mapping
from spoonbill.common import DEFAULT_FIELDS, JOINABLE, JOINABLE_SEPARATOR, SEPARATOR
from spoonbill.i18n import LOCALE, _
from spoonbill.rowdata import Rows
from spoonbill.spec import Table
from spoonbill.utils import get_pointer, get_root, make_count_column
LOGGER = logging.getLogger("spoonbill")
[docs]
@dataclass
class TableFlattenConfig:
    """Flattening settings that apply to a single table.

    :param split: Split child arrays to separate tables
    :param pretty_headers: Use human friendly headers extracted from schema
    :param headers: User edited headers to override automatically extracted
    :param repeat: List of columns to clone in child tables
    :param unnest: List of columns to output from child to parent table
    :param only: List of columns to output
    :param name: Overwrite table name
    """

    # The only required setting; everything below has a default.
    split: bool
    pretty_headers: bool = False
    # Mutable defaults are created per instance through ``default_factory``.
    headers: Mapping[str, str] = field(default_factory=dict)
    repeat: List[str] = field(default_factory=list)
    unnest: List[str] = field(default_factory=list)
    only: List[str] = field(default_factory=list)
    # An empty string is the default table name override.
    name: str = ""
[docs]
@dataclass
class FlattenOptions:
    """Flattening configuration

    :param selection: List of selected tables to extract from data
    :param exclude: List of tables to exclude from export
    :param count: Include number of rows in child table in each parent table
    """

    selection: Mapping[str, TableFlattenConfig]
    exclude: List[str] = field(default_factory=list)
    count: bool = False

    def __post_init__(self):
        """Normalize ``selection`` so every value is a :class:`TableFlattenConfig`.

        Values that are not dataclass instances are treated as keyword
        arguments for :class:`TableFlattenConfig`.
        """
        # Build a fresh dict instead of assigning into the mapping we were given:
        # the argument is only declared as ``Mapping`` (it may be read-only), and
        # the caller's object should not be modified behind its back.
        self.selection = {
            name: table if is_dataclass(table) else TableFlattenConfig(**table)
            for name, table in self.selection.items()
        }
[docs]
class Flattener:
    """Data flattener

    In order to export data correctly Flattener requires previously analyzed tables data.
    During the process flattener could add columns not based on schema analysis, such as
    `itemsCount`.

    In every generated row, depending on table type, flattener will always add a few
    autogenerated columns.

    For root table:

    * rowID
    * id
    * ocid

    For child tables this list will be extended with `parentID` column.

    :param options: Flattening options
    :param tables: Analyzed tables data
    :param language: Language to use for the human-readable headings
    """

    def __init__(self, options: FlattenOptions, tables: Mapping[str, Table], language=LOCALE):
        # A plain mapping of keyword arguments is accepted in place of FlattenOptions.
        if not is_dataclass(options):
            options = FlattenOptions(**options)
        self.options = options
        self.tables = tables
        self.language = language

        # Lookup tables filled by init_table_lookup():
        #   _lookup_map: column pointer -> table
        #   _types_map:  pointer from ``table.types`` -> table
        #   _path_map:   pointer from ``table.path`` -> table
        self._lookup_map = {}
        self._types_map = {}
        self._path_map = {}

        # Replaces self.tables with only the tables that end up being exported.
        self.init_table_selection(tables)
        self.init_options(self.tables)

    def init_table_selection(self, tables):
        """Register every selected table (and its child tables) for flattening.

        Resets ``self.tables`` and repopulates it through
        :meth:`init_table_lookup`.

        :param tables: All analyzed tables, keyed by table name
        """
        self.tables = {}
        for name, table in tables.items():
            if name not in self.options.selection:
                continue
            options = self.options.selection[name]
            if options.only:
                self.init_only(table, options.only, options.split)
            self.init_table_lookup(self.tables, table)
            self.init_child_tables(tables, table, options)

    def init_child_tables(self, tables, table, options):
        """Recursively register the child tables of ``table``.

        Children listed in ``self.options.exclude`` are skipped together with
        their own children.

        :param tables: All analyzed tables, keyed by table name
        :param table: Table whose ``child_tables`` are processed
        :param options: Table options inherited from the selected parent table
        """
        for c_name in table.child_tables:
            if c_name in self.options.exclude:
                continue
            c_table = tables[c_name]
            if c_table.rolled_up or options.split:
                # The child is exported on its own; unless configured explicitly
                # it gets a copy of the parent's options.
                if c_table.name not in self.options.selection:
                    self.options.selection[c_table.name] = replace(options)
                self.init_table_lookup(self.tables, c_table)
            else:
                # use parent table
                target = table
                while target.parent and not target.rolled_up:
                    target = target.parent
                self.init_table_lookup(self.tables, target)
            # NOTE(review): indentation was lost in the source this was recovered
            # from; the recursion is assumed to run for every non-excluded child.
            self.init_child_tables(tables, c_table, options)

    def init_map(self, map, paths, table, only=None, target=None):
        """Point every eligible path in ``paths`` to a table inside ``map``.

        :param map: Dictionary to update in place
        :param paths: Iterable of pointers to register
        :param table: Table used as the value when ``target`` is not given
        :param only: Optional container of pointers; when non-empty, pointers outside it are skipped
        :param target: Optional table to store instead of ``table``
        """
        if not target:
            target = table
        for path in paths:
            if path in DEFAULT_FIELDS:
                continue
            if only and path not in only:
                continue
            map[path] = target

    def _map_path(self, table, target=None):
        # Register the pointers in ``table.path`` in the path lookup.
        self.init_map(self._path_map, table.path, table, target=target)

    def _map_types(self, table, target=None):
        # Register the pointers in ``table.types`` in the types lookup.
        self.init_map(self._types_map, table.types, table, target=target)

    def _map_cols(self, table, split, target=None):
        # With ``split`` the table object itself is iterated (presumably yielding
        # its own column pointers - confirm against Table); otherwise the
        # combined columns are used.
        cols = table if split else table.combined_columns
        self.init_map(self._lookup_map, cols, table, target=target)

    def init_table_lookup(self, tables, table, target=None):
        """Add ``table`` to ``tables`` and to the path/types/columns lookups.

        Tables with ``total_rows == 0`` are ignored.

        :param tables: Dictionary of exported tables, updated in place
        :param table: Table to register
        :param target: Optional table the lookups should point to instead of ``table``
        """
        if table.total_rows == 0:
            return
        name = table.name
        tables[name] = table
        self._map_path(table, target=target)
        self._map_types(table, target=target)
        self._map_cols(table, self.options.selection[name].split, target=target)

    def init_count(self, table, options):
        """Add a count column to ``table`` for each of its arrays.

        Nothing is added unless ``table.splitted`` is truthy.

        :param table: Table to extend
        :param options: Table options (currently unused)
        """
        if not table.splitted:
            return
        for array in table.arrays:
            path = make_count_column(array)
            # add count columns only if table is rolled up
            # in other way it could be frustrating
            # e.g. it may generate columns for whole array like:
            # /tender/items/200/additionalClassificationsCount
            table.add_column(path, "integer", path, additional=True, propagated=False, header=path)
            table.inc_column(path, path)

    def init_unnest(self, table, options):
        """Copy the ``unnest`` columns from ``combined_columns`` into ``columns``.

        Column ids missing from ``table.combined_columns`` are silently skipped.

        :param table: Table to update
        :param options: Table options providing ``unnest``
        """
        for col_id in options.unnest:
            col = table.combined_columns.get(col_id)
            if col:
                table.columns[col_id] = col

    def init_repeat(self, table, options):
        """Copy the ``repeat`` columns of ``table`` into its exported child tables.

        A warning is logged for every requested column the table does not have.

        :param table: Parent table providing the columns
        :param options: Table options providing ``repeat`` and ``split``
        """
        # The source of columns does not change between iterations.
        columns = table.columns if options.split else table.combined_columns
        for col_id in options.repeat:
            col = columns.get(col_id)
            if not col:
                LOGGER.warning(_("Ignoring repeat column {} because it is not in table {}").format(col_id, table.name))
                continue
            for c_name in table.child_tables:
                # Only children that were registered for export are updated.
                child_table = self.tables.get(c_name)
                if child_table:
                    child_table.columns[col_id] = col
                    child_table.combined_columns[col_id] = col
                    child_table.titles[col_id] = col_id

    def init_options(self, tables):
        """Apply the count, unnest and repeat options to every table in ``tables``.

        :param tables: Exported tables, keyed by table name
        """
        for table in tables.values():
            options = self.options.selection[table.name]
            if self.options.count:
                self.init_count(table, options)
            if options.unnest:
                self.init_unnest(table, options)
            if options.repeat:
                self.init_repeat(table, options)

    def init_only(self, table, only, split):
        """Restrict ``table`` to the columns listed in ``only``.

        ``table.types`` is rebuilt from the type entries that are not combined
        columns plus the types of the columns left after filtering.

        :param table: Table to filter
        :param only: Column ids to keep
        :param split: Whether the table is split (currently unused)
        """
        # Keep type entries that do not describe a column of this table.
        paths = {c_id: c for c_id, c in table.types.items() if c_id not in table.combined_columns}
        # Presumably drops the columns for which the predicate is true - confirm
        # against Table.filter_columns.
        table.filter_columns(lambda col: col.id not in only)
        paths.update({c_id: c.type for c_id, c in table.columns.items()})
        table.types = paths

    def get_table(self, pointer):
        """Return the table registered for ``pointer``.

        The columns lookup is consulted first, then the types lookup.

        :param pointer: Pointer to look up
        :return: Matching table, or ``None`` when neither lookup knows the pointer
        """
        return self._lookup_map.get(pointer) or self._types_map.get(pointer)

    def flatten(self, releases):
        """Flatten releases

        :param releases: releases as iterable object
        :return: Iterator over ``(counter, rows)`` tuples, one per release, where
            ``counter`` is the index of the release and ``rows`` is the
            :class:`Rows` object holding its rows grouped by table name
        """
        for counter, release in enumerate(releases):
            # Work stack of (abs_path, path, parent_key, parent, record, repeat).
            # pop() takes the most recently appended entry first.
            to_flatten = deque([("", "", "", {}, release, {})])
            rows = Rows(ocid=release["ocid"], buyer=release.get("buyer", {}), data=defaultdict(list))
            while to_flatten:
                abs_path, path, parent_key, _parent, record, repeat = to_flatten.pop()
                table = self._path_map.get(path)
                if path == "/buyer":
                    # only useful in analysis
                    continue
                if table:
                    # Strict match /tender /parties etc., so this is a new row
                    row = rows.new_row(table, record.get("id", ""))
                    only = self.options.selection[table.name].only
                    if only:
                        row = {col: col_v for col, col_v in row.items() if col in only}
                    if table.is_root:
                        # A root row starts a fresh set of repeated values.
                        repeat = {}
                    if repeat:
                        row.update(repeat)
                    rows.data[table.name].append(row)

                for key, item in record.items():
                    pointer = SEPARATOR.join((path, key))
                    abs_pointer = SEPARATOR.join((abs_path, key))

                    table = self.get_table(pointer)
                    if not table:
                        continue
                    item_type = table.types.get(pointer)
                    options = self.options.selection[table.name]
                    split = options.split

                    if pointer in options.repeat:
                        # The dict is shared with entries already on the stack.
                        repeat[pointer] = item

                    if isinstance(item, dict):
                        to_flatten.append((abs_pointer, pointer, key, record, item, repeat))
                    elif isinstance(item, list):
                        if item_type == JOINABLE:
                            # Joinable arrays are written as one cell.
                            value = JOINABLE_SEPARATOR.join(str(i) for i in item)
                            rows.data[table.name][-1][pointer] = value
                        else:
                            if self.options.count and table.splitted:
                                abs_pointer = get_pointer(
                                    table,
                                    abs_pointer,
                                    pointer,
                                    split,
                                )
                                abs_pointer += "Count"
                                if abs_pointer in table:
                                    rows.data[table.name][-1][abs_pointer] = len(item)
                            for index, value in enumerate(item):
                                # Only object items are flattened further.
                                if isinstance(value, dict):
                                    # Recomputed from abs_path because abs_pointer
                                    # may have been turned into a count column above.
                                    abs_pointer = get_pointer(
                                        table,
                                        SEPARATOR.join((abs_path, key)),
                                        pointer,
                                        split,
                                        index=str(index),
                                    )
                                    to_flatten.append(
                                        (
                                            abs_pointer,
                                            pointer,
                                            key,
                                            record,
                                            value,
                                            repeat,
                                        )
                                    )
                    else:
                        # Neither a dict nor a list: write the value into a cell.
                        if table.is_combined:
                            pointer = SEPARATOR + SEPARATOR.join((parent_key, key))
                            abs_pointer = pointer
                        if not table.is_root:
                            root = get_root(table)
                            unnest = self.options.selection[root.name].unnest
                            if unnest and abs_pointer in unnest:
                                # Unnested values go to the latest row of the root table.
                                rows.data[root.name][-1][abs_pointer] = item
                                continue
                        pointer = get_pointer(table, abs_pointer, pointer, split)
                        if pointer in table.combined_columns:
                            rows.data[table.name][-1][pointer] = item
            yield counter, rows