Source code for spoonbill.flatten

import logging
from collections import defaultdict, deque
from dataclasses import dataclass, field, is_dataclass, replace
from typing import List, Mapping

from spoonbill.common import DEFAULT_FIELDS, JOINABLE, JOINABLE_SEPARATOR, SEPARATOR
from spoonbill.i18n import LOCALE, _
from spoonbill.rowdata import Rows
from spoonbill.spec import Table
from spoonbill.utils import get_pointer, get_root, make_count_column

LOGGER = logging.getLogger("spoonbill")


@dataclass
class TableFlattenConfig:
    """Flattening settings that apply to a single table.

    :param split: Split child arrays to separate tables
    :param pretty_headers: Use human friendly headers extracted from schema
    :param headers: User edited headers to override automatically extracted
    :param unnest: List of columns to output from child to parent table
    :param repeat: List of columns to clone in child tables
    :param only: List of columns to output
    :param name: Overwrite table name
    """

    # ``split`` is the only setting without a default, so it must always be supplied.
    split: bool
    pretty_headers: bool = False
    # Mutable defaults go through ``default_factory`` so that every instance owns
    # its own container instead of sharing one across instances.
    headers: Mapping[str, str] = field(default_factory=dict)
    repeat: List[str] = field(default_factory=list)
    unnest: List[str] = field(default_factory=list)
    only: List[str] = field(default_factory=list)
    name: str = ""
@dataclass
class FlattenOptions:
    """Top-level flattening settings.

    :param selection: List of selected tables to extract from data
    :param count: Include number of rows in child table in each parent table
    :param exclude: List of tables to exclude from export
    """

    selection: Mapping[str, TableFlattenConfig]
    exclude: List[str] = field(default_factory=list)
    count: bool = False

    def __post_init__(self):
        """Turn every non-dataclass entry of ``selection`` into a :class:`TableFlattenConfig`.

        Each such entry is expanded as keyword arguments, and the result is
        stored back under the same key of the ``selection`` mapping that was
        passed in (the mapping is updated in place).
        """
        # Gather the raw entries first, then swap them for config objects.
        pending = [(key, raw) for key, raw in self.selection.items() if not is_dataclass(raw)]
        for key, raw in pending:
            self.selection[key] = TableFlattenConfig(**raw)
