import datetime
import subprocess
import os
import re
from mosdef_cassandra.runners.utils import check_system
from mosdef_cassandra.runners.utils import get_restart_name
from mosdef_cassandra.writers.writers import write_mcfs
from mosdef_cassandra.writers.writers import write_configs
from mosdef_cassandra.writers.writers import write_input
from mosdef_cassandra.writers.writers import write_pdb
from mosdef_cassandra.writers.writers import write_restart_input
from mosdef_cassandra.utils.detect import detect_cassandra_binaries
from mosdef_cassandra.utils.exceptions import CassandraRuntimeError
def run(system, moveset, run_type, run_length, temperature, **kwargs):
    """Set up and run a Monte Carlo simulation with Cassandra

    The workflow is: locate the required executables, sanity-check the
    system/moveset pair, write the molecular connectivity files (MCFs),
    write any starting configurations, write the Cassandra input file,
    write one PDB file per species, have Cassandra generate the fragment
    libraries, and finally launch the MC simulation. Output from both
    subprocess calls is appended to a time-stamped log file named
    ``mosdef_cassandra_<timestamp>.log``.

    Parameters
    ----------
    system : mosdef_cassandra.System
        the System to simulate
    moveset : mosdef_cassandra.MoveSet
        the MoveSet to simulate
    run_type : "equilibration" or "production"
        the type of run; in "equilibration" mode, Cassandra adaptively
        changes the maximum translation, rotation, and volume move sizes
        to achieve an acceptance ratio of 0.5
    run_length : int
        length of the MC simulation
    temperature : float
        temperature at which to perform the MC simulation
    **kwargs : keyword arguments
        any other valid keyword arguments, see
        ``mosdef_cassandra.print_valid_kwargs()`` for details. If
        ``angle_style`` is present it is also forwarded to the MCF writer.
    """
    # Cassandra itself, library_setup.py and a python interpreter
    # must all be discoverable before anything is written to disk
    py, fraglib_setup, cassandra = detect_cassandra_binaries()

    # Sanity checks
    # TODO: Write more of these
    check_system(system, moveset)

    # MCF files; only hand angle_style to the writer if the caller gave one
    mcf_options = {}
    if "angle_style" in kwargs:
        mcf_options["angle_style"] = kwargs["angle_style"]
    write_mcfs(system, **mcf_options)

    # Starting configurations (if needed)
    write_configs(system)

    # Cassandra input file
    inp_file = write_input(
        system=system,
        moveset=moveset,
        run_type=run_type,
        run_length=run_length,
        temperature=temperature,
        **kwargs,
    )

    # PDB files, numbered from 1 (this step will be removed when frag
    # generation is incorporated into this workflow)
    for species_number, top in enumerate(system.species_topologies, start=1):
        write_pdb(top, "species{}.pdb".format(species_number))

    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%f")
    log_file = "mosdef_cassandra_{}.log".format(timestamp)

    # Fragment generation
    print("Generating fragment libraries...")
    nspecies = len(system.species_topologies)
    _run_fraglib_setup(
        py, fraglib_setup, cassandra, inp_file, log_file, nspecies
    )

    # Simulation
    print("Running Cassandra...")
    _run_cassandra(cassandra, inp_file, log_file)
def restart(
    total_run_length=None, restart_from=None, run_name=None, run_type=None
):
    """Restart a Monte Carlo simulation from a Cassandra checkpoint file

    The following must already be in the working directory. They would
    all have been generated by the original run:

    * Cassandra input (.inp) file named {restart_from}.inp
    * Cassandra checkpoint file (.chk) named {restart_from}.out.chk
    * MCF files for each species
    * Fragment libraries for each species

    The maximum translation, rotation, and volume move sizes, as well as
    the starting structure, are taken from the checkpoint file.

    To extend a simulation, give the *total* number of steps desired via
    ``total_run_length``. For example, if the original run was 1e6 MC
    steps and you want a further 1e6 steps, use
    ``total_run_length=2000000``.

    Parameters
    ----------
    total_run_length: int, optional, default=None
        total length of the MC simulation; if None, use original
        simulation length
    restart_from: str, optional, default=None
        name of run to restart from; if None, searches current
        directory for Cassandra inp files
    run_name: str, optional, default=None
        name of this run; if None (or if it matches ``restart_from``),
        a name of the form "<name>.rst.NNN" is used, where "NNN" is the
        restart iteration "001", "002", ... (the name resolution itself
        is delegated to ``get_restart_name``)
    run_type : str, "equilibration" or "production", default=None
        the type of run; in "equilibration" mode, Cassandra adaptively
        changes the maximum translation, rotation, and volume move sizes
        to achieve an acceptance ratio of 0.5. The abbreviations "equil"
        and "prod" are also accepted (case-insensitive). If None, use
        the same choice as the previous run.

    Raises
    ------
    TypeError
        if ``total_run_length`` is not an int, ``run_name`` is not a
        str, or ``run_type`` is not one of the accepted strings
    FileNotFoundError
        if the checkpoint file {restart_from}.out.chk does not exist
    """
    valid_run_types = ["equilibration", "equil", "production", "prod"]

    # Check that the required executables can be found; only the
    # Cassandra binary itself is needed for a restart
    _py, _fraglib_setup, cassandra = detect_cassandra_binaries()

    # Validate the arguments
    if total_run_length is not None and not isinstance(total_run_length, int):
        raise TypeError("`total_run_length` must be an integer")
    if run_name is not None and not isinstance(run_name, str):
        raise TypeError("`run_name` must be a string")
    if run_type is not None:
        is_valid = (
            isinstance(run_type, str) and run_type.lower() in valid_run_types
        )
        if not is_valid:
            raise TypeError(f"`run_type` must be one of: {valid_run_types}")
        # Collapse the accepted spellings onto the two canonical names
        if run_type.lower() in ("equil", "equilibration"):
            run_type = "equilibration"
        else:
            run_type = "production"

    restart_from, run_name = get_restart_name(restart_from, run_name)

    checkpoint_name = restart_from + ".out.chk"
    if not os.path.isfile(checkpoint_name):
        raise FileNotFoundError(
            f"Checkpoint file: {checkpoint_name} does not exist."
        )

    write_restart_input(restart_from, run_name, run_type, total_run_length)

    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%f")
    log_file = "mosdef_cassandra_{}.log".format(timestamp)

    print("Running Cassandra...")
    _run_cassandra(cassandra, run_name + ".inp", log_file)
