Source code for merge_streams

from datetime import datetime


[docs]class Merge: """ Live merging of csv files. The call of this object is a generator function which interleaves lines from a collection of files, ordered by a sort_column parameter. Assumes: (i) Individual files are ordered by ascending sort column values. (ii) Individual files have headers with one column named the same as <sort_column> parameter. (iii) Files to merge are in the same folder specified by <file_path> parameter>. The generator operates as follows: (i) Upon initialization, aligned lists of files, file names, file headers, and the first non-header line (split on delimiter with file-type index appended) of each file are constructed. (ii) When the Merge object is called the list of lines is sorted by time-stamp specified by <sort_column> and <date_format> parameters. (iii) The line (split on delimiter) with the earliest time stamp is returned along with the name of the file it came from (determined by appended event_type int). (iv) The line is replaced from the file it came from (determined by appended event_type int). (v) If there are no more lines left in the file then it is closed and list entries associated with this file are removed from lists (determined by appended event_type int). (vi) Concludes generating when all files are ended. """ def __init__(self, filepath='./', file_list=['short_t_toy_auth.txt', 'short_t_toy_proc.txt'], sort_column='time', date_format='int', delimiter=','): """ :param filepath: Path to folder with files to merge. :param file_list: List of names of files to merge. :param sort_column: Column to sort lines of files on for sequential ordering of log lines. :param date_format: Can be any format string which makes sense to datetime.strptime or 'int' for simple integer time stamps. :param delimiter: Delimiter of csv columns, e.g. ',', ' ' ... """ if not filepath.endswith('/'): filepath += '/' self.file_list = file_list self.filepath = filepath self.files = [open(filepath + f, 'r') for f in file_list] self._headers = [f.readline().strip().split(',') for f in self.files] self.sorters = [header.index(sort_column) for header in self._headers] self.event_types = range(len(self.files)) self.events = [f.readline().strip().split(delimiter) + [idx] for idx, f in enumerate(self.files)] self.event_lengths = [len(header) for header in self._headers] self.auth_index = 0 self.proc_index = 0 if date_format == 'int': self.sort_function = lambda x: int(x[self.sorters[x[-1]]]) else: self.sort_function = lambda x: datetime.strptime(x[self.sorters[x[-1]]], self.date_format) @property def headers(self): """ :return: A list of headers (split by delimiter) from files being merged """ return self._headers
[docs] def next_event(self, event_type): """ :param event_type: Integer associated with a file to read from. :return: Next event (line from file split on delimiter with type appended) from file associated with event_type. """ return self.files[event_type].readline().strip().split(',') + [event_type]
def __call__(self): """ :return: (tuple) filename, Next event (line from file split on delimiter) according to time stamp. """ while True: # try: if len(self.events) == 0: return elif len(self.events) == 1: if self.events[0][0] == '': return # else: # event_type = self.events[0][:-1] # least_time_event = self.events[0][:-1] # self.events[0] = self.next_event(event_type) # yield self.file_list[event_type], least_time_event least_time_event = sorted(self.events, key=self.sort_function)[0] event_type = least_time_event[-1] yield self.file_list[event_type], least_time_event[:-1] new_event = self.next_event(event_type) if new_event[0] == '': self.files[event_type].close() self.files.pop(event_type) self.sorters.pop(event_type) self.event_types.pop(event_type) self.events.pop(event_type) self.file_list.pop(event_type) self.event_lengths.pop(event_type) if len(self.files) == 0: return else: self.events[event_type] = new_event assert len(self.events[event_type]) == self.event_lengths[event_type] + 1