Source code for spoonbill.writers.base_writer

import re
from collections import defaultdict

from spoonbill.utils import SchemaHeaderExtractor


class BaseWriter:
    """
    Base writer class

    Keeps the per-sheet state shared by concrete writers: the unique output
    name and the header mapping computed for every initialised sheet.
    """

    # A run of digits inside a column name. Used to map a column that carries
    # an array index (".../3/...", ".../12/...") onto its index-0 sibling.
    _DIGIT_RUN = re.compile(r"\d+")

    def __init__(self, workdir, tables, options, schema):
        """
        :param workdir: Working directory
        :param tables: The table objects
        :param options: Flattening options
        :param schema: Schema passed to ``SchemaHeaderExtractor`` for header lookups
        """
        self.workdir = workdir
        self.tables = tables
        self.options = options
        # Sheet name as requested -> unique name actually used for the output.
        self.names = {}
        # Sheet name -> header mapping returned by ``get_headers``.
        self.headers = {}
        # Number of times each output name has been requested so far.
        self.names_counter = defaultdict(int)
        self.schema = schema
        self.schema_headers = SchemaHeaderExtractor(self.schema)

    def get_headers(self, table, options):
        """
        Return a table's headers, respecting the human and override options.

        Note that this may switch ``pretty_headers`` on for ``table`` in
        ``self.options.selection`` when the table's parent has it enabled.

        :param table: A table object
        :param options: Flattening options
        :return: Mapping between the machine-readable headers and the output headers
        """
        split = options.split and (table.rolled_up or table.splitted)
        # A child table that itself has child tables is always treated as split.
        if table.parent != "" and table.child_tables:
            split = True
        headers = {column: column for column in table.available_rows(split=split)}

        # Pretty headers selected for the parent table carry over to this table.
        selection = self.options.selection
        if (
            table.parent != ""
            and table.parent.name in selection
            and selection[table.parent.name].pretty_headers
        ):
            selection[table.name].pretty_headers = True

        if options.pretty_headers:
            for column in headers:
                title = table.titles.get(column, column)
                headers[column] = self.schema_headers.get_header(column, title)
            # Columns with a non-zero array index reuse the title of the
            # matching index-0 column. All digit runs are normalised in one
            # pass, so multi-digit indices and several different indices in
            # the same column resolve to the same base column regardless of
            # iteration order. A column without such a base keeps its own
            # title instead of raising KeyError.
            for column in headers:
                base_column = self._DIGIT_RUN.sub("0", column)
                if base_column != column and base_column in headers:
                    headers[column] = headers[base_column]

        # Explicit overrides always win over computed titles.
        if options.headers:
            headers.update(options.headers)
        return headers

    def _name_check(self, table_name):
        """
        Return a unique output name for ``table_name``.

        The first request for a name returns it unchanged; every later request
        for the same name gets a numeric suffix (``name1``, ``name2``, ...).

        :param table_name: Requested output name
        :return: The name to use for the output
        """
        self.names_counter[table_name] += 1
        seen = self.names_counter[table_name]
        if seen > 1:
            return f"{table_name}{seen - 1}"
        return table_name

    def init_sheet(self, name, table):
        """
        Initialize a sheet, setting its headers and unique name.

        In this context, the sheet might be either a CSV file or a sheet in an Excel workbook.

        :param name: Table name
        :param table: Table object
        :return: Tuple of the sheet's unique name and its header mapping
        """
        options = self.options.selection[name]
        self.headers[name] = self.get_headers(table, options)
        self.names[name] = self._name_check(options.name or name)
        return self.names[name], self.headers[name]