Source code for trajectorydata

"""Top-level package for TrajectoryData."""
import hashlib
import uuid
import os
import re
import json
import multiprocessing
from collections import OrderedDict, defaultdict

import numpy as np


__version__ = '0.1.0'


class TrajectoryParserError(Exception):
    """Exception raised if a :class:`TrajectoryData` file is malformed"""
    pass


class TrajectoryData(object):
    """Tabular data of expectation values for one or more trajectories.
    Multiple `TrajectoryData` objects can be combined with the
    :meth:`extend` method, in order to accumulate averages over an
    arbitrary number of trajectories. As much as possible, it is checked
    that all trajectories are statistically independent. A record is kept
    to ensure exact reproducibility.

    Arguments:
        ID (str): A unique, RFC 4122 compliant identifier (as generated by
            :meth:`new_id`)
        dt (float): Time step between data points (>0)
        seed (int): The random number generator seed on which the data is
            based
        n_trajectories (int): The number of trajectories from which the
            data is averaged (It is assumed that the random number
            generator was seeded with the given seed, and then the given
            number of trajectories were calculated *sequentially*)
        data (dict of str to tuple of arrays): dictionary (preferably
            `OrderedDict`) of expectation value data. The value of
            ``data[operator_name]`` must be a tuple of four numpy arrays
            (real part of expectation value, imaginary part of expectation
            value, real part of standard deviation, imaginary part of
            standard deviation). The operator names must contain only
            ASCII characters and must be shorter than ``col_width - 10``.

    Raises:
        ValueError: if `ID` is not RFC 4122 compliant, `dt` is an invalid
            or non-positive float, or `data` does not follow the correct
            structure.

    Attributes:
        ID (str): A unique ID for the current state of the
            `TrajectoryData` (read-only). See :attr:`ID` property.
        table (OrderedDict of str to numpy array): A table that contains
            four columns for every known operator (real/imaginary part of
            the expectation value, real/imaginary part of the variance).
            Note that the `table` attribute can easily be converted to a
            :obj:`pandas.DataFrame` (``DataFrame(data=traj.table)``). The
            `table` attribute should be considered read-only.
        dt (float): Time step between data points
        nt (int): Number of time steps / data points
        operators (list of str): An iterator over the operator names. The
            column names in the `table` attribute derive from these.
            Assuming "X" is one of the operator names, there will be four
            keys in `table`: "Re[<X>]", "Im[<X>]", "Re[var(X)]",
            "Im[var(X)]"
        record (OrderedDict of str to tuple of int, int, list): A copy of
            the complete record of how the averaged expectation values for
            all operators were obtained. See discussion of the
            :attr:`record` property.
        col_width (int): width of the data columns when writing out data.
            Defaults to 25 (allowing for full double precision). Note that
            operator names may be at most of length ``col_width - 10``
    """
    # When instantiating with the from_qsd_data class method, we want the
    # ID to depend uniquely on the read data files (via their md5 hash).
    # RFC 4122 accounts for a situation like this with a "UUID3" that
    # requires the combination of a namespace and a name. We define a
    # completely arbitrary namespace UUID here for this purpose.
    _uuid_namespace = uuid.UUID('c84069eb-cf80-48a6-9584-74b7f2c742c1')
    _prec_dt = 1.0e-6  # how close dt's have to be to be considered equal
    col_width = 25     # width of columns when writing
    _col_padding = 10  # space needed in col header besides operator name
    _rx = {
        'op_name': re.compile(r'^[\x20-\x7E]+$'),  # ASCII w/o control chars
        'head_ID': re.compile(r'# QNET Trajectory Data ID\s+'
                              r'(?P<ID>[a-f\d-]{36})'),
        'record':  re.compile(r'# Record\s+(?P<ID>[a-f\d-]{36})\s*'
                              r'\(seed (?P<seed>\d+)\):'
                              r'\s*(?P<n_traj>\d+)'
                              r'(\s*(?P<ops>\[.*\]))?$'),
        'header':  re.compile(r'#\s+t'),
    }

    def __init__(self, ID, dt, seed, n_trajectories, data):
        # seed and n_trajectories may be passed as None without raising an
        # error, assuming _record is set manually immediately after
        # instantiation
        self._ID = str(uuid.UUID(ID))  # self.ID = ID, with validation
        self.table = OrderedDict()
        try:
            self._dt = float(dt)
        except TypeError:
            raise ValueError("dt must be a float with value >0")
        if self.dt <= 0.0:
            raise ValueError("dt must be a value >0")
        self._operators = []
        for op, (re_exp, im_exp, re_var, im_var) in data.items():
            op = str(op).strip()
            if len(op) > self.col_width - self._col_padding:
                self.col_width = len(op) + self._col_padding
            self._check_op_name(op)
            self._operators.append(op)
            self._nt = len(re_exp)  # assumed valid for all (check below)
            re_exp_lb, im_exp_lb, re_var_lb, im_var_lb \
                = self._operator_cols(op)
            self.table[re_exp_lb] = np.array(re_exp, dtype=np.float64)
            self.table[im_exp_lb] = np.array(im_exp, dtype=np.float64)
            self.table[re_var_lb] = np.array(re_var, dtype=np.float64)
            self.table[im_var_lb] = np.array(im_var, dtype=np.float64)
        for col in self.table:
            if len(self.table[col]) != self.nt:
                raise ValueError("All columns must be of length nt")
        record_ops = list(self._operators)
        self._record = OrderedDict([
            (self.ID, (seed, n_trajectories, record_ops)),
        ])

    @classmethod
    def _check_op_name(cls, op):
        """Raise a ValueError if op is not a valid operator name"""
        if not cls._rx['op_name'].match(op):
            raise ValueError(("Operator name '%s' contains invalid "
                              "characters") % op)
        brackets = 0
        for letter in op:
            if letter == '[':
                brackets += 1
            if letter == ']':
                brackets -= 1
        if brackets != 0:
            raise ValueError(("Operator name '%s' contains unbalanced "
                              "brackets") % op)

    def __eq__(self, other):
        return self.ID == other.ID

    def __hash__(self):
        return hash(self.ID)

    def copy(self):
        """Return a (deep) copy of the current object"""
        data = OrderedDict()
        for op in self._operators:
            cols = self._operator_cols(op)
            data[op] = tuple([self.table[col] for col in cols])
        new = self.__class__(ID=self.ID, dt=self.dt, seed=None,
                             n_trajectories=None, data=data)
        new._record = self._record.copy()
        return new

    @classmethod
    def _operator_cols(cls, op):
        """Return the four column names holding the data for the given
        operator name

        Raises:
            ValueError: if an invalid operator name is given
        """
        cls._check_op_name(op)
        return ['Re[<'+op+'>]', 'Im[<'+op+'>]',
                'Re[var('+op+')]', 'Im[var('+op+')]']

    @staticmethod
    def _parse_header_line(line, strip=False):
        """Split up the header from a data file (see `read` method)"""
        fields = []
        field = ''
        bracket_level = 0
        for letter in line:
            field += letter
            if letter == '[':
                bracket_level += 1
            elif letter == ']':
                bracket_level -= 1
                if bracket_level < 0:
                    raise TrajectoryParserError(
                        "Invalid header line (unbalanced brackets): '%s'"
                        % line.strip())
                if bracket_level == 0:
                    if strip:
                        field = field.strip()
                    fields.append(field)
                    field = ''
            elif len(fields) == 0:
                if letter == 't':
                    fields.append(field)
                    field = ''
        if len(field.strip()) > 0:
            raise TrajectoryParserError(
                "Invalid header line (trailing characters): '%s'"
                % line.strip())
        return fields

    @classmethod
    def _get_op_names_from_header_line(cls, line):
        """Return a list of operator names based on the data file header
        (see `read` method).

        Raises:
            TrajectoryParserError: if any operator names are invalid or
                the header is in any other way invalid
        """
        # we assume that column names follow cls._operator_cols(op)
        ops = []
        cols = cls._parse_header_line(line, strip=True)
        for col_name in cols:
            if col_name.startswith("Re[<"):
                op = col_name[4:-2]
                try:
                    cls._check_op_name(op)
                except ValueError as exc:
                    raise TrajectoryParserError(exc)
                ops.append(op)
        if len(ops) == 0:
            raise TrajectoryParserError("Malformed header, cannot extract "
                                        "operator names")
        if (4*len(ops) + 1) != len(cols):
            raise TrajectoryParserError(
                "Unexpected number of columns for %d operator(s)"
                % len(ops))
        return ops

    @classmethod
    def read(cls, filename):
        """Read in TrajectoryData from the given filename. The file must
        be in the format generated by the `write` method.

        Raises:
            TrajectoryParserError: if the file has an incorrect format
        """
        ID = None
        dt = None
        operators = []
        record = OrderedDict()
        col_width = cls.col_width
        with open(filename) as in_fh:
            # process the header only
            for line in in_fh:
                if not line.startswith("#"):
                    break  # done with header
                m = cls._rx['head_ID'].match(line)
                if m:
                    ID = m.group('ID')
                m = cls._rx['record'].match(line)
                if m:
                    record_ID = m.group('ID')
                    seed = int(m.group('seed'))
                    n_traj = int(m.group('n_traj'))
                    record_ops = 'ALL'  # temporary placeholder
                    if m.group('ops') is not None:
                        record_ops = json.loads(m.group('ops'))
                    record[record_ID] = (seed, n_traj, record_ops)
                m = cls._rx['header'].match(line)
                if m:
                    try:
                        operators = cls._get_op_names_from_header_line(line)
                    except TrajectoryParserError as exc:
                        raise TrajectoryParserError("File %s: %s"
                                                    % (filename, exc))
                    # We assume that the regex is written in such a way
                    # that it matches exactly the first column header
                    # (time grid), such that we can derive the general
                    # column width from the extent of the match (all
                    # columns have the same width)
                    col_width = m.end()
        if ID is None:
            raise TrajectoryParserError("File %s does not define an ID"
                                        % filename)
        if len(record) == 0:
            raise TrajectoryParserError("File %s does not contain a record"
                                        % filename)
        if len(operators) == 0:
            raise TrajectoryParserError(("File %s does not contain a "
                                         "header, or is missing the time "
                                         "grid") % filename)
        for (record_ID, (seed, n_traj, record_ops)) in record.items():
            if record_ops == 'ALL':  # temporary placeholder, see above
                record[record_ID] = (seed, n_traj, list(operators))
        # process the actual data
        try:
            table = np.loadtxt(filename, dtype=np.float64, comments='#')
        except ValueError as exc:
            raise TrajectoryParserError("Could not read data in %s: %s"
                                        % (filename, str(exc)))
        if len(table) == 0:
            raise TrajectoryParserError("File %s contains no data"
                                        % filename)
        if len(table.shape) < 2:
            raise TrajectoryParserError("Too few rows in %s (minimum 2)"
                                        % filename)
        n_cols = table.shape[1]
        if n_cols != (4*len(operators) + 1):
            raise TrajectoryParserError(
                ("In file %s, the number of data columns differs from the "
                 "number indicated in the header") % filename)
        dt = table[1, 0] - table[0, 0]
        data = OrderedDict()  # data in the format that __init__ accepts
        i = 0  # counter for the columns of table
        for op in operators:
            op_cols = []
            for col in cls._operator_cols(op):
                i += 1
                op_cols.append(np.array(table[:, i]))
            data[op] = tuple(op_cols)
        traj = cls(ID, dt=dt, seed=None, n_trajectories=None, data=data)
        traj._record = record
        if col_width != traj.col_width:
            traj.col_width = col_width
        return traj

    @classmethod
    def from_qsd_data(cls, operators, seed, workdir='.'):
        """Instantiate from one or more QSD output files, specified as
        values of the dictionary `operators`

        Each QSD output file must have the following structure:

        * The first line must start with the string
          "Number_of_Trajectories", followed by an integer (separated by
          whitespace)
        * All following lines must contain five floating point numbers
          (time, real/imaginary part of expectation value, and
          real/imaginary part of variance), separated by whitespace.

        All QSD output files must contain the same number of lines,
        specify the same number of trajectories, and use the same time
        grid values (first column). It is the user's responsibility to
        ensure that all output files were indeed generated in a single QSD
        run using the specified initial seed for the random number
        generator.

        Arguments:
            operators (dict of str to str): dictionary (preferably
                `OrderedDict`) of operator name to filename. The filenames
                are relative to the `workdir`. Each file must contain data
                in the format described above
            seed (int): The seed to the random number generator that was
                used to produce the data files
            workdir (str): directory to which the filenames in `operators`
                are relative

        Raises:
            ValueError: if any of the data files do not have the correct
                format or are inconsistent

        Note:
            Remember that it is vitally important that all quantum
            trajectories that go into an average are statistically
            independent. The :class:`TrajectoryData` class tries as much
            as possible to ensure this, by refusing to combine identical
            IDs, or trajectories originating from the same seed. To this
            end, in the :meth:`~TrajectoryData.from_qsd_data` method, the
            ID of the instantiated object will depend uniquely on the
            collective data read from the QSD output files.
        """
        md5s = []  # MD5 hashes of all files (in order to generate the ID)
        data = OrderedDict()
        n_trajectories = None
        dt = None
        if len(operators) == 0:
            raise ValueError("Must give at least one mapping "
                             "operator_name => file in operators dict")
        for (operator_name, file_name) in operators.items():
            file_name = os.path.join(workdir, file_name)
            with open(file_name, 'rb') as in_fh:
                md5s.append(hashlib.md5(in_fh.read()).hexdigest())
            with open(file_name) as in_fh:
                header = in_fh.readline()
                m = re.match(r'Number_of_Trajectories\s*(\d+)', header)
                if m:
                    file_n_trajectories = int(m.group(1))
                else:
                    raise ValueError(("First line in %s must contain the "
                                      "Number_of_Trajectories")
                                     % file_name)
            (tgrid, re_exp, im_exp, re_var, im_var) \
                = np.genfromtxt(file_name, dtype=np.float64, skip_header=1,
                                unpack=True)
            data[operator_name] = (re_exp, im_exp, re_var, im_var)
            if len(tgrid) < 2:
                raise ValueError(("File %s does not contain sufficient "
                                  "data (at least two rows)") % file_name)
            file_dt = tgrid[1] - tgrid[0]
            if dt is None:
                dt = file_dt
            else:
                if abs(dt - file_dt) > cls._prec_dt:
                    raise ValueError(("dt in file %s is inconsistent with "
                                      "dt in other files") % file_name)
            if n_trajectories is None:
                n_trajectories = file_n_trajectories
            else:
                if n_trajectories != file_n_trajectories:
                    raise ValueError(("number of trajectories in file %s "
                                      "does not match number of "
                                      "trajectories in other files")
                                     % file_name)
        ID = cls.new_id(name="".join(sorted(md5s)))
        return cls(ID, dt, seed, n_trajectories, data)

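    # Usage sketch for `from_qsd_data` (operator names, file names, and
    # the seed below are hypothetical; the files would come from an actual
    # QSD run seeded with 103212):
    #
    #     traj = TrajectoryData.from_qsd_data(
    #         OrderedDict([('X1', 'X1.out'), ('X2', 'X2.out')]),
    #         seed=103212, workdir='./qsd_run')
    #     print(traj.ID)  # deterministic: depends on the MD5 of the files
    #     print(traj.n_trajectories('X1'))  # as declared in the file header
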
    @classmethod
    def new_id(cls, name=None):
        """Generate a new unique identifier, as a string. The identifier
        will be RFC 4122 compliant. If `name` is None, the resulting ID
        will be random. Otherwise, `name` must be a string that the ID
        will depend on. That is, calling `new_id` repeatedly with the same
        `name` will result in identical IDs.
        """
        if name is None:
            return str(uuid.uuid4())
        else:
            return str(uuid.uuid3(cls._uuid_namespace, name))

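    # For illustration: `new_id` is deterministic when given a name
    # (uuid3 in the class namespace), and random otherwise (uuid4):
    #
    #     TrajectoryData.new_id('foo') == TrajectoryData.new_id('foo')  # True
    #     TrajectoryData.new_id() == TrajectoryData.new_id()            # False
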
    @property
    def ID(self):
        """A unique RFC 4122 compliant identifier. The identifier changes
        whenever the class data is modified (via the :meth:`extend`
        method). Two instances of `TrajectoryData` with the same ID are
        assumed to be identical
        """
        return self._ID

    @property
    def record(self):
        """A copy of the full trajectory record, i.e., a history of calls
        to the :meth:`extend` method. Its purpose is to ensure that the
        data is completely reproducible. This entails storing the seed to
        the random number generator for all sets of trajectories.

        The record is an `OrderedDict` that maps the original ID of any
        `TrajectoryData` instance combined via :meth:`extend` to a tuple
        ``(seed, n_trajectories, ops)``, where ``seed`` is the seed to the
        random number generator that was used to calculate a specific set
        of trajectories (sequentially), ``n_trajectories`` is the number
        of trajectories in that dataset, and ``ops`` is a list of operator
        names for which expectation values were calculated. This may be
        the complete list of operators in the `operators` attribute, or a
        subset of those operators (not all trajectories have to include
        data for all operators).

        For example, let's assume we have a
        :obj:`~qnet.misc.qsd_codegen.QSDCodeGen` instance set up for a QSD
        propagation. Two observables 'X1' and 'X2' have been added, to be
        written to files 'X1.out' and 'X2.out'. The
        :meth:`~qnet.misc.qsd_codegen.QSDCodeGen.set_trajectories` method
        has been called with ``n_trajectories=10``, after which a call to
        :meth:`~qnet.misc.qsd_codegen.QSDCodeGen.run` with argument
        ``seed=SEED1`` performed a sequential propagation of 10
        trajectories, with the averaged expectation values written to the
        output files. This data may now be read into a new
        `TrajectoryData` instance ``traj`` via the :meth:`from_qsd_data`
        class method (with `seed=SEED1`). The newly created instance
        (with, let's say, ``ID='8d102e4b-...'``) will have one entry in
        its record::

            '8d102e4b-...': (SEED1, 10, ['X1', 'X2'])

        Now, let's say we add a new observable 'A2' (output file 'A2.out')
        to the :obj:`~qnet.misc.qsd_codegen.QSDCodeGen` instance (in
        addition to the existing observables X1, X2), and call the
        :meth:`~qnet.misc.qsd_codegen.QSDCodeGen.run` method again, with a
        new seed ``SEED2``. We then update ``traj`` with a call such as::

            traj.extend(TrajectoryData.from_qsd_data(
                {'X1': 'X1.out', 'X2': 'X2.out', 'A2': 'A2.out'}, SEED2))

        The record will now have an additional entry, e.g.::

            'd9831647-...': (SEED2, 10, ['X1', 'X2', 'A2'])

        ``traj.table`` will contain the averaged expectation values
        (average over 20 trajectories for 'X1' and 'X2', and 10
        trajectories for 'A2'). The record tells us that to reproduce this
        table, 10 sequential trajectories starting from ``SEED1`` must be
        performed for X1, X2, followed by another 10 trajectories for X1,
        X2, A2 starting from ``SEED2``.
        """
        return self._record.copy()

    @property
    def operators(self):
        """Iterator over all operator names"""
        return iter(self._operators)

    @property
    def record_IDs(self):
        """Set of all IDs in the record"""
        return set(self._record.keys())

    @property
    def dt(self):
        """Time step between data points"""
        return self._dt

    @property
    def nt(self):
        """Number of time steps / data points"""
        return self._nt

    @property
    def shape(self):
        """Tuple ``(n_rows, n_cols)`` for the data in `self.table`. The
        time grid is included in the column count"""
        return (self.nt, len(self.table.keys()) + 1)

    @property
    def record_seeds(self):
        """Set of all random number generator seeds in the record"""
        return set([seed for (seed, ntraj, ops) in self._record.values()])

    @property
    def tgrid(self):
        """Time grid, as a numpy array"""
        return np.array(range(self.nt)) * self._dt

    def __str__(self):
        return self.to_str(show_rows=6)

    def __repr__(self):
        return "TrajectoryData(ID=%s)" % self.ID

    def _data_line(self, index, fmt):
        line = fmt % (self._dt * index)
        for col in self.table.keys():
            line += fmt % self.table[col][index]
        return line

    def to_str(self, show_rows=-1):
        """Generate a full string representation of the `TrajectoryData`

        Parameters:
            show_rows (int): If given a value > 0, maximum number of data
                rows to show. If there are more rows, they will be
                indicated by an ellipsis (``...``)

        Raises:
            ValueError: if any operator name is too long to generate a
                label that fits in the limit given by the
                :attr:`col_width` class attribute
        """
        lines = []
        w = self.col_width
        prec = min(self.col_width - 9, 16)  # 16 = max. double precision
        fmt = "%{width:d}.{prec:d}e".format(width=w, prec=prec)
        ellipsis = "...".center(w)
        lines.append("# QNET Trajectory Data ID %s" % self.ID)
        for (ID, (seed, n_traj, ops)) in self._record.items():
            if set(ops) == set(self._operators):
                lines.append("# Record %s (seed %d): %d"
                             % (ID, seed, n_traj))
            else:
                lines.append("# Record %s (seed %d): %d %s"
                             % (ID, seed, n_traj, json.dumps(ops)))
        header = ("#%{width:d}s".format(width=w-1)) % 't'
        for col in self.table.keys():
            if len(col) >= self.col_width:
                raise ValueError(("Column name '%s' must be shorter than "
                                  "max column width %d")
                                 % (col, self.col_width))
            header += ("%{width:d}s".format(width=w)) % col
        lines.append(header)
        nt = self.nt
        if (show_rows < 0) or (show_rows >= nt):
            show_row_indices_1 = range(nt)
            show_row_indices_2 = []
        else:
            show_row_indices_1 = range(show_rows//2)
            show_row_indices_2 = range(nt - ((show_rows//2)
                                             + (show_rows % 2)), nt)
            # if show_rows is odd, we show the first show_rows//2 rows,
            # and the last (show_rows//2)+1 rows
        for i in show_row_indices_1:
            lines.append(self._data_line(i, fmt))
        if len(show_row_indices_2) > 0:
            lines.append((ellipsis * int(len(self.table)+1)).rstrip())
        for i in show_row_indices_2:
            lines.append(self._data_line(i, fmt))
        return "\n".join(lines)

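    # Presentation sketch: with the default ``show_rows=-1``, all rows are
    # rendered. For a long table,
    #
    #     print(traj.to_str(show_rows=4))
    #
    # renders the header, the first two data rows, one row of ellipses,
    # and the last two data rows.
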
    def write(self, filename):
        """Write data to a text file. The `TrajectoryData` may later be
        restored by the `read` class method from the same file"""
        with open(filename, 'w') as out_fh:
            out_fh.write(self.to_str())

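    # Round-trip sketch (hypothetical filename): `write` and `read` are
    # inverses, and equality is defined via the ID:
    #
    #     traj.write('traj.dat')
    #     assert TrajectoryData.read('traj.dat') == traj
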
    def n_trajectories(self, operator):
        """Return the total number of trajectories for the given
        operator"""
        n_total = 0
        for ID, (seed, n_traj, ops) in self._record.items():
            if operator in ops:
                n_total += n_traj
        return n_total

    def extend(self, *others, **kwargs):
        """Extend data with data from one or more other `TrajectoryData`
        instances, averaging the expectation values. Equivalently to
        ``traj1.extend(traj2)``, the syntax ``traj1 += traj2`` may be
        used. The keyword argument `n_procs` (int, default 1) may be given
        to parallelize the averaging over multiple processes.

        Raises:
            ValueError: if `data` in `self` and any element of `others`
                are incompatible
            TypeError: if any `others` are not an instance of
                `TrajectoryData`
        """
        n_procs = kwargs.get('n_procs', 1)
        err_msg = "TrajectoryData may only be extended by a completely " \
                  "disjoint other TrajectoryData object"
        if len(others) == 0:
            return
        n_trajs = defaultdict(lambda: 0)  # op => total number of trajs
        # X_n -> n * X_n for existing operators
        for op in self._operators:
            n_trajs[op] = self.n_trajectories(op)
            for col in self._operator_cols(op):
                self.table[col] *= n_trajs[op]
        used_IDs = [self.ID, ]
        # X_n += n_j*X_j for j over all j sets of trajectories
        if n_procs > 1:
            # perform a first pass, in which `others` is split into
            # chunks; each chunk is averaged in a separate process, and
            # finally `others` is replaced with the list of averaged
            # trajectories from all the chunks
            chunks = []
            splitsize = 1.0/n_procs * len(others)
            for i in range(n_procs):
                chunks.append(others[int(round(i*splitsize))
                                     :int(round((i+1)*splitsize))])
                if len(chunks[-1]) == 0:
                    del chunks[-1]
            n_chunks = len(chunks)
            others = multiprocessing.Pool(n_chunks).map(_extend_worker,
                                                        chunks)
        try:
            for other in others:
                if not isinstance(other, TrajectoryData):
                    raise TypeError("All 'others' must be instances of "
                                    "TrajectoryData")
                if not self.record_IDs.isdisjoint(other.record_IDs):
                    raise ValueError("%s: Repeated ID" % err_msg)
                if not self.record_seeds.isdisjoint(other.record_seeds):
                    if not set(self.operators).isdisjoint(
                            set(other.operators)):
                        raise ValueError("%s: Repeated seed" % err_msg)
                if not abs(self.dt - other.dt) < self._prec_dt:
                    raise ValueError("dt of extending TrajectoryData "
                                     "does not match")
                for op in other._operators:
                    n_other = other.n_trajectories(op)
                    n_trajs[op] = n_trajs[op] + n_other
                    if op in self._operators:
                        for col in self._operator_cols(op):
                            if n_other == 1:
                                self.table[col] += other.table[col]
                            else:
                                self.table[col] \
                                    += n_other * other.table[col]
                    else:
                        self._operators.append(op)
                        for col in self._operator_cols(op):
                            if n_other == 1:
                                self.table[col] = other.table[col].copy()
                            else:
                                self.table[col] \
                                    = n_other * other.table[col]
                self._record.update(other._record)
                used_IDs.append(other.ID)
        finally:
            # do not leave the trajectory data in a broken state!
            # X_n -> X_n / n (with the new n)
            for op in self._operators:
                for col in self._operator_cols(op):
                    if n_trajs[op] > 1:
                        self.table[col] /= n_trajs[op]
            self._ID = self.new_id(name="".join(sorted(used_IDs)))

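    # Usage sketch for parallel averaging (assumes `trajs` is a
    # hypothetical list of statistically independent TrajectoryData
    # instances):
    #
    #     avg = trajs[0].copy()
    #     avg.extend(*trajs[1:], n_procs=4)
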
    def __add__(self, other):
        combined = self.copy()
        combined.extend(other)
        return combined

    def __iadd__(self, other):
        self.extend(other)
        return self


def _extend_worker(chunk):
    # each chunk is guaranteed to contain at least one trajectory
    if len(chunk) == 1:
        return chunk[0]
    traj = chunk[0].copy()
    traj.extend(*chunk[1:])
    return traj
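

if __name__ == '__main__':
    # Self-contained demo, a minimal sketch with synthetic data (not from
    # an actual QSD run): average two single-trajectory datasets for one
    # operator 'X' via the `+=` syntax.
    nt = 5
    zeros = np.zeros(nt)
    traj = TrajectoryData(
        TrajectoryData.new_id(), dt=0.1, seed=1, n_trajectories=1,
        data=OrderedDict(
            [('X', (np.linspace(0.0, 1.0, nt), zeros, zeros, zeros))]))
    traj += TrajectoryData(
        TrajectoryData.new_id(), dt=0.1, seed=2, n_trajectories=1,
        data=OrderedDict(
            [('X', (np.linspace(0.0, 2.0, nt), zeros, zeros, zeros))]))
    # the averaged Re[<X>] column now runs from 0.0 to 1.5, and the record
    # lists both source IDs, with their seeds and trajectory counts
    print(traj.to_str())
    print(traj.record)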