def _run_fraglib_setup(
    py, fraglib_setup, cassandra, inp_file, log_file, nspecies
):
    """Build the fragment libraries required to run Cassandra.

    Runs ``py fraglib_setup cassandra inp_file species1.pdb ...`` as a
    subprocess, appends the captured standard output and standard error
    to ``log_file``, and raises if the run appears to have failed.

    Parameters
    ----------
    py : str
        python interpreter used to run the fragment library script
    fraglib_setup : str
        path to the fragment library setup script
    cassandra : str
        path to the Cassandra executable
    inp_file : str
        name of the Cassandra input file
    log_file : str
        file to which the captured output is appended
    nspecies : int
        number of species; ``species1.pdb`` ... ``species{nspecies}.pdb``
        are passed to the setup script

    Raises
    ------
    CassandraRuntimeError
        if the process cannot be started, exits with a non-zero status,
        or the word "error" (any case) appears in its stdout or stderr
    """
    # An argument list (no shell) is used so that paths or file names
    # containing spaces or shell metacharacters reach the program intact.
    fraglib_cmd = [
        str(arg) for arg in (py, fraglib_setup, cassandra, inp_file)
    ]
    fraglib_cmd.extend(
        "species{}.pdb".format(isp + 1) for isp in range(nspecies)
    )

    try:
        p = subprocess.Popen(
            fraglib_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
    except OSError as exc:
        # Without a shell a missing/non-executable program surfaces here;
        # report it with the exception type callers already expect.
        raise CassandraRuntimeError(
            "Cassandra fragment library generation failed, "
            "could not start {}: {}".format(fraglib_cmd[0], exc)
        ) from exc
    out, err = p.communicate()

    with open(log_file, "a") as log:
        header = (
            "\n*************************************************\n"
            "******* CASSANDRA FRAGLIB STANDARD OUTPUT *******\n"
            "*************************************************\n\n"
        )
        log.write(header)
        log.write(_clean_cassandra_log(out))
        header = (
            "\n*************************************************\n"
            "******* CASSANDRA FRAGLIB STANDARD ERROR ********\n"
            "*************************************************\n\n"
        )
        log.write(header)
        log.write(_clean_cassandra_log(err))

    # The exit status alone is not trusted: any mention of "error" in the
    # captured output is also treated as a failure.
    if p.returncode != 0 or "error" in err.lower() or "error" in out.lower():
        raise CassandraRuntimeError(
            "Cassandra fragment library generation failed, "
            "see {} for details".format(log_file)
        )
def _run_cassandra(cassandra, inp_file, log_file):
    """Call Cassandra on an input file and record its output.

    Runs ``cassandra inp_file`` as a subprocess, appends the captured
    standard output and standard error to ``log_file``, and raises if
    the run appears to have failed.

    Parameters
    ----------
    cassandra : str
        path to the Cassandra executable
    inp_file : str
        name of the Cassandra input file
    log_file : str
        file to which the captured output is appended

    Raises
    ------
    CassandraRuntimeError
        if the process cannot be started, exits with a non-zero status,
        or the word "error" (any case) appears in its stdout or stderr
    """
    # An argument list (no shell) is used so that paths or file names
    # containing spaces or shell metacharacters reach the program intact.
    cassandra_cmd = [str(cassandra), str(inp_file)]

    try:
        p = subprocess.Popen(
            cassandra_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
    except OSError as exc:
        # Without a shell a missing/non-executable program surfaces here;
        # report it with the exception type callers already expect.
        raise CassandraRuntimeError(
            "Cassandra exited with an error, "
            "could not start {}: {}".format(cassandra_cmd[0], exc)
        ) from exc
    out, err = p.communicate()

    with open(log_file, "a") as log:
        header = (
            "\n*************************************************\n"
            "*********** CASSANDRA STANDARD OUTPUT ***********\n"
            "*************************************************\n\n"
        )
        log.write(header)
        log.write(_clean_cassandra_log(out))
        header = (
            "\n*************************************************\n"
            "*********** CASSANDRA STANDARD ERROR ************\n"
            "*************************************************\n\n"
        )
        log.write(header)
        log.write(_clean_cassandra_log(err))

    # The exit status alone is not trusted: any mention of "error" in the
    # captured output is also treated as a failure.
    if p.returncode != 0 or "error" in err.lower() or "error" in out.lower():
        raise CassandraRuntimeError(
            "Cassandra exited with an error, "
            "see {} for details.".format(log_file)
        )
def _clean_cassandra_log(string):
"""Strip the text formatting from Cassandra output"""
string = re.sub(r"\^\[\[0m", "", string)
string = re.sub(r"\^\[\[1m", "", string)
return string