"""Top-level package for TrajectoryData."""
import hashlib
import uuid
import os
import re
import json
import multiprocessing
from collections import OrderedDict, defaultdict
import numpy as np
__version__ = '0.1.0'
[docs]class TrajectoryParserError(Exception):
"""Exception raised if a :class:`TrajectoryData` file is malformed"""
pass
[docs]class TrajectoryData(object):
"""Tabular data of expectation values for one or more trajectories.
Multiple `TrajectoryData` objects can be combined with the :meth:`extend`
method, in order to accumulate averages over an arbitrary number o
trajectories. As much as possible, it is checked that all trajectories are
statistically independent. A record is kept to ensure exact
reproducibility.
Arguments:
ID (str): A unique, RFC 4122 compliant identifier (as generated by
:meth:`new_id`)
dt (float): Time step between data points (>0)
seed (int): The random number generator seed on which the data is
based
n_trajectories (int): The number of trajectories from which the data
is averaged (It is assumed that the random number generator was
seeded with the given seed, and then the given number of
trajectories were calculated *sequentially*)
data (dict of str to tuple of arrays): dictionary (preferably
`OrderedDict`) of expectation value data. The value of
``data[operator_name]`` must be a tuple of four numpy arrays (real
part of expectation value, imaginary part of expectation value,
real part of standard deviation, imaginary part of standard
deviation). The operator names must contain only ASCII characters
and must be shorter than ``col_width - 10``.
Raises:
ValueError: if `ID` is not RFC 4122 compliant, `dt` is an invalid or
non-positive float, or `data` does not follow the correct
structure.
Attributes:
ID (str): A unique ID for the current state of the `TrajectoryData`
(read-only). See :attr:`ID` property.
table (OrderedDict of str to numpy array): A table that contains four
column for every known operator (real/imaginary part of the
expectation value, real/imaginary part of the variance). Note that
the `table` attribute can easily be converted to a
:obj:`pandas.DataFrame` (``DataFrame(data=traj.table)``). The
`table` attribute should be considered read-only.
dt (float): Time step between data points
nt (int): Number of time steps / data points
operators (list of str): An iterator of the operator names. The column
names in the `table` attribute derive from these. Assuming "X" is
one of the operator names, there will be four keys in `table`:
"Re[<X>]", "Im[<X>]", "Re[var(X)]", "Im[var(X)]"
record (OrderedDic of str to tuple of int, int, list): A copy of the
complete record of how the averaged expectation values for all
operators were obtained. See discussion of the :attr:`record`
property.
col_width (int): width of the data columns when writing out data.
Defaults to 25 (allowing to full double precision). Note that
operator names may be at most of length ``col_width-10``
"""
# When instantiating with the from_qsd_data class method, we want the
# ID to depend uniquely on the read data files (via their md5 hash).
# RFC4122 accounts for a situation like this with a "UUID3" that requires
# the combination of a namespace and a name. We define a completely
# arbitrary namespace UUID here for this purpose.
_uuid_namespace = uuid.UUID('c84069eb-cf80-48a6-9584-74b7f2c742c1')
_prec_dt = 1.0e-6 # how close dt's have to be to be equal
col_width = 25 # width of columns when writing
_col_padding = 10 # space needed in col header besides operator name
_rx = {
'op_name': re.compile(r'^[\x20-\x7E]+$'), # ascii w/o control chars
'head_ID': re.compile(r'# QNET Trajectory Data ID\s+'
r'(?P<ID>[a-f\d-]{36})'),
'record': re.compile(r'# Record\s+(?P<ID>[a-f\d-]{36})\s*'
r'\(seed (?P<seed>\d+)\):'
r'\s*(?P<n_traj>\d+)'
r'(\s*(?P<ops>\[.*\]))?$'),
'header': re.compile(r'#\s+t')
}
def __init__(self, ID, dt, seed, n_trajectories, data):
# seed and n_trajectories may be passed None without raising an error,
# assuming the _record is manually set immediately after instantiation
self._ID = str(uuid.UUID(ID)) # self.ID = ID, with validation
self.table = OrderedDict()
try:
self._dt = float(dt)
except TypeError:
raise ValueError("dt must be a float with value >0")
if self.dt <= 0.0:
raise ValueError("dt must be a value >0")
self._operators = []
for op, (re_exp, im_exp, re_var, im_var) in data.items():
op = str(op).strip()
if len(op) > self.col_width-self._col_padding:
self.col_width = len(op) + self._col_padding
self._check_op_name(op)
self._operators.append(op)
self._nt = len(re_exp) # assumed valid for all (check below)
re_exp_lb, im_exp_lb, re_var_lb, im_var_lb \
= self._operator_cols(op)
self.table[re_exp_lb] = np.array(re_exp, dtype=np.float64)
self.table[im_exp_lb] = np.array(im_exp, dtype=np.float64)
self.table[re_var_lb] = np.array(re_var, dtype=np.float64)
self.table[im_var_lb] = np.array(im_var, dtype=np.float64)
for col in self.table:
if len(self.table[col]) != self.nt:
raise ValueError("All columns must be of length nt")
record_ops = list(self._operators)
self._record = OrderedDict([
(self.ID, (seed, n_trajectories, record_ops)),
])
@classmethod
def _check_op_name(cls, op):
"""Raise a ValueError if op is not a valid operator name (with at most
`max_len` characters if `max_len > 0`)"""
if not cls._rx['op_name'].match(op):
raise ValueError(("Operator name '%s' contains invalid "
"characters") % op)
brackets = 0
for letter in op:
if letter == '[':
brackets += 1
if letter == ']':
brackets -= 1
if brackets != 0:
raise ValueError(("Operator name '%s' contains unbalanced "
"brackets") % op)
def __eq__(self, other):
return self.ID == other.ID
def __hash__(self):
return hash(self.ID)
[docs] def copy(self):
"""Return a (deep) copy of the current object"""
data = OrderedDict()
for op in self._operators:
cols = self._operator_cols(op)
data[op] = tuple([self.table[col] for col in cols])
new = self.__class__(ID=self.ID, dt=self.dt, seed=None,
n_trajectories=None, data=data)
new._record = self._record.copy()
return new
@classmethod
def _operator_cols(cls, op):
"""Return the four column names holding the data for the given
operator name
Raises:
ValueError: if invalid operator name is given
"""
cls._check_op_name(op)
return ['Re[<'+op+'>]', 'Im[<'+op+'>]',
'Re[var('+op+')]', 'Im[var('+op+')]']
@staticmethod
def _parse_header_line(line, strip=False):
"""Split up the header from a data file (see `read` method)"""
fields = []
field = ''
bracket_level = 0
for letter in line:
field += letter
if letter == '[':
bracket_level += 1
elif letter == ']':
bracket_level -= 1
if bracket_level < 0:
raise TrajectoryParserError(
"Invalid header line (unbalanced brackets): '%s'"
% line.strip())
if bracket_level == 0:
if strip:
field = field.strip()
fields.append(field); field = ''
elif len(fields) == 0:
if letter == 't':
fields.append(field); field = ''
if len(field.strip()) > 0:
raise TrajectoryParserError(
"Invalid header line (trailing characters): '%s'"
% line.strip())
return fields
@classmethod
def _get_op_names_from_header_line(cls, line):
"""Return list of operator names based on the data file header (see
read method). Raises TrajectoryParserError if any operator names are
invalid or the header is in any other way invalid
"""
# we assume that column names follow cls._operator_cols(op)
ops = []
cols = cls._parse_header_line(line, strip=True)
for col_name in cols:
if col_name.startswith("Re[<"):
op = col_name[4:-2]
try:
cls._check_op_name(op)
except ValueError as exc:
raise TrajectoryParserError(exc)
ops.append(op)
if len(ops) == 0:
raise TrajectoryParserError("Malformed header, cannot extract "
"operator names")
if (4*len(ops)+1) != len(cols):
raise TrajectoryParserError(
"Unexpected number of columns for %d operator(s)" % len(ops))
return ops
[docs] @classmethod
def read(cls, filename):
"""Read in TrajectoryData from the given filename. The file must be in
the format generated by the `write` method.
Raises:
TrajectoryParserError: if the file has an incorrect format
"""
ID = None
dt = None
operators = []
record = OrderedDict()
col_width = cls.col_width
with open(filename) as in_fh:
# process the header only
for line in in_fh:
if not line.startswith("#"):
break # done with header
m = cls._rx['head_ID'].match(line)
if m:
ID = m.group('ID')
m = cls._rx['record'].match(line)
if m:
record_ID = m.group('ID')
seed = int(m.group('seed'))
n_traj = int(m.group('n_traj'))
record_ops = 'ALL' # temporary placeholder
if m.group('ops') is not None:
record_ops = json.loads(m.group('ops'))
record[record_ID] = (seed, n_traj, record_ops)
m = cls._rx['header'].match(line)
if m:
try:
operators = cls._get_op_names_from_header_line(line)
except TrajectoryParserError as exc:
raise TrajectoryParserError("File %s: %s"
% (filename, exc))
# We assume that the regex is written in such a way that it
# matches exactly the first column header (time grid), such
# that we can derive the general column width from the
# extent of the match (all columns have the same width)
col_width = m.end()
if ID is None:
raise TrajectoryParserError("File %s does not define an ID"
% filename)
if len(record) == 0:
raise TrajectoryParserError("File %s does not contain a record"
% filename)
if len(operators) == 0:
raise TrajectoryParserError(("File %s does not contain a header, "
"or missing time grid") % filename)
for (record_ID, (seed, n_traj, record_ops)) in record.items():
if record_ops == 'ALL': # temporary placeholder, see above
record[record_ID] = (seed, n_traj, list(operators))
# process the actual data
try:
table = np.loadtxt(filename, dtype=np.float64, comments='#')
except ValueError as exc:
raise TrajectoryParserError("Could not read data in %s: %s"
% (filename, str(exc)))
if len(table) == 0:
raise TrajectoryParserError("File %s contains no data"
% filename)
if len(table.shape) < 2:
raise TrajectoryParserError("Too few rows in %s (minimum 2)"
% filename)
n_cols = table.shape[1]
if n_cols != (4*len(operators)+1):
raise TrajectoryParserError(("In file %s, the number of data "
"columns differs from the number indicated in the header")
% filename)
dt = table[1,0] - table[0,0]
data = OrderedDict() # data in the format that __init__ accepts
i = 0 # counter for column in table
for op in operators:
op_cols = []
for col in cls._operator_cols(op):
i += 1
op_cols.append(np.array(table[:,i]))
data[op] = tuple(op_cols)
traj = cls(ID, dt=dt, seed=None, n_trajectories=None, data=data)
traj._record = record
if col_width != traj.col_width:
traj.col_width = col_width
return traj
[docs] @classmethod
def from_qsd_data(cls, operators, seed, workdir='.'):
"""Instantiate from one or more QSD output files specified as values of
the dictionary `operators`
Each QSD output file must have the following structure:
* The first line must start with the string "Number_of_Trajectories",
followed by an integer (separated by whitespace)
* All following lines must contain five floating point numbers
(time, real/imaginary part of expectation value, and real/imaginary
part of variance), separated by whitespace.
All QSD output files must contain the same number of lines, specify the
same number of trajectories, and use the same time grid values (first
column). It is the user's responsibility to ensure that all out output
files were indeed generated in a single QSD run using the specified
initial seed for the random number generator.
Arguments:
operators (dict of str to str): dictionary (preferably
`OrderedDict`) of operator name to filename. The filenames are
relative to the `workdir`. Each filename must contain data in
the format described above
seed (int): The seed to the random number generator that was used
to produce the data file
workdir (str): directory to which the filenames in `operators` are
relative to
Raises:
ValueError: if any of the data files do not have the correct
format or are inconsistent
Note:
Remember that is is vitally important that all quantum trajectories
that go into an average are statistically independent. The
:class:`TrajectoryData` class tries as much as possible to ensure
this, by refusing to combine identical IDs, or trajectories
originating from the same seed. To this end, in the
:meth:`~TrajectoryData.from_qsd_data` method, the
ID of the instantiated object will depend uniquely on the
collective data read from the QSD output files."""
md5s = [] # MD5 hash of all files (in order to generate ID)
data = OrderedDict();
n_trajectories = None
dt = None
if len(operators) == 0:
raise ValueError("Must give at least one mapping "
"operator_name => file in operators dic")
for (operator_name, file_name) in operators.items():
file_name = os.path.join(workdir, file_name)
md5s.append(hashlib.md5(open(file_name,'rb').read()).hexdigest())
with open(file_name) as in_fh:
header = in_fh.readline()
m = re.match(r'Number_of_Trajectories\s*(\d+)', header)
if m:
file_n_trajectories = int(m.group(1))
else:
raise ValueError(("First line in %s must contain the "
"Number_of_Trajectories")%file_name)
(tgrid, re_exp, im_exp, re_var, im_var) \
= np.genfromtxt(file_name, dtype=np.float64, skip_header=1,
unpack=True)
data[operator_name] = (re_exp, im_exp, re_var, im_var)
if len(tgrid) < 2:
raise ValueError(("File %s does not contain sufficient "
"data (at least two rows)") % file_name)
file_dt = tgrid[1] - tgrid[0]
if dt is None:
dt = file_dt
else:
if abs(dt-file_dt) > cls._prec_dt:
raise ValueError(("dt in file %s inconsistent with dt "
"in other files")%file_name)
if n_trajectories is None:
n_trajectories = file_n_trajectories
else:
if n_trajectories != file_n_trajectories:
raise ValueError(("number of trajectories in file %s "
"does not match number of trajectories "
"in other files")%file_name)
ID = cls.new_id(name="".join(sorted(md5s)))
return cls(ID, dt, seed, n_trajectories, data)
[docs] @classmethod
def new_id(cls, name=None):
"""Generate a new unique identifier, as a string. The identifier will
be RFC 4122 compliant. If name is None, the resulting ID will be
random. Otherwise, name must be a string that the ID will depend on.
That is, calling `new_id` repeatedly with the same `name` will
result in identical IDs.
"""
if name is None:
return str(uuid.uuid4())
else:
return str(uuid.uuid3(cls._uuid_namespace, name))
@property
def ID(self):
"""A unique RFC 4122 compliant identifier. The identifier changes
whenever the class data is modified (via the :meth:`extend` method).
Two instances of `TrajectoryData` with the same ID are assumed to be
identical
"""
return self._ID
@property
def record(self):
"""A copy of the full trajectory record, i.e., a history of calls to
the :meth:`extend` method. Its purpose is to ensure that the data is
completely reproducible. This entails storing the seed to the random
number generator for all sets of trajectories.
The record is an `OrderedDict` that maps the original ID of any
`TrajectoryData` instance combined via :meth:`extend` to a tuple
``(seed, n_trajectories, ops)``, where ``seed`` is the seed to the
random number generator that was used to calculate a specific set of
trajectories (sequentially), ``n_trajectories`` are the number of
trajectories in that dataset, and ``ops`` is a list of operator names
for which expectation values were calculated. This may be the complete
list of operators in the `operators` attribute, or a subset of those
operators (Not all trajectories have to include data for all
operators).
For example, let's assume we have a
:obj:`~qnet.misc.qsd_codegen.QSDCodeGen` instance to set up
for a QSD propagation. Two observables 'X1', 'X2', have been
added to be written to file 'X1.out', and 'X2.out'. The
:meth:`~qnet.misc.qsd_codegen.QSDCodeGen.set_trajectories` method
has been called with ``n_trajectories=10``, after which a call
to :meth:`~qnet.misc.qsd_codegen.QSDCodeGen.run` with argument
``seed=SEED1``, performed a sequential propagation of 10 trajectories,
with the averaged expectation values written to the output files.
This data may now be read into a new `TrajectoryData` instance ``traj``
via the :meth:`from_qsd_data` class method (with `seed=SEED1`). The
newly created instance (with, let's say, ``ID='8d102e4b-...'``) will
have one entry in its record::
'8d102e4b-...': (SEED1, 10, ['X1', 'X2'])
Now, let's say we add a new observable 'A2' (output file 'A2.out') for
the :obj:`~qnet.misc.qsd_codegen.QSDCodeGen` instance (in addition to
the existing observables X1, X2), and call the
:meth:`~qnet.misc.qsd_codegen.QSDCodeGen.run` method again,
with a new seed ``SEED2``. We then update ``traj`` with a call such
as::
traj.extend(TrajectoryData.from_qsd_data(
{'X1':'X1.out', 'X2':'X2.out', 'A2':'A2.out'}, SEED2)
The record will now have an additional entry, e.g.::
'd9831647-...': (SEED2, 10, ['X1', 'X2', 'A2'])
``traj.table`` will contain the averaged expectation values (average
over 20 trajectories for 'X1', 'X2', and 10 trajectories for 'A2').
The record tells use that to reproduce this table, 10 sequential
trajectories starting from ``SEED1`` must be performed for X1, X2,
followed by another 10 trajectories for X1, X2, A2 starting from
``SEED2``.
"""
return self._record.copy()
@property
def operators(self):
"""Iterator over all operators"""
return iter(self._operators)
@property
def record_IDs(self):
"""Set of all IDs in the record"""
return set(self._record.keys())
@property
def dt(self):
"""Time step between data points"""
return self._dt
@property
def nt(self):
"""Number of time steps / data points"""
return self._nt
@property
def shape(self):
"""Tuple ``(n_row, n_cols)`` for the data in self.table. The time grid
is included in the column count"""
return (self.nt, len(self.table.keys())+1)
@property
def record_seeds(self):
"""Set of all random number generator seeds in the record"""
return set([seed for (seed, ntraj, op) in self._record.values()])
@property
def tgrid(self):
"""Time grid, as numpy array"""
return np.array(range(self.nt)) * self._dt
def __str__(self):
return self.to_str(show_rows=6)
def __repr__(self):
return "TrajectoryData(ID=%s)" % self.ID
def _data_line(self, index, fmt):
line = fmt % (self._dt*index)
for col in self.table.keys():
line += fmt % self.table[col][index]
return line
[docs] def to_str(self, show_rows=-1):
"""Generate full string representation of the `TrajectoryData`
Parameters:
show_rows (int): If given > 0, maximum number of data rows to show.
If there are more rows, they will be indicated by an ellipsis
(``...``)
Raises:
ValueError: if any operator name is too long to generate a
label that fits in the limit given by the :attr:`col_width`
class attribute
"""
lines = []
w = self.col_width
prec = min(self.col_width - 9, 16) # 16 = max. double precision
fmt = "%{width:d}.{prec:d}e".format(width=w, prec=prec)
ellipsis = "...".center(w)
lines.append("# QNET Trajectory Data ID %s" % self.ID)
for (ID, (seed, n_traj, ops)) in self._record.items():
if set(ops) == set(self._operators):
lines.append("# Record %s (seed %d): %d" % (ID, seed, n_traj))
else:
lines.append("# Record %s (seed %d): %d %s"
% (ID, seed, n_traj, json.dumps(ops)))
header = ("#%{width:d}s".format(width=w-1)) % 't'
for col in self.table.keys():
if len(col) >= self.col_width:
raise ValueError(("Column name '%s' must be shorter than max "
"column width %d") % (col, self.col_width))
header += ("%{width:d}s".format(width=w)) % col
lines.append(header)
nt = self.nt
if (show_rows < 0) or (show_rows >= nt):
show_row_indices_1 = range(nt)
show_row_indices_2 = []
else:
show_row_indices_1 = range(show_rows//2)
show_row_indices_2 = range(nt - ((show_rows//2)+(show_rows%2)), nt)
# if show_rows is odd, we show the first show_rows//2 rows, and the
# last (show_rows//2)+1 rows.
for i in show_row_indices_1:
lines.append(self._data_line(i, fmt))
if len(show_row_indices_2) > 0:
lines.append((ellipsis * int(len(self.table)+1)).rstrip())
for i in show_row_indices_2:
lines.append(self._data_line(i, fmt))
return "\n".join(lines)
[docs] def write(self, filename):
"""Write data to a text file. The `TrajectoryData` may later be
restored by the `read` class method from the same file"""
with open(filename, 'w') as out_fh:
out_fh.write(self.to_str())
[docs] def n_trajectories(self, operator):
"Return the total number of trajectories for the given operator"
n_total = 0
for ID, (seed, n_traj, ops) in self._record.items():
if operator in ops:
n_total += n_traj
return n_total
[docs] def extend(self, *others, **kwargs):
"""Extend data with data from one or more other `TrajectoryData`
instances, averaging the expectation values. Equivalently to
``traj1.extend(traj2)``, the syntax ``traj1 += traj2`` may be used.
Raises:
ValueError: if `data` in `self` and and any element of `others` are
incompatible
TypeError: if any `others` are not an instance of `TrajectoryData`
"""
n_procs = kwargs.get('n_procs', 1)
err_msg = "TrajectoryData may only be extended by completely "\
"disjunct other TrajectoryData object"
if len(others) == 0:
return
n_trajs = defaultdict(lambda: 0) # op => total number of trajectories
# X_n -> n * X_n for existing operators
for op in self._operators:
n_trajs[op] = self.n_trajectories(op)
for col in self._operator_cols(op):
self.table[col] *= n_trajs[op]
used_IDs = [self.ID, ]
# X_n += n_j*X_j for j over all j sets of trajectories
if n_procs > 1:
# perform a first pass, in which `others` is split into chuncs,
# then each chunk is average, and finally `others` is replaced with
# the list of averaged trajectories from all the chunks.
chunks = []
splitsize = 1.0/n_procs*len(others)
for i in range(n_procs):
chunks.append(others[int(round(i*splitsize))
:int(round((i+1)*splitsize))])
if len(chunks[-1]) == 0:
del chunks[-1]
n_chunks = len(chunks)
others = multiprocessing.Pool(n_chunks).map(_extend_worker, chunks)
try:
for other in others:
if not isinstance(other, TrajectoryData):
raise TypeError("All 'others' must be instances of "
"TrajectoryData")
if not self.record_IDs.isdisjoint(other.record_IDs):
raise ValueError("%s: Repeated ID"%err_msg)
if not self.record_seeds.isdisjoint(other.record_seeds):
if not set(self.operators).isdisjoint(set(other.operators)):
raise ValueError("%s: Repeated seed"%err_msg)
if not abs(self.dt - other.dt) < self._prec_dt:
raise ValueError("Extending TrajectoryData does not "
"match dt")
for op in other._operators:
n_other = other.n_trajectories(op)
n_trajs[op] = n_trajs[op] + n_other
if op in self._operators:
for col in self._operator_cols(op):
if n_other == 1:
self.table[col] += other.table[col]
else:
self.table[col] += n_other * other.table[col]
else:
self._operators.append(op)
for col in self._operator_cols(op):
if n_other == 1:
self.table[col] = other.table[col].copy()
else:
self.table[col] = n_other * other.table[col]
self._record.update(other._record)
used_IDs.append(other.ID)
finally: # do not leave trajectory data in a broken state!
# X_n -> X_n / n (with new n)
for op in self._operators:
for col in self._operator_cols(op):
if n_trajs[op] > 1:
self.table[col] /= n_trajs[op]
self._ID = self.new_id(name="".join(sorted(used_IDs)))
def __add__(self, other):
combined = self.copy()
combined.extend(other)
return combined
def __iadd__(self, other):
self.extend(other)
return self
def _extend_worker(chunk):
# chuck is guaranteed to have a least one traj
if len(chunk) == 1:
return chunk[0]
traj = chunk[0].copy()
traj.extend(*chunk[1:])
return traj