Source code for spoonbill.spec

import logging
from dataclasses import dataclass, field, is_dataclass, replace
from typing import List, Mapping, Sequence

from spoonbill.common import DEFAULT_FIELDS, DEFAULT_FIELDS_COMBINED
from spoonbill.i18n import _
from spoonbill.utils import (
    combine_path,
    common_prefix,
    generate_table_name,
    get_path_for_array_col,
    get_pointer,
    insert_after_key,
)

# Module-level logger, obtained under the fixed name "spoonbill".
LOGGER = logging.getLogger("spoonbill")


@dataclass
class Column:
    """
    Describe a single column of a table.

    :param id: JSON path of the column, without array indexes
    :param path: JSON path of the column, with array indexes
    :param title: Human-friendly title of the column
    :param type: Type the column's values are expected to have
    :param hits: How many times the column held data during analysis
    :param header: List stored alongside the column; a new empty list is
        created for every instance when it is not supplied
    """

    id: str
    path: str
    title: str
    type: str
    # Starts at zero for a freshly created column.
    hits: int = 0
    # default_factory gives each instance its own list.
    header: list = field(default_factory=list)
@dataclass
class Table:
    """
    A container for table information.

    :param name: Table name
    :param path: List of paths to gather data to this table
    :param total_rows: Total available rows in this table
    :param parent: Parent table; defaults to an empty (falsy) dict when no parent is given
    :param is_root: This table is root table
    :param is_combined: This table contains data collected from different paths
    :param splitted: This table should be splitted
    :param rolled_up: Flag set to True when :meth:`split` is called on this table
    :param columns: Columns extracted from schema for split version of this table
    :param combined_columns: Columns extracted from schema for unsplit version of this table
    :param additional_columns: Columns identified in dataset but not in schema
    :param arrays: Table array columns and maximum items (not the total count) in each array
    :param titles: All human-friendly column titles, extracted from the schema
    :param child_tables: List of possible child tables
    :param types: All paths matched to this table with corresponding object type on each path
    :param array_columns: Columns added with ``propagated=True``, keyed by their combined path
    :param array_positions: For each array path, the path of the column most recently
        registered for it; used as the insertion anchor in :meth:`add_array_column`
    :param preview_rows: Generated preview for split version of this table
    :param preview_rows_combined: Generated preview for unsplit version of this table
    """

    name: str
    path: List[str]
    total_rows: int = 0
    # `parent` is a Table object, but dataclasses don't play well with recursion.
    parent: object = field(default_factory=dict)
    is_root: bool = False
    is_combined: bool = False
    splitted: bool = False
    rolled_up: bool = False
    columns: Mapping[str, Column] = field(default_factory=dict)
    combined_columns: Mapping[str, Column] = field(default_factory=dict)
    additional_columns: Mapping[str, Column] = field(default_factory=dict)
    arrays: Mapping[str, int] = field(default_factory=dict)
    titles: Mapping[str, str] = field(default_factory=dict)
    child_tables: List[str] = field(default_factory=list)
    types: Mapping[str, List[str]] = field(default_factory=dict)
    array_columns: Mapping[str, Column] = field(default_factory=dict)
    array_positions: Mapping[str, str] = field(default_factory=dict)
    preview_rows: Sequence[dict] = field(default_factory=list)
    preview_rows_combined: Sequence[dict] = field(default_factory=list)

    def __post_init__(self):
        """
        Normalize the column mappings and make sure the default fields exist.

        Any value in ``columns``, ``combined_columns`` or ``additional_columns``
        that is not a dataclass instance is expanded as keyword arguments into
        a :class:`Column`.
        """
        for attr in ("columns", "combined_columns", "additional_columns"):
            raw = getattr(self, attr, {})
            if raw:
                converted = {
                    name: col if is_dataclass(col) else Column(**col)
                    for name, col in raw.items()
                }
                setattr(self, attr, converted)

        # Only a root table that is not combined uses DEFAULT_FIELDS.
        if self.is_root and not self.is_combined:
            cols = DEFAULT_FIELDS
        else:
            cols = DEFAULT_FIELDS_COMBINED
        for col in cols:
            # Keyword arguments keep the field name in `title` and "string"
            # in `type`; passing them positionally is easy to get swapped.
            column = Column(id=col, path=col, title=col, type="string")
            # The same Column object is stored in both mappings, so its
            # `hits` counter is shared between them.
            if col not in self.columns:
                self.columns[col] = column
            if col not in self.combined_columns:
                self.combined_columns[col] = column
            self.titles[col] = col

    def _counter(self, split, cond):
        """Return the keys of the split or combined columns for which ``cond(column)`` is true."""
        cols = self.columns if split else self.combined_columns
        return [header for header, col in cols.items() if cond(col)]

    def missing_rows(self, split=True):
        """
        Return the columns that are available in the schema, but not present in the analyzed data.

        :param split: Look at the split columns when true, otherwise at the combined columns
        """
        return self._counter(split, lambda c: c.hits == 0)

    def available_rows(self, split=True):
        """
        Return the columns that are available in the analyzed data.

        :param split: Look at the split columns when true, otherwise at the combined columns
        """
        return self._counter(split, lambda c: c.hits > 0)

    def filter_columns(self, filter):
        """
        Drop every column for which ``filter(column)`` is true.

        Both ``columns`` and ``combined_columns`` are rebuilt as new dicts.

        :param filter: Callable taking a column; a true result removes the column
        """
        self.columns = {col_id: col for col_id, col in self.columns.items() if not filter(col)}
        self.combined_columns = {
            col_id: col for col_id, col in self.combined_columns.items() if not filter(col)
        }

    def __iter__(self):
        """Iterate over the keys of the split columns."""
        yield from self.columns

    def __getitem__(self, path):
        """Return the split column stored under ``path``, or None when there is none."""
        return self.columns.get(path)

    def add_array_column(self, col, path, abs_path, max):
        """
        Insert a column for an array item into the combined columns.

        Nothing is added when the recorded length of the array exceeds ``max``.
        Otherwise the column is inserted right after the column most recently
        registered for the same array, and the call is repeated on the parent
        table when this table is not the root.

        :param col: The column to copy, its path is replaced by the computed one
        :param path: The column's JSON path without array indexes
        :param abs_path: The column's full JSON path
        :param max: Upper bound on the recorded array length
        """
        array = self.is_array(path)
        col_path = get_path_for_array_col(abs_path, array)
        if self.arrays[array] > max:
            return
        if col_path not in self.combined_columns:
            col = replace(col, path=col_path)
            last_key = self.array_positions[array]
            # Remember the new column as the anchor for the next insertion.
            self.array_positions[array] = col_path
            self.combined_columns = insert_after_key(self.combined_columns, {col_path: col}, last_key)
        if not self.is_root:
            self.parent.add_array_column(col, path, abs_path, max=max)

    def add_column(self, path, item_type, title, *, propagated=False, additional=False, abs_path=None, header=None):
        """
        Add a new column to the table.

        :param path: The column's path
        :param item_type: The column's expected type
        :param title: Column title
        :param propagated: The call comes from a child table; the column is then
            stored in the combined and array columns but not in the split columns
        :param additional: Mark this column as missing in schema
        :param abs_path: The column's full JSON path
        :param header: List stored on the column and in ``titles``; a new empty
            list is used when omitted
        """
        # A fresh list per call: a shared default list would leak between
        # every column and title created without an explicit header.
        if header is None:
            header = []
        combined_path = combine_path(self, path)
        col = Column(path, combined_path, title, item_type, header=header)

        array = self.is_array(path)
        if additional:
            if array:
                # when we analyzing file we need to keep index from data not to use 0
                # e.g. /tender/items/166/relatedLot
                combined_path = abs_path
                col = replace(col, path=combined_path)
            LOGGER.debug(_("Detected additional column: %s in %s table"), path, self.name)
            self.additional_columns[combined_path] = col

        if not propagated:
            self.columns[combined_path] = col
        self.combined_columns[combined_path] = col
        if propagated:
            self.array_columns[combined_path] = col
            self.array_positions[array] = combined_path

        if not self.is_root:
            self.parent.add_column(path, item_type, title, propagated=True, header=header)

        # Check each key on its own; testing `path` for both would make the
        # second key unreachable once the first one has been stored.
        for p in (path, combined_path):
            if p not in self.titles:
                self.titles[p] = header
        if path not in self.types:
            self.types[path] = item_type

    def is_array(self, path):
        """
        Check whether the given path is in any table's arrays.

        :param path: The path to look up
        :return: The first array path, in reverse sorted order, that is a common
            prefix of ``path``; False when there is none
        """
        # Reverse sorted order tries a longer path before its own prefix.
        for array in sorted(self.arrays, reverse=True):
            if common_prefix(array, path) == array:
                return array
        return False

    def inc_column(self, abs_path, path):
        """
        Increment the number of non-empty cells in the column.

        The counter is updated on this table's combined columns and then on
        every ancestor table.

        :param abs_path: The column's full JSON path
        :param path: The column's JSON path without array indexes
        """
        header = get_pointer(self, abs_path, path, True)
        if header in self.combined_columns:
            self.combined_columns[header].hits += 1
        if not self.is_root:
            self.parent.inc_column(abs_path, path)

    def add_array(self, header):
        """
        Register an array path with length 0 on this table and on every ancestor.

        :param header: The path to the array
        """
        self.arrays[header] = 0
        if not self.is_root:
            self.parent.add_array(header)

    def set_array(self, header, item):
        """
        Try to set the maximum length of an array.

        :param header: The path to the array
        :param item: Array from data
        :return: Whether the array is bigger than previously found and the length was updated;
            for a non-root table that was updated, the parent's result is returned
        """
        length = len(item)
        if length <= self.arrays.get(header, 0):
            return False
        self.arrays[header] = length
        if not self.is_root:
            return self.parent.set_array(header, item)
        return True

    def inc(self):
        """
        Increment the number of rows in the table.

        Every field in ``DEFAULT_FIELDS_COMBINED`` is also counted as a hit.
        """
        self.total_rows += 1
        for col_name in DEFAULT_FIELDS_COMBINED:
            self.inc_column(col_name, col_name)

    def set_preview_path(self, abs_path, path, value, max_items):
        """
        Store a value in the last preview row of this table and of its ancestors.

        The combined preview always receives the value. The split preview only
        receives it when the column is a known combined column and either the
        path is not inside an array or the recorded array length is below
        ``max_items``.

        :param abs_path: The column's full JSON path
        :param path: The column's JSON path without array indexes
        :param value: The value to store
        :param max_items: Exclusive upper bound on the recorded array length
        """
        header = get_pointer(self, abs_path, path, True)
        array = self.is_array(path)
        self.preview_rows_combined[-1][header] = value
        if header in self.combined_columns:
            if not array or self.arrays[array] < max_items:
                self.preview_rows[-1][header] = value
        if not self.is_root:
            self.parent.set_preview_path(abs_path, path, value, max_items)

    def split(self, pointer):
        """
        Mark this table as rolled up and remove the array's columns from its ancestors.

        For each truthy parent up the chain, the parent is marked as splitted and
        loses every column whose id resolves to the array ``pointer``.

        :param pointer: The array path whose columns are dropped from the ancestors
        """
        self.rolled_up = True
        parent = self.parent

        def drop(col):
            return parent.is_array(col.id) == pointer

        # The default parent is an empty dict, which ends the recursion here.
        if parent:
            parent.splitted = True
            parent.filter_columns(drop)
            parent.split(pointer)
def add_child_table(table, pointer, parent_key, key):
    """
    Build a child table for ``table`` and register it on that parent.

    :param table: The parent table of the newly created table
    :param pointer: Path which the new table should match
    :param parent_key: Field name of the parent object, used to generate the table name
    :param key: Field name of the new table's object, used to generate the table name
    :return: The newly created child table
    """
    name = generate_table_name(table.name, parent_key, key)
    child = Table(name=name, path=[pointer], parent=table)
    # The parent keeps the child's name and tracks the pointer as an array.
    table.child_tables.append(name)
    table.add_array(pointer)
    return child