class Flattener:
    """Data flattener

    In order to export data correctly, Flattener requires previously analyzed tables data.
    During the process the flattener can add columns that are not based on schema analysis,
    such as `itemsCount`.

    In every generated row, depending on table type, the flattener will always add
    a few autogenerated columns.
    For root table:
    * rowID
    * id
    * ocid
    For child tables this list will be extended with the `parentID` column.

    :param options: Flattening options
    :param tables: Analyzed tables data
    :param language: Language to use for the human-readable headings
    """

    def __init__(self, options: FlattenOptions, tables: Mapping[str, Table], language=LOCALE):
        # Anything that is not a dataclass is expanded as keyword arguments
        # into a FlattenOptions instance.
        if not is_dataclass(options):
            options = FlattenOptions(**options)
        self.options = options
        self.tables = tables
        self.language = language

        # Path -> table maps filled by init_table_lookup():
        #   _lookup_map: column paths, _types_map: typed paths, _path_map: table paths.
        self._lookup_map = {}
        self._types_map = {}
        self._path_map = {}

        # init_table_selection() replaces self.tables with the selected subset,
        # so init_options() below only sees the tables that were registered.
        self.init_table_selection(tables)
        self.init_options(self.tables)

    def init_table_selection(self, tables):
        """Rebuild ``self.tables`` from the tables named in ``options.selection``.

        For each selected table the ``only`` filter is applied (when set), the
        table is registered in the lookup maps, and its child tables are
        processed through :meth:`init_child_tables`.

        :param tables: Mapping of table name to analyzed table
        """
        self.tables = {}
        for name, table in tables.items():
            if name not in self.options.selection:
                continue
            options = self.options.selection[name]
            if options.only:
                self.init_only(table, options.only, options.split)
            self.init_table_lookup(self.tables, table)
            self.init_child_tables(tables, table, options)

    def init_child_tables(self, tables, table, options):
        """Recursively register the child tables of ``table``.

        Children named in ``options.exclude`` are skipped. A child that is rolled
        up, or any child when the parent options request ``split``, is registered
        as its own table; if it has no selection entry yet it receives a copy of
        the parent's options. Otherwise the nearest ancestor that is rolled up
        (or has no parent) is registered instead.

        :param tables: Mapping of table name to analyzed table
        :param table: Table whose ``child_tables`` are processed
        :param options: Table options of the selected ancestor, reused for children
        """
        for c_name in table.child_tables:
            if c_name in self.options.exclude:
                continue
            c_table = tables[c_name]
            if c_table.rolled_up or options.split:
                if c_table.name not in self.options.selection:
                    # dataclasses.replace() without changes creates a new options
                    # object with the same field values.
                    self.options.selection[c_table.name] = replace(options)
                self.init_table_lookup(self.tables, c_table)
            else:
                # use parent table
                target = table
                while target.parent and not target.rolled_up:
                    target = target.parent
                self.init_table_lookup(self.tables, target)
            self.init_child_tables(tables, c_table, options)

    def init_map(self, map, paths, table, only=None, target=None):
        """Point every path in ``paths`` at a table inside ``map``.

        Paths listed in ``DEFAULT_FIELDS`` are never mapped. When ``only`` is
        non-empty, only the paths it contains are mapped.

        :param map: Dictionary to update in place
        :param paths: Iterable of paths to register
        :param table: Table the paths belong to
        :param only: Optional collection restricting which paths are registered
        :param target: Table to store in the map; defaults to ``table``
        """
        if not target:
            target = table
        for path in paths:
            if path in DEFAULT_FIELDS:
                continue
            if not only or path in only:
                map[path] = target

    def _map_path(self, table, target=None):
        """Register ``table.path`` entries in the path map."""
        self.init_map(self._path_map, table.path, table, target=target)

    def _map_types(self, table, target=None):
        """Register the keys of ``table.types`` in the types map."""
        self.init_map(self._types_map, table.types, table, target=target)

    def _map_cols(self, table, split, target=None):
        """Register the table's columns in the column lookup map.

        With ``split`` the table object itself is iterated, otherwise its
        ``combined_columns`` are used.
        """
        # NOTE(review): presumably iterating a Table yields its own column ids;
        # confirm against spoonbill.spec.Table.
        cols = table if split else table.combined_columns
        self.init_map(self._lookup_map, cols, table, target=target)

    def init_table_lookup(self, tables, table, target=None):
        """Add ``table`` to ``tables`` and register its paths, types and columns.

        Tables whose ``total_rows`` is 0 are ignored.

        :param tables: Mapping of table name to table, updated in place
        :param table: Table to register
        :param target: Optional table to store in the lookup maps instead of ``table``
        """
        if table.total_rows == 0:
            return
        name = table.name
        tables[name] = table
        self._map_path(table, target=target)
        self._map_types(table, target=target)
        self._map_cols(table, self.options.selection[name].split, target=target)

    def init_count(self, table, options):
        """Add a count column to ``table`` for each of its arrays.

        Nothing is done unless ``table.splitted`` is truthy.

        :param table: Table to extend
        :param options: Table options; not used by this method
        """
        if not table.splitted:
            return
        for array in table.arrays:
            path = make_count_column(array)
            # add count columns only if table is rolled up
            # in other way it could be frustrating
            # e.g. it may generate columns for whole array like:
            # /tender/items/200/additionalClassificationsCount
            table.add_column(path, "integer", path, additional=True, propagated=False, header=path)
            table.inc_column(path, path)

    def init_unnest(self, table, options):
        """Copy the columns listed in ``options.unnest`` from combined to own columns.

        Column ids that are missing from ``table.combined_columns`` are ignored.

        :param table: Table to update
        :param options: Table options providing ``unnest``
        """
        for col_id in options.unnest:
            col = table.combined_columns.get(col_id)
            if col:
                table.columns[col_id] = col

    def init_repeat(self, table, options):
        """Copy the columns listed in ``options.repeat`` into the registered child tables.

        A warning is logged for every requested column that the table does not
        have. Child tables that are not present in ``self.tables`` are skipped.

        :param table: Table whose columns are repeated
        :param options: Table options providing ``repeat`` and ``split``
        """
        # The column source does not depend on the column being processed.
        columns = table.columns if options.split else table.combined_columns
        for col_id in options.repeat:
            col = columns.get(col_id)
            if not col:
                LOGGER.warning(_("Ignoring repeat column {} because it is not in table {}").format(col_id, table.name))
                continue
            for c_name in table.child_tables:
                child_table = self.tables.get(c_name)
                if child_table:
                    child_table.columns[col_id] = col
                    child_table.combined_columns[col_id] = col
                    child_table.titles[col_id] = col_id

    def init_options(self, tables):
        """Apply the count, unnest and repeat options to every table in ``tables``.

        :param tables: Mapping of table name to table
        """
        for table in tables.values():
            options = self.options.selection[table.name]
            if self.options.count:
                self.init_count(table, options)
            if options.unnest:
                self.init_unnest(table, options)
            if options.repeat:
                self.init_repeat(table, options)

    def init_only(self, table, only, split):
        """Restrict ``table`` to the columns listed in ``only``.

        Columns are filtered through ``table.filter_columns`` and ``table.types``
        is rebuilt from the previous type entries that are not combined columns
        plus the types of the columns left in ``table.columns``.

        :param table: Table to filter
        :param only: Collection of column ids to keep
        :param split: Not used by this method; kept so existing callers keep working
        """
        paths = {c_id: c for c_id, c in table.types.items() if c_id not in table.combined_columns}
        # NOTE(review): the predicate selects columns whose id is NOT in ``only``;
        # presumably filter_columns removes the matching columns - confirm
        # against spoonbill.spec.Table.
        table.filter_columns(lambda col: col.id not in only)
        paths.update({c_id: c.type for c_id, c in table.columns.items()})
        table.types = paths

    def get_table(self, pointer):
        """Return the table registered for ``pointer``.

        The column lookup map is consulted first and the types map second;
        ``None`` is returned when neither has a truthy entry.

        :param pointer: Path to look up
        """
        return self._lookup_map.get(pointer) or self._types_map.get(pointer)

    def flatten(self, releases):
        """Flatten releases

        :param releases: releases as iterable object
        :return: Iterator over mapping between table name and list of rows for each release
        """
        for counter, release in enumerate(releases):
            # Work stack (append/pop on the same end) of
            # (abs_path, path, parent_key, parent, record, repeat) entries.
            to_flatten = deque([("", "", "", {}, release, {})])
            rows = Rows(ocid=release["ocid"], buyer=release.get("buyer", {}), data=defaultdict(list))
            while to_flatten:
                # ``parent`` travels with every entry but is not read in this method.
                abs_path, path, parent_key, parent, record, repeat = to_flatten.pop()
                table = self._path_map.get(path)
                if path == "/buyer":
                    # only useful in analysis
                    continue
                if table:
                    # Strict match /tender /parties etc., so this is a new row
                    row = rows.new_row(table, record.get("id", ""))
                    only = self.options.selection[table.name].only
                    if only:
                        row = {col: col_v for col, col_v in row.items() if col in only}
                    if table.is_root:
                        # Rows of a root table start with no repeated values.
                        repeat = {}
                    if repeat:
                        row.update(repeat)
                    rows.data[table.name].append(row)
                for key, item in record.items():
                    pointer = SEPARATOR.join((path, key))
                    abs_pointer = SEPARATOR.join((abs_path, key))

                    table = self.get_table(pointer)
                    if not table:
                        # No registered table handles this path.
                        continue
                    item_type = table.types.get(pointer)
                    options = self.options.selection[table.name]
                    split = options.split

                    if pointer in options.repeat:
                        # Remember the value so rows created later from entries
                        # sharing this dict receive it.
                        repeat[pointer] = item

                    if isinstance(item, dict):
                        # Nested object: process it later with this record as parent.
                        to_flatten.append((abs_pointer, pointer, key, record, item, repeat))
                    elif isinstance(item, list):
                        if item_type == JOINABLE:
                            # Joinable arrays are stored as a single joined string.
                            value = JOINABLE_SEPARATOR.join(str(i) for i in item)
                            rows.data[table.name][-1][pointer] = value
                        else:
                            if self.options.count and table.splitted:
                                abs_pointer = get_pointer(
                                    table,
                                    abs_pointer,
                                    pointer,
                                    split,
                                )
                                abs_pointer += "Count"
                                if abs_pointer in table:
                                    rows.data[table.name][-1][abs_pointer] = len(item)
                            # Only dict items are queued; other array items are not processed here.
                            for index, value in enumerate(item):
                                if isinstance(value, dict):
                                    abs_pointer = get_pointer(
                                        table,
                                        SEPARATOR.join((abs_path, key)),
                                        pointer,
                                        split,
                                        index=str(index),
                                    )
                                    to_flatten.append(
                                        (
                                            abs_pointer,
                                            pointer,
                                            key,
                                            record,
                                            value,
                                            repeat,
                                        )
                                    )
                    else:
                        # Any other value is written into the latest row.
                        if table.is_combined:
                            pointer = SEPARATOR + SEPARATOR.join((parent_key, key))
                            abs_pointer = pointer
                        if not table.is_root:
                            root = get_root(table)
                            unnest = self.options.selection[root.name].unnest
                            if unnest and abs_pointer in unnest:
                                # Unnested columns go to the latest row of the root table.
                                rows.data[root.name][-1][abs_pointer] = item
                                continue
                        pointer = get_pointer(table, abs_pointer, pointer, split)
                        if pointer in table.combined_columns:
                            rows.data[table.name][-1][pointer] = item
            yield counter, rows