Source code for seabird_ctd.ctd_models

import datetime
import re
from datetime import timezone

import six

[docs]class CTDCommandObject(object): """ Just an empty class that can be subclassed so that any isinstance checks would work """ operation_wait_times = {} # keys are for commands - if there is a key for a command here, it waits that many seconds for that operation before trying to read
[docs] def clean_status(self, status): """ REMINDER: THIS REMOVES *ALL* empty lines, not just leading ones (eg, SBE19plus which has some in the middle) :param status: :return: """ remove_lines = ["?cmd S>", "S>", "DS", ""] return [line for line in status if line not in remove_lines] # remove the bad items from the list that confuse the parsers
[docs] def clean_response(self, response): return response
[docs] def txrealtime(self, value): return "TXREALTIME={}".format(value) # a smart default for this command
[docs]class SBE37S(CTDCommandObject): """ Handles the SBE37S commands """ def __init__(self, main_ctd): self.max_samples = 3655394 # needs verification. This is just a guess based on SBE39 self.keys = ("pressure", "conductivity", "temperature", "salinity", "datetime") self.ctd = main_ctd self.setup()
[docs] def setup(self): self.operation_wait_time = 0.5 self.supports_commands_while_logging = True
[docs] def set_datetime(self): dt = datetime.datetime.now(timezone.utc) return ["DATETIME={}".format(dt.strftime("%m%d%Y%H%M%S"))]
[docs] def sample_interval(self, interval): return "SAMPLEINTERVAL={}".format(interval)
[docs] def retrieve_samples(self, start, end): return [ "OUTPUTFORMAT=1", "GetSamples:{},{}".format(start,end) ]
[docs] def parse_status(self, status_message): return_dict = {} return_dict["full_model"] = status_message[1].split(" ")[0] # verified return_dict["serial_number"] = status_message[1].split(" ")[4] # verified voltages = re.match("vMain\s+=\s+(\d+\.\d+),\s+vLith\s+=\s+(\d+\.\d+)", status_message[2]) return_dict["battery_voltage"] = voltages.group(1) # group zero is whole match, so start with group 1 return_dict["lithium_voltage"] = voltages.group(2) return_dict["sample_number"] = status_message[3].split(", ")[0].split(" = ")[1].replace(" ", "") return_dict["is_sampling"] = True if status_message[4] == "logging" else False return_dict["salinity_output"] = True if "output salinity" in status_message else False return return_dict
[docs] def record_regex(self): """ Handles generation of the regex for the data records. If salinity output is turned on, handles that correctly. STILL TO DO - handle other optional values, such as sound velocity - what happens if sound velocity is on, but salinity is off, or if both are on? :return: """ if self.ctd.salinity_output: salinity_insert = "\s+(?P<salinity>\d+\.\d+)," else: salinity_insert = "" self.regex = "(?P<temperature>-?\d+\.\d+),\s+(?P<conductivity>-?\d+\.\d+),\s+(?P<pressure>-?\d+\.\d+),"+salinity_insert+"\s+(?P<datetime>\d+\s\w+\s\d{4},\s\d{2}:\d{2}:\d{2})" return self.regex
[docs]class SBE37SM(SBE37S):
[docs] def parse_status(self, status_message): if six.PY3: return_dict = super().parse_status(status_message) # call parent status method - then override one item else: return_dict = super(SBE37S, self).parse_status(status_message) return_dict["serial_number"] = status_message[1].split(" ")[5] # verified return return_dict
[docs]class SBE39(CTDCommandObject): def __init__(self, main_ctd): self.max_samples = 3655394 self.keys = ("temperature", "pressure", "datetime") self.ctd = main_ctd self.setup()
[docs] def setup(self): self.operation_wait_time = 0.5 self.supports_commands_while_logging = True
def _confirm_model(self, status_message): """ For models with multiple firmware versions, this takes the results of a DS and returns an activated instance of the correct class :return: object """ model = re.match("(SBE\s?39 .*?)\s+SERIAL NO.*", status_message[1]) try: full_model = model.group(1) except: self.ctd.log.error("Couldn't match model in status_message: '{}'".format(status_message[0])) raise if full_model in supported_ctds: # basically, check if there's a better match based on the full model output in the status message return supported_ctds[full_model](self.ctd) else: # if there's not, keep using this one return self
[docs] def set_datetime(self): dt = datetime.datetime.now(timezone.utc) return ["MMDDYY={}".format(dt.strftime("%m%d%y")), "HHMMSS={}".format(dt.strftime("%H%M%S"))]
[docs] def sample_interval(self, interval): return "INTERVAL={}".format(interval)
[docs] def retrieve_samples(self, start, end): return [ "DD{},{}".format(start, end) ]
[docs] def record_regex(self): self.regex = "(?P<temperature>-?\d+\.\d+),\s+(?P<pressure>-?\d+\.\d+),\s+(?P<datetime>\d+\s\w+\s\d{4},\s\d{2}:\d{2}:\d{2})" return self.regex
[docs] def parse_status(self, status_message): return_dict = {} return_dict["full_model"] = status_message[1].split(" ")[0] return_dict["serial_number"] = status_message[1].split(" ")[1] return_dict["battery_voltage"] = status_message[2].split(" = ")[1] return_dict["sample_number"] = status_message[5].split(", ")[0].split(" = ")[1] return_dict["is_sampling"] = True if status_message[3] == "logging data" else False return return_dict
[docs]class SBE3915(SBE39): """ Older SBE39 - firmware 1.5 - similar, but different status parsing """
[docs] def setup(self): self.operation_wait_time = 2 self.max_samples = 230000 self.supports_commands_while_logging = False
[docs] def parse_status(self, status_message): status_message = self.clean_status(status_message) if len(status_message) == 0 or status_message[0] != "DS": # this model seems to get put to sleep after starting autosampling - this is a hackish workaround, but we want to check the status then, so try again return None return_dict = {} model = re.match("(SBE39 [\d\.]+?)\s+SERIAL NO\.\s+(\d+)\s+.*", status_message[1]) return_dict["full_model"] = model.group(1) # group zero is whole match, so start with group 1 return_dict["serial_number"] = model.group(2) return_dict["sample_number"] = status_message[4].split(", ")[0].split(" = ")[1] return_dict["is_sampling"] = True if status_message[2] == "logging data" else False return_dict["battery_voltage"] = None return return_dict
[docs] def clean_response(self, response): return response.replace(b"\xc5", b"")
[docs]class SBE19plus(CTDCommandObject): def __init__(self, main_ctd): self.max_samples = 727000 self.keys = ("temperature", "pressure", "conductivity", "salinity", "datetime") self.ctd = main_ctd self.setup()
[docs] def setup(self): self.operation_wait_time = 2 #self.max_samples = 230000 self.supports_commands_while_logging = False self.operation_wait_times["DS"] = 8
[docs] def set_datetime(self): dt = datetime.datetime.now(timezone.utc) return ["MMDDYY={}".format(dt.strftime("%m%d%y")), "HHMMSS={}".format(dt.strftime("%H%M%S"))]
[docs] def sample_interval(self, interval): return "SAMPLEINTERVAL={}".format(interval)
[docs] def txrealtime(self, value): return "MOOREDTXREALTIME={}".format(value) # a smart default for this command
[docs] def parse_status(self, status_message): status_message = self.clean_status(status_message) return_dict = {} model_parts = re.match("^(.+?)\s+SERIAL\s+NO.\s+(\d+)\s+.*$", status_message[0]) return_dict["full_model"] = model_parts.group(1) return_dict["serial_number"] = model_parts.group(2) voltages = re.match("vbatt\s+=\s+(\d+\.\d+),\s+vlith\s+=\s+(\d+\.\d+)", status_message[1]) return_dict["battery_voltage"] = voltages.group(1) # group zero is whole match, so start with group 1 return_dict["lithium_voltage"] = voltages.group(2) return_dict["sample_number"] = status_message[6].split(", ")[0].split(" = ")[1].replace(" ", "") return_dict["is_sampling"] = True if status_message[4] == "status = logging" else False return_dict["salinity_output"] = True if status_message[15].split(" ")[3] == "yes," else False return return_dict
[docs] def record_regex(self): """ Handles generation of the regex for the data records. If salinity output is turned on, handles that correctly. STILL TO DO - handle other optional values, such as sound velocity - what happens if sound velocity is on, but salinity is off, or if both are on? :return: """ if self.ctd.salinity_output: salinity_insert = "\s+(?P<salinity>\d+\.\d+)," else: salinity_insert = "" self.regex = "(?P<temperature>-?\d+\.\d+),\s+(?P<conductivity>-?\d+\.\d+),\s+(?P<pressure>-?\d+\.\d+),.*?"+salinity_insert+"\s+(?P<datetime>\d+\s\w+\s\d{4},\s\d{2}:\d{2}:\d{2})" return self.regex
supported_ctds = { "SBE37S": SBE37S, "SBE37SM-RS232": SBE37SM, "SBE 39": SBE39, "SBE39": SBE39, # should be OK to assign to main SBE 39 because it will try to detect if there's a more specific one to use "SBE39 1.5": SBE3915, "SBE391.5": SBE3915, "SeacatPlus V 1.6": SBE19plus, "SeacatPlusV1.6": SBE19plus, "SeacatPlus": SBE19plus, } # name the instrument will report, then class name. could also do this with 2-tuples.