208 lines
6.0 KiB
Python
208 lines
6.0 KiB
Python
# Copyright 2021 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
"""Python module for Abstract Instrument Library."""
|
|
|
|
import logging
|
|
import requests
|
|
import socket
|
|
|
|
|
|
class SocketInstrumentError(Exception):
|
|
"""Abstract Instrument Error Class, via Socket and SCPI."""
|
|
|
|
def __init__(self, error, command=None):
|
|
"""Init method for Socket Instrument Error.
|
|
|
|
Args:
|
|
error: Exception error.
|
|
command: Additional information on command,
|
|
Type, Str.
|
|
"""
|
|
super(SocketInstrumentError, self).__init__(error)
|
|
self._error_code = error
|
|
self._error_message = self._error_code
|
|
if command is not None:
|
|
self._error_message = 'Command {} returned the error: {}.'.format(
|
|
repr(command), repr(self._error_message))
|
|
|
|
def __str__(self):
|
|
return self._error_message
|
|
|
|
|
|
class SocketInstrument(object):
|
|
"""Abstract Instrument Class, via Socket and SCPI."""
|
|
|
|
def __init__(self, ip_addr, ip_port):
|
|
"""Init method for Socket Instrument.
|
|
|
|
Args:
|
|
ip_addr: IP Address.
|
|
Type, str.
|
|
ip_port: TCPIP Port.
|
|
Type, str.
|
|
"""
|
|
self._logger = logging.getLogger(__name__)
|
|
self._socket_timeout = 120
|
|
self._socket_buffer_size = 1024
|
|
|
|
self._ip_addr = ip_addr
|
|
self._ip_port = ip_port
|
|
|
|
self._escseq = '\n'
|
|
self._codefmt = 'utf-8'
|
|
|
|
self._socket = None
|
|
|
|
def _connect_socket(self):
|
|
"""Init and Connect to socket."""
|
|
try:
|
|
self._socket = socket.create_connection(
|
|
(self._ip_addr, self._ip_port),
|
|
timeout=self._socket_timeout)
|
|
|
|
infmsg = 'Opened Socket connection to {}:{} with handle {}.'.format(
|
|
repr(self._ip_addr), repr(self._ip_port),
|
|
repr(self._socket))
|
|
|
|
except socket.timeout:
|
|
errmsg = 'Socket timeout while connecting to instrument.'
|
|
raise SocketInstrumentError(errmsg)
|
|
|
|
except socket.error:
|
|
errmsg = 'Socket error while connecting to instrument.'
|
|
raise SocketInstrumentError(errmsg)
|
|
|
|
def _send(self, cmd):
|
|
"""Send command via Socket.
|
|
|
|
Args:
|
|
cmd: Command to send,
|
|
Type, Str.
|
|
"""
|
|
if not self._socket:
|
|
self._connect_socket()
|
|
|
|
cmd_es = cmd + self._escseq
|
|
|
|
try:
|
|
self._socket.sendall(cmd_es.encode(self._codefmt))
|
|
|
|
except socket.timeout:
|
|
errmsg = ('Socket timeout while sending command {} '
|
|
'to instrument.').format(repr(cmd))
|
|
raise SocketInstrumentError(errmsg)
|
|
|
|
except socket.error:
|
|
errmsg = ('Socket error while sending command {} '
|
|
'to instrument.').format(repr(cmd))
|
|
raise SocketInstrumentError(errmsg)
|
|
|
|
except Exception as err:
|
|
errmsg = ('Error {} while sending command {} '
|
|
'to instrument.').format(repr(cmd), repr(err))
|
|
raise SocketInstrumentError(errmsg)
|
|
|
|
def _recv(self):
|
|
"""Receive response via Socket.
|
|
|
|
Returns:
|
|
resp: Response from Instrument via Socket,
|
|
Type, Str.
|
|
"""
|
|
if not self._socket:
|
|
self._connect_socket()
|
|
|
|
resp = ''
|
|
|
|
try:
|
|
while True:
|
|
resp_tmp = self._socket.recv(self._socket_buffer_size)
|
|
resp_tmp = resp_tmp.decode(self._codefmt)
|
|
resp += resp_tmp
|
|
if len(resp_tmp) < self._socket_buffer_size:
|
|
break
|
|
|
|
except socket.timeout:
|
|
errmsg = 'Socket timeout while receiving response from instrument.'
|
|
raise SocketInstrumentError(errmsg)
|
|
|
|
except socket.error:
|
|
errmsg = 'Socket error while receiving response from instrument.'
|
|
raise SocketInstrumentError(errmsg)
|
|
|
|
except Exception as err:
|
|
errmsg = ('Error {} while receiving response '
|
|
'from instrument').format(repr(err))
|
|
raise SocketInstrumentError(errmsg)
|
|
|
|
resp = resp.rstrip(self._escseq)
|
|
|
|
return resp
|
|
|
|
def _close_socket(self):
|
|
"""Close Socket Instrument."""
|
|
if not self._socket:
|
|
return
|
|
|
|
try:
|
|
self._socket.shutdown(socket.SHUT_RDWR)
|
|
self._socket.close()
|
|
self._socket = None
|
|
|
|
except Exception as err:
|
|
errmsg = 'Error {} while closing instrument.'.format(repr(err))
|
|
raise SocketInstrumentError(errmsg)
|
|
|
|
def _query(self, cmd):
|
|
"""query instrument via Socket.
|
|
|
|
Args:
|
|
cmd: Command to send,
|
|
Type, Str.
|
|
|
|
Returns:
|
|
resp: Response from Instrument via Socket,
|
|
Type, Str.
|
|
"""
|
|
self._send(cmd + ';*OPC?')
|
|
resp = self._recv()
|
|
return resp
|
|
|
|
|
|
class RequestInstrument(object):
|
|
"""Abstract Instrument Class, via Request."""
|
|
|
|
def __init__(self, ip_addr):
|
|
"""Init method for request instrument.
|
|
|
|
Args:
|
|
ip_addr: IP Address.
|
|
Type, Str.
|
|
"""
|
|
self._request_timeout = 120
|
|
self._request_protocol = 'http'
|
|
self._ip_addr = ip_addr
|
|
self._escseq = '\r\n'
|
|
|
|
def _query(self, cmd):
|
|
"""query instrument via request.
|
|
|
|
Args:
|
|
cmd: Command to send,
|
|
Type, Str.
|
|
|
|
Returns:
|
|
resp: Response from Instrument via request,
|
|
Type, Str.
|
|
"""
|
|
request_cmd = '{}://{}/{}'.format(self._request_protocol,
|
|
self._ip_addr, cmd)
|
|
resp_raw = requests.get(request_cmd, timeout=self._request_timeout)
|
|
|
|
resp = resp_raw.text
|
|
for char_del in self._escseq:
|
|
resp = resp.replace(char_del, '')
|
|
|
|
return resp
|