#!/usr/bin/env python3 """rpPoscdaq: fast data acquistion using the oscilloscope client of the MCPHA application running on a RedPitaya FPGA board When the network connection is established, the oscilloscope window is activated in data-acquisition mode, i.e. restarted continously. Optionally, triggered waveforms can be stored a file. Code derived from mcpha.py by Pavel Demin This code is compatible with release 20240204 of the alpine linux image https://github.com/pavel-demin/red-pitaya-notes/releases/tag/20240204 """ # Communication with server process is achieved via command codes: # command(code, number, data) # number typically is channel number # # code values # code 2: reset/initialze oscilloscope # code 4: set decimation factor # code 11: read status # code 13: set trigger source chan 1/2 # code 14: set trigger slope rise/fall # code 15: set trigger mode norm/auto # code 16: set trigger level in ADC counts # code 17: set number of pre-trigger samples # code 18: set number of total samples # code 19: start oscilloscope # code 20: read oscilloscope data (two channels at once) # code 21: set pulse fall-time for generator in µs # code 22: set pulse rise-time in ns # code 25: set pulse rate in kHz # code 26: fixed frequency or poisson # code 27_ reset generator # code 28: set bin for pulse distribution, as a histogram with 4096 channels, 0-500 mV # code 29: start generator # code 30: stop generator import argparse import os import sys import time import struct from functools import partial import numpy as np import matplotlib from matplotlib.figure import Figure from multiprocessing import Event from npy_append_array import NpyAppendArray from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar if "PyQt5" in sys.modules: from PyQt5.uic import loadUiType from PyQt5.QtCore import Qt, QTimer, QEventLoop, QRegExp from PyQt5.QtGui import QPalette, QColor, QRegExpValidator from PyQt5.QtWidgets import QApplication, QMainWindow, QDialog, QFileDialog from PyQt5.QtWidgets import QWidget, QLabel, QCheckBox, QComboBox from PyQt5.QtNetwork import QAbstractSocket, QTcpSocket else: from PySide2.QtUiTools import loadUiType from PySide2.QtCore import Qt, QTimer, QEventLoop, QRegExp from PySide2.QtGui import QPalette, QColor, QRegExpValidator from PySide2.QtWidgets import QApplication, QMainWindow, QDialog, QFileDialog from PySide2.QtWidgets import QWidget, QLabel, QCheckBox, QComboBox from PySide2.QtNetwork import QAbstractSocket, QTcpSocket Ui_MCPHA, QMainWindow = loadUiType("rpControl.ui") Ui_LogDisplay, QWidget = loadUiType("mcpha_log.ui") Ui_OscDisplay, QWidget = loadUiType("mcpha_daq.ui") Ui_GenDisplay, QWidget = loadUiType("mcpha_gen.ui") if sys.platform != "win32": path = "." else: path = os.path.expanduser("~") class rpControl(QMainWindow, Ui_MCPHA): """ control Pavel Demin's MCHPA application on RedPitaya Modes of operation: - in oscilloscope mode to just display data - in daq mode, i.e. read data at maximum rate, optionally recording data to disk or calling an external function """ rates = {0:1, 1: 4, 2: 8, 3: 16, 4: 32, 5: 64, 6: 128, 7: 256} def __init__(self, callback=None): """ Args: callback: function receiving recorded waveforms """ super(rpControl, self).__init__() self.callback_function = callback self.callback = True # get command line arguments self.parse_args() # set physical units (for axis labels) self.get_physical_units() self.setupUi(self) # initialize variables self.osc_ready = False self.idle = True self.osc_waiting = False self.state = 0 self.status = np.zeros(9, np.uint32) self.timers = self.status[:4].view(np.uint64) # create tabs self.log = LogDisplay() self.osc_daq = OscDAQ(self, self.log) self.gen = GenDisplay(self, self.log) self.tabindex_log = self.tabWidget.addTab(self.log, "Messages") self.tabindex_osc = self.tabWidget.addTab(self.osc_daq, "Oscilloscope") self.tabindex_gen = self.tabWidget.addTab(self.gen, "Pulse generator") # configure controls self.connectButton.clicked.connect(self.start) self.neg1Check.toggled.connect(partial(self.set_negator, 0)) self.neg2Check.toggled.connect(partial(self.set_negator, 1)) self.rateValue.addItems(map(str, rpControl.rates.values())) self.rateValue.setEditable(True) self.rateValue.lineEdit().setReadOnly(True) self.rateValue.lineEdit().setAlignment(Qt.AlignRight) for i in range(self.rateValue.count()): self.rateValue.setItemData(i, Qt.AlignRight, Qt.TextAlignmentRole) self.rateValue.setCurrentIndex(1) self.rateValue.currentIndexChanged.connect(self.set_rate) rx = QRegExp(r"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])|rp-[0-9A-Fa-f]{6}\.local$") self.addrValue.setValidator(QRegExpValidator(rx, self.addrValue)) # create TCP socket self.socket = QTcpSocket(self) self.socket.connected.connect(self.connected) self.socket.error.connect(self.display_error) # create event loop self.loop = QEventLoop() self.socket.readyRead.connect(self.loop.quit) self.socket.error.connect(self.loop.quit) # create timers self.startTimer = QTimer(self) # timer for network connectin self.startTimer.timeout.connect(self.start_timeout) self.readTimer = QTimer(self) # for readout self.readTimer.timeout.connect(self.update_oscDisplay) # transfer command-line arguments to gui self.osc_daq.levelValue.setValue(self.trigger_level) self.osc_daq.autoButton.setChecked(self.trigger_mode) self.osc_daq.ch2Button.setChecked(self.trigger_source) self.osc_daq.fallingButton.setChecked(self.trigger_slope) # automatically connect if self.ip_address is not None: self.addrValue.setText(self.ip_address) self.start() else: self.addrValue.setStyleSheet("color: darkorange") def parse_args(self): parser = argparse.ArgumentParser(description=__doc__) parser.add_argument('-c', '--connect_ip', type=str, help='connect IP address of RedPitaya') # for oscilloscope display parser.add_argument('-i', '--interval', type=int, default=500, help='interval for readout (in ms)') # trigger mode parser.add_argument('-t', '--trigger_level', type=int, default=500, help='trigger level in ADC counts') parser.add_argument('--trigger_slope', type=str, choices={'rising','falling'}, default='rising', help='trigger slope') parser.add_argument('--trigger_source', type=int, choices={1, 2}, default=1, help='trigger channel') parser.add_argument('--trigger_mode', type=str, choices={'norm','auto'}, default='norm', help='trigger mode') parser.add_argument('-s', '--sample_size', type=int, default=2048, help='size of waveform sample') parser.add_argument('-p', '--pretrigger_fraction', type=float, default=0.05, help='pretrigger fraction') # for daq mode parser.add_argument('--no-daq', dest='daq_mode', action='store_false', help='do not start in DAQ mode' ) parser.add_argument('-f', '--file', type=str, default='', help='file name') args = parser.parse_args() # all relevant parameters are here self.ip_address = args.connect_ip self.daq_mode = args.daq_mode self.sample_size = args.sample_size self.pretrigger_fraction = args.pretrigger_fraction self.readInterval = args.interval # used to control oscilloscope display # self.trigger_level = args.trigger_level self.trigger_source = args.trigger_source - 1 self.trigger_mode = 0 if args.trigger_mode == 'norm' else 1 self.trigger_slope = 0 if args.trigger_slope == 'rising' else 1 # other parameters self.filename = args.file def get_physical_units(self): """get physical units corresponding to ADC units and channel numbers """ # Voltages: 4096 channels/Volt self.adc_unit = 1000/4096 # time per sample at sampling rate of 125 MHz / decimation_factor self.time_bin = 0.008 # !gq end def start(self): self.socket.connectToHost(self.addrValue.text(), 1001) # connect to port 1001 (mcpha_server on RP) self.startTimer.start(1000) # shorter time-out self.connectButton.setText("Disconnect") self.connectButton.clicked.disconnect() self.connectButton.clicked.connect(self.stop) def stop(self): self.osc_daq.stop() self.gen.stop() self.readTimer.stop() self.startTimer.stop() self.loop.quit() self.socket.abort() self.connectButton.setText("Connect") self.connectButton.clicked.disconnect() self.connectButton.clicked.connect(self.start) self.addrValue.setStyleSheet("color: red") self.log.print("IO stopped") self.idle = True def closeEvent(self, event): self.stop() def start_timeout(self): self.log.print("error: connection timeout") self.stop() def display_error(self): self.log.print("error: %s" % self.socket.errorString()) self.stop() def connected(self): # coming here when connection is established self.startTimer.stop() self.log.print("IO started") self.addrValue.setStyleSheet("color: green") self.tabWidget.setCurrentIndex(self.tabindex_osc) # initialize variables for readout self.idle = False self.osc_waiting = False self.daq_waiting = False self.osc_reset = False self.osc_start = False self.state = 0 self.set_rate(self.rateValue.currentIndex()) self.set_negator(0, self.neg1Check.isChecked()) self.set_negator(1, self.neg2Check.isChecked()) # # finally, start readout timer calling update_oscDisplay() self.readTimer.start(self.readInterval) #### start oscilloscope in DAQ mode #! if self.interactive and self.daq_mode: if self.daq_mode: self.osc_daq() # __call__ method of osc_daq class def command(self, code, number, value): self.socket.write(struct.pack("<Q", code << 56 | number << 52 | (int(value) & 0xFFFFFFFFFFFFF))) def read_data(self, data): view = data.view(np.uint8) size = view.size while self.socket.state() == QAbstractSocket.ConnectedState and self.socket.bytesAvailable() < size: self.loop.exec_() if self.socket.bytesAvailable() < size: return False else: view[:] = np.frombuffer(self.socket.read(size), np.uint8) return True def update_oscDisplay(self): """get new data from RP and update oscilloscope display (triggered by QTimer:readTimer) """ if not self.osc_waiting: return if self.osc_reset: self.reset_osc() if self.osc_start: self.start_osc() # check status self.read_status() if not self.read_data(self.status): self.log.print("failed to read status") return if not self.status[8] & 1: self.command(20, 0, 0) # read oscilloscope data if self.read_data(self.osc_daq.buffer): self.osc_daq.update_osci_display() self.mark_reset_osc() self.mark_start_osc() else: self.log.print("failed to read oscilloscope data") return def run_oscDaq(self): """continuous data transfer from RedPitaya """ # depends on # self.start_daq() # osc.process_data() # initialize trigger mode etc. self.osc_daq.start_daq() # self.reset_osc() self.start_osc() while self.daq_waiting: # check status self.read_status() if not self.read_data(self.status): self.log.print("failed to read status") return if not self.status[8] & 1: self.command(20, 0, 0) # read oscilloscope data if self.read_data(self.osc_daq.buffer): self.timestamp = time.time() self.reset_osc() self.osc_daq.process_data() if self.callback: self.callback_function(self.osc_daq.data) self.start_osc() self.osc_daq.deadT += time.time() - self.timestamp else: self.log.print("failed to read oscilloscope data") return def mark_start_osc(self): self.osc_start = True # self.osc_waiting = True def mark_reset_osc(self): self.osc_reset = True def reset_osc(self): self.command(2, 0, 0) self.osc_reset=False def start_osc(self): self.command(19, 0, 0) self.osc_start = False def read_status(self): self.command(11, 0, 0) def stop_osc(self): self.reset_osc() self.osc_waiting = False self.daq_waiting = False def set_rate(self, index): # set RP decimation factor self.command(4, 0, rpControl.rates[index]) def set_negator(self, number, value): self.command(5, number, value) def set_trg_source(self, number): self.command(13, number, 0) def set_trg_slope(self, value): self.command(14, 0, value) def set_trg_mode(self, value): self.command(15, 0, value) def set_trg_level(self, value): self.command(16, 0, value) def set_osc_pre(self, value): self.command(17, 0, value) def set_osc_tot(self, value): self.command(18, 0, value) def set_gen_fall(self, value): self.command(21, 0, value) def set_gen_rise(self, value): self.command(22, 0, value) def set_gen_rate(self, value): self.command(25, 0, value) def set_gen_dist(self, value): self.command(26, 0, value) def reset_gen(self): self.command(27, 0) def set_gen_bin(self, value): self.command(28, 0, value) def start_gen(self): self.command(29, 0, 1) def stop_gen(self): self.command(30, 0, 0) class LogDisplay(QWidget, Ui_LogDisplay): def __init__(self): super(LogDisplay, self).__init__() self.setupUi(self) def print(self, text): self.logViewer.appendPlainText(text) class OscDAQ(QWidget, Ui_OscDisplay): """Oscilloscope display with daq mode: in daq_mode: - data is read from the RedPitaya at maximum rate - statistics is accumulated and displayed - optinally, data is output to a file or sent to an external function """ def __init__(self, rpControl, log): super(OscDAQ, self).__init__() self.setupUi(self) # initialize variables self.rpControl = rpControl self.log = log self.l_tot = self.rpControl.sample_size self.pre = self.rpControl.pretrigger_fraction * self.l_tot self.buffer = np.zeros(self.l_tot * 2, np.int16) # same memory with different view of shape(2, samples_per_channel) self.data = self.buffer.reshape(2, self.l_tot, order='F') self.filename = rpControl.filename if self.rpControl.filename != '' else None # create figure self.figure = Figure() if sys.platform != "win32": self.figure.set_facecolor("none") # gq self.figure.subplots_adjust(left=0.18, bottom=0.08, right=0.98, top=0.95) self.figure.subplots_adjust(left=0.10, bottom=0.08, right=0.90, top=0.92) self.canvas = FigureCanvas(self.figure) self.plotLayout.addWidget(self.canvas) self.ax = self.figure.add_subplot(111) self.ax.grid() self.ax.set_xlim(-0.02*self.l_tot, 1.02*self.l_tot) self.ax.set_ylim(-4500, 4500) self.xunit = "[{:d} ns / sample]".format(4*8) self.ax.set_xlabel("sample number " + self.xunit) self.yunit = "[{:.3f} mV]".format(1./4.096) self.ax.set_ylabel("ADC units " + self.yunit) # self.ax.set_xlabel("sample number") # self.ax.set_ylabel("ADC units") # gq self.ax_x2=self.ax.secondary_xaxis('top', functions=(self.tbin2t, self.t2tbin)) self.ax_x2.set_xlabel("time [µs]", color='grey', size='x-large') self.ax_y2=self.ax.secondary_yaxis('right', functions=(self.adc2mV, self.mV2adc)) self.ax_y2.set_ylabel("Voltage [mV]", color='grey', size='x-large') # gq end self.osctxt= self.ax.text(0.1, 0.96, ' ', transform=self.ax.transAxes, color="darkblue", alpha=0.7) x = np.arange(self.l_tot) (self.curve2,) = self.ax.plot(x, self.buffer[1::2], color="#00CCCC", label="chan 2") (self.curve1,) = self.ax.plot(x, self.buffer[0::2], color="#FFAA00", label="chan 1") self.ax.legend(handles=[self.curve1, self.curve2]) self.line = [None, None] self.line[0] = self.ax.axvline(self.pre, linestyle="dotted") self.line[1] = self.ax.axhline(self.levelValue.value(), linestyle="dotted") self.canvas.draw() # create navigation toolbar self.toolbar = NavigationToolbar(self.canvas, None, False) self.toolbar.layout().setSpacing(6) # remove subplots action actions = self.toolbar.actions() self.toolbar.removeAction(actions[6]) self.toolbar.removeAction(actions[7]) # configure colors self.plotLayout.addWidget(self.toolbar) palette = QPalette(self.ch1Label.palette()) palette.setColor(QPalette.Window, QColor("#FFAA00")) palette.setColor(QPalette.WindowText, QColor("black")) self.ch1Label.setAutoFillBackground(True) self.ch1Label.setPalette(palette) self.ch1Value.setAutoFillBackground(True) self.ch1Value.setPalette(palette) palette.setColor(QPalette.Window, QColor("#00CCCC")) palette.setColor(QPalette.WindowText, QColor("black")) self.ch2Label.setAutoFillBackground(True) self.ch2Label.setPalette(palette) self.ch2Value.setAutoFillBackground(True) self.ch2Value.setPalette(palette) # configure controls self.autoButton.toggled.connect(self.rpControl.set_trg_mode) self.ch2Button.toggled.connect(self.rpControl.set_trg_source) self.fallingButton.toggled.connect(self.rpControl.set_trg_slope) self.levelValue.valueChanged.connect(self.set_trg_level) self.startButton.clicked.connect(self.start) self.startDAQButton.clicked.connect(self.rpControl.run_oscDaq) self.saveButton.clicked.connect(self.save) self.loadButton.clicked.connect(self.load) self.canvas.mpl_connect("motion_notify_event", self.on_motion) self.rpControl.osc_ready = True def __call__(self): # run in data acquisition mode self.rpControl.run_oscDaq() # gq helper functions to convert acd channels and sampling time to physical units def tbin2t(self, tbin): """convert time bin to time in µs """ dfi = self.rpControl.rateValue.currentIndex() # current decimation factor index df = 4 if dfi == -1 else rpControl.rates[dfi] # decimation factor return (tbin-self.pre)*self.rpControl.time_bin * df def t2tbin(self, t): """convert time in µs to time bin """ dfi = self.rpControl.rateValue.currentIndex() # current decimation factor index df = 4 if dfi == -1 else rpControl.rates[dfi] # decimation factor return t/(self.rpControl.time_bin * df)+self.pre def adc2mV(self, c): """convert adc-count to Voltage in mV """ return c * self.rpControl.adc_unit def mV2adc(self, v): """convert voltage in mV to adc-count """ return v / self.rpControl.adc_unit # gq end def setup_trigger(self): # extract trigger parameters from gui and send to server self.trg_mode = int(self.autoButton.isChecked()) self.trg_source = int(self.ch2Button.isChecked()) self.trg_slope = int(self.fallingButton.isChecked()) self.trg_level = self.levelValue.value() self.rpControl.set_trg_mode(self.trg_mode) self.rpControl.set_trg_source(self.trg_source) self.rpControl.set_trg_slope(self.trg_slope) self.rpControl.set_trg_level(self.trg_level) self.rpControl.set_osc_pre(self.pre) self.rpControl.set_osc_tot(self.l_tot) def set_gui4start(self): self.startButton.setText("Start") self.startButton.setStyleSheet("") self.startButton.clicked.disconnect() self.startButton.clicked.connect(self.start) self.startDAQButton.setEnabled(True) def set_gui4stop(self): # set start and stop buttons self.startButton.setText("Stop") self.startButton.setStyleSheet("color: red") self.startButton.clicked.disconnect() self.startButton.clicked.connect(self.stop) self.startDAQButton.setEnabled(False) def start(self): """start oscilloscope display """ if self.rpControl.idle: return # self.setup_trigger() self.set_gui4stop() # self.rpControl.mark_reset_osc() self.rpControl.mark_start_osc() self.rpControl.osc_waiting = True self.osctxt.set_text('') self.log.print("oscilloscope started") def start_daq(self): """start oscilloscope in daq mode """ if self.rpControl.idle: return # initialize daq statisics self.NTrig = 0 self.dN = 0 self.Nprev = self.NTrig self.T0 = time.time() self.deadT = 0. self.Tprev = self.T0 self.dT = 0. # set-up trigger and GUI Buttons self.setup_trigger() self.set_gui4stop() self.rpControl.daq_waiting = True self.log.print("daq started") def stop(self): self.rpControl.stop_osc() self.set_gui4start() self.log.print("oscilloscope stopped") def process_data(self): """ statistics recorded data: - count number of Triggers and keep account of timing; - graphical display of a subset of the data """ self.NTrig += 1 t = time.time() dt = t - self.Tprev self.Tprev = t self.dT += dt dead_time_fraction = self.deadT/dt self.deadT = 0. # # do something with data (e.g. pass to sub-process via mpQueue) # write to file: if self.filename is not None: with NpyAppendArray(self.filename) as npa: npa.append( np.array([self.data]) ) # # output status and update scope display once per second if 1000*self.dT >= self.rpControl.readInterval: dN = self.NTrig - self.Nprev r = dN/self.dT self.Nprev = self.NTrig T_active = t - self.T0 self.dT = 0. status_txt = "active: {:.1f}s trigger rate: {:.2f} Hz, data rate: {:.4g} MB/s".format(T_active, r, r*self.l_tot*4e-6) # print(status_txt, end='\r') self.osctxt.set_text(status_txt) # update graph on display self.curve1.set_ydata(self.data[0]) self.curve2.set_ydata(self.data[1]) self.xunit = "[{:d} ns / sample]".format(8*rpControl.rates[self.rpControl.rateValue.currentIndex()]) self.ax.set_xlabel("sample number " + self.xunit) self.canvas.draw() def update_osci_display(self): self.curve1.set_ydata(self.buffer[0::2]) self.curve2.set_ydata(self.buffer[1::2]) self.xunit = "[{:d} ns / sample]".format(8*rpControl.rates[self.rpControl.rateValue.currentIndex()]) self.ax.set_xlabel("sample number " + self.xunit) self.canvas.draw() def set_trg_level(self, value): self.line[1].set_ydata([value, value]) self.canvas.draw() self.rpControl.set_trg_level(value) def on_motion(self, event): if event.inaxes != self.ax: return x = int(event.xdata + 0.5) if x < 0: x = 0 if x >= self.l_tot: x = self.l_tot - 1 y1 = self.curve1.get_ydata(True)[x] y2 = self.curve2.get_ydata(True)[x] self.timeValue.setText("%d" % x) self.ch1Value.setText("%d" % y1) self.ch2Value.setText("%d" % y2) def save(self): try: dialog = QFileDialog(self, "Save osc file", path, "*.osc") dialog.setDefaultSuffix("osc") name = "oscillogram-%s.osc" % time.strftime("%Y%m%d-%H%M%S") dialog.selectFile(name) dialog.setAcceptMode(QFileDialog.AcceptSave) if dialog.exec() == QDialog.Accepted: name = dialog.selectedFiles() self.buffer.tofile(name[0]) self.log.print("histogram %d saved to file %s" % ((self.number + 1), name[0])) except: self.log.print("error: %s" % sys.exc_info()[1]) def load(self): try: dialog = QFileDialog(self, "Load osc file", path, "*.osc") dialog.setDefaultSuffix("osc") dialog.setAcceptMode(QFileDialog.AcceptOpen) if dialog.exec() == QDialog.Accepted: name = dialog.selectedFiles() self.buffer[:] = np.fromfile(name[0], np.int16) self.update() except: self.log.print("error: %s" % sys.exc_info()[1]) class GenDisplay(QWidget, Ui_GenDisplay): def __init__(self, rpControl, log): super(GenDisplay, self).__init__() self.setupUi(self) # initialize variables self.rpControl = rpControl self.log = log self.bins = 4096 self.buffer = np.zeros(self.bins, np.uint32) for i in range(16): # initialize with delta-functions self.buffer[(i+1)*256-1] = 1 # create figure self.figure = Figure() if sys.platform != "win32": self.figure.set_facecolor("none") # !gq self.figure.subplots_adjust(left=0.18, bottom=0.08, right=0.98, top=0.95) self.figure.subplots_adjust(left=0.08, bottom=0.08, right=0.98, top=0.92) self.canvas = FigureCanvas(self.figure) self.plotLayout.addWidget(self.canvas) self.ax = self.figure.add_subplot(111) self.ax.grid() self.ax.set_ylabel("counts") self.xunit = "[0.122 mV /channel]" self.ax.set_xlabel("channel number " + self.xunit) # self.ax.set_xlabel("channel number") # !gq self.ax_x2=self.ax.secondary_xaxis('top', functions=(self.adc2mV, self.mV2adc)) self.ax_x2.set_xlabel("Voltage [mV]", color='grey') x = np.arange(self.bins) (self.curve,) = self.ax.plot(x, self.buffer, drawstyle="steps-mid", color="#FFAA00") # create navigation toolbar self.toolbar = NavigationToolbar(self.canvas, None, False) self.toolbar.layout().setSpacing(6) # remove subplots action actions = self.toolbar.actions() self.toolbar.removeAction(actions[6]) self.toolbar.removeAction(actions[7]) self.logCheck = QCheckBox("log scale") self.logCheck.setChecked(False) self.toolbar.addSeparator() self.toolbar.addWidget(self.logCheck) self.plotLayout.addWidget(self.toolbar) # configure controls actions[0].triggered.disconnect() actions[0].triggered.connect(self.home) self.logCheck.toggled.connect(self.set_scale) self.startButton.clicked.connect(self.start) self.loadButton.clicked.connect(self.load) # gq def adc2mV(self, c): """convert adc-count to Voltage in mV !!! there is a factor of two betwenn channel definition here and in hstDisplay ! """ return c * self.rpControl.adc_unit/2 def mV2adc(self, v): """convert voltage in mV to adc-count """ return v * self.rpControl.adc_unit*2 # gq end def start(self): if self.rpControl.idle: return self.rpControl.set_gen_fall(self.fallValue.value()) self.rpControl.set_gen_rise(self.riseValue.value()) self.rpControl.set_gen_rate(self.rateValue.value() * 1000) self.rpControl.set_gen_dist(self.poissonButton.isChecked()) for value in np.arange(self.bins, dtype=np.uint64) << 32 | self.buffer: self.rpControl.set_gen_bin(value) self.rpControl.start_gen() self.startButton.setText("Stop") self.startButton.clicked.disconnect() self.startButton.clicked.connect(self.stop) self.log.print("generator started") def stop(self): self.rpControl.stop_gen() self.startButton.setText("Start") self.startButton.clicked.disconnect() self.startButton.clicked.connect(self.start) self.log.print("generator stopped") def home(self): self.set_scale(self.logCheck.isChecked()) def set_scale(self, checked): self.toolbar.home() self.toolbar.update() if checked: self.ax.set_ylim(1, 1e10) self.ax.set_yscale("log") else: self.ax.set_ylim(auto=True) self.ax.set_yscale("linear") self.ax.relim() self.ax.autoscale_view(scalex=True, scaley=True) self.canvas.draw() def load(self): try: dialog = QFileDialog(self, "Load gen file", path, "*.gen") dialog.setDefaultSuffix("gen") dialog.setAcceptMode(QFileDialog.AcceptOpen) if dialog.exec() == QDialog.Accepted: name = dialog.selectedFiles() self.buffer[:] = np.loadtxt(name[0], np.uint32) self.curve.set_ydata(self.buffer) self.ax.relim() self.ax.autoscale_view(scalex=False, scaley=True) self.canvas.draw() except: self.log.print("error: %s" % sys.exc_info()[1]) class redP_consumer(): def __init__(self): self.NTrig = 0 self.dN = 0 self.Nprev = self.NTrig self.T0 = time.time() self.Tprev = self.T0 self.dT = 0. def data_sink(self, data): """function called by redPoscdaq this simple version calculates statistics only """ self.databuffer = data # analyze data self.NTrig += 1 t = time.time() dt = t - self.Tprev self.Tprev = t self.dT += dt l_tot = len(self.databuffer[0]) # output status and update scope display once per second if self.dT >= 1.: dN = self.NTrig - self.Nprev r = dN/self.dT self.Nprev = self.NTrig T_active = t - self.T0 self.dT = 0. status_txt = "active: {:.1f}s trigger rate: {:.2f} Hz, data rate: {:.4g} MB/s".format(T_active, r, r*l_tot*4e-6) print(status_txt, end='\r') from mimocorb.buffer_control import rbPut class redP_mimocorb(): def __init__(self, source_list=None, sink_list=None, observe_list=None, config_dict=None, **rb_info): # initialize mimoCoRB interface self.action = rbPut(config_dict=config_dict, sink_list=sink_list, **rb_info) self.number_of_channels = len(self.action.sink.dtype) self.events_required = 1000 if "eventcount" not in config_dict else config_dict["eventcount"] self.event_count = 0 self.active=True def data_sink(self, data): """function called by redPoscdaq this simple version calculates statistics only """ while (self.events_required == 0 or self.event_count < self.events_required) and self.active: # deliver pulse data and no metadata active = self.action(data/5000., None) self.event_count += 1 def run_rpControl(callback=None): # start redPidaya GUI under Qt5 app = QApplication(sys.argv) dpi = app.primaryScreen().logicalDotsPerInch() matplotlib.rcParams["figure.dpi"] = dpi application = rpControl(callback=callback) application.show() sys.exit(app.exec_()) if __name__ == '__main__': # -------------------------------------------- # run_rpControl() sink = redP_consumer() print(sys.argv) run_rpControl(callback=sink.data_sink)