Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
R
redpitaya-MCPHA
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Requirements
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Günter Quast
redpitaya-MCPHA
Commits
1b958cf9
Commit
1b958cf9
authored
10 months ago
by
Guenter Quast
Browse files
Options
Downloads
Patches
Plain Diff
redPoscdaq: data acquistition with special version of MCPHA oscilloscope
parent
3268f8c8
No related branches found
Branches containing commit
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
redPoscdaq.py
+783
-0
783 additions, 0 deletions
redPoscdaq.py
with
783 additions
and
0 deletions
redPoscdaq.py
0 → 100755
+
783
−
0
View file @
1b958cf9
#!/usr/bin/env python3
"""
rpPoscidaq: fast data acquistion using the oscilloscope client of the MCPHA
application running on a RedPitaya FPGA board
When the network connection is established, the oscilloscope window
is activated in data-acquisition mode
Derived from mcpha.py by Pavel Demin
This code is compatible with release 20240204 of the alpine linux image
https://github.com/pavel-demin/red-pitaya-notes/releases/tag/20240204
"""
# Communication with server process is achieved via command codes:
# command(code, number, data) # number typically is channel number
#
# code values
# code 2: reset/initialze oscilloscope
# code 4: set decimation factor
# code 11: read status
# code 13: set trigger source chan 1/2
# code 14: set trigger slope rise/fall
# code 15: set trigger mode norm/auto
# code 16: set trigger level in ADC counts
# code 17: set number of pre-trigger samples
# code 18: set number of total samples
# code 19: start oscilloscope
# code 20: read oscilloscope data (two channels at once)
# code 21: set pulse fall-time for generator in µs
# code 22: set pulse rise-time in ns
# code 25: set pulse rate in kHz
# code 26: fixed frequency or poisson
# code 27_ reset generator
# code 28: set bin for pulse distribution, as a histogram with 4096 channels, 0-500 mV
# code 29: start generator
# code 30: stop generator
import
argparse
import
os
import
sys
import
time
import
struct
from
functools
import
partial
import
numpy
as
np
import
matplotlib
from
matplotlib.figure
import
Figure
from
matplotlib.backends.backend_qt5agg
import
FigureCanvasQTAgg
as
FigureCanvas
from
matplotlib.backends.backend_qt5agg
import
NavigationToolbar2QT
as
NavigationToolbar
if
"
PyQt5
"
in
sys
.
modules
:
from
PyQt5.uic
import
loadUiType
from
PyQt5.QtCore
import
Qt
,
QTimer
,
QEventLoop
,
QRegExp
from
PyQt5.QtGui
import
QPalette
,
QColor
,
QRegExpValidator
from
PyQt5.QtWidgets
import
QApplication
,
QMainWindow
,
QDialog
,
QFileDialog
from
PyQt5.QtWidgets
import
QWidget
,
QLabel
,
QCheckBox
,
QComboBox
from
PyQt5.QtNetwork
import
QAbstractSocket
,
QTcpSocket
else
:
from
PySide2.QtUiTools
import
loadUiType
from
PySide2.QtCore
import
Qt
,
QTimer
,
QEventLoop
,
QRegExp
from
PySide2.QtGui
import
QPalette
,
QColor
,
QRegExpValidator
from
PySide2.QtWidgets
import
QApplication
,
QMainWindow
,
QDialog
,
QFileDialog
from
PySide2.QtWidgets
import
QWidget
,
QLabel
,
QCheckBox
,
QComboBox
from
PySide2.QtNetwork
import
QAbstractSocket
,
QTcpSocket
Ui_MCPHA
,
QMainWindow
=
loadUiType
(
"
rpControl.ui
"
)
Ui_LogDisplay
,
QWidget
=
loadUiType
(
"
mcpha_log.ui
"
)
Ui_OscDisplay
,
QWidget
=
loadUiType
(
"
mcpha_daq.ui
"
)
Ui_GenDisplay
,
QWidget
=
loadUiType
(
"
mcpha_gen.ui
"
)
if
sys
.
platform
!=
"
win32
"
:
path
=
"
.
"
else
:
path
=
os
.
path
.
expanduser
(
"
~
"
)
class
rpControl
(
QMainWindow
,
Ui_MCPHA
):
rates
=
{
0
:
1
,
1
:
4
,
2
:
8
,
3
:
16
,
4
:
32
,
5
:
64
,
6
:
128
,
7
:
256
}
def
__init__
(
self
):
super
(
rpControl
,
self
).
__init__
()
# get command line arguments
self
.
parse_args
()
# set physical units (for axis labels)
self
.
get_physical_units
()
self
.
setupUi
(
self
)
# initialize variables
self
.
idle
=
True
self
.
osc_waiting
=
False
self
.
state
=
0
self
.
status
=
np
.
zeros
(
9
,
np
.
uint32
)
self
.
timers
=
self
.
status
[:
4
].
view
(
np
.
uint64
)
# create tabs
self
.
log
=
LogDisplay
()
self
.
osc_daq
=
OscDAQ
(
self
,
self
.
log
)
self
.
gen
=
GenDisplay
(
self
,
self
.
log
)
self
.
tabindex_log
=
self
.
tabWidget
.
addTab
(
self
.
log
,
"
Messages
"
)
self
.
tabindex_osc
=
self
.
tabWidget
.
addTab
(
self
.
osc_daq
,
"
Oscilloscope
"
)
self
.
tabindex_gen
=
self
.
tabWidget
.
addTab
(
self
.
gen
,
"
Pulse generator
"
)
# configure controls
self
.
connectButton
.
clicked
.
connect
(
self
.
start
)
self
.
neg1Check
.
toggled
.
connect
(
partial
(
self
.
set_negator
,
0
))
self
.
neg2Check
.
toggled
.
connect
(
partial
(
self
.
set_negator
,
1
))
self
.
rateValue
.
addItems
(
map
(
str
,
rpControl
.
rates
.
values
()))
self
.
rateValue
.
setEditable
(
True
)
self
.
rateValue
.
lineEdit
().
setReadOnly
(
True
)
self
.
rateValue
.
lineEdit
().
setAlignment
(
Qt
.
AlignRight
)
for
i
in
range
(
self
.
rateValue
.
count
()):
self
.
rateValue
.
setItemData
(
i
,
Qt
.
AlignRight
,
Qt
.
TextAlignmentRole
)
self
.
rateValue
.
setCurrentIndex
(
1
)
self
.
rateValue
.
currentIndexChanged
.
connect
(
self
.
set_rate
)
rx
=
QRegExp
(
r
"
^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])|rp-[0-9A-Fa-f]{6}\.local$
"
)
self
.
addrValue
.
setValidator
(
QRegExpValidator
(
rx
,
self
.
addrValue
))
# create TCP socket
self
.
socket
=
QTcpSocket
(
self
)
self
.
socket
.
connected
.
connect
(
self
.
connected
)
self
.
socket
.
error
.
connect
(
self
.
display_error
)
# create event loop
self
.
loop
=
QEventLoop
()
self
.
socket
.
readyRead
.
connect
(
self
.
loop
.
quit
)
self
.
socket
.
error
.
connect
(
self
.
loop
.
quit
)
# create timers
self
.
startTimer
=
QTimer
(
self
)
# timer for network connectin
self
.
startTimer
.
timeout
.
connect
(
self
.
start_timeout
)
self
.
readTimer
=
QTimer
(
self
)
# for readout
self
.
readTimer
.
timeout
.
connect
(
self
.
update_oscDisplay
)
# transfer command-line arguments to gui
self
.
osc_daq
.
levelValue
.
setValue
(
self
.
trigger_level
)
self
.
osc_daq
.
autoButton
.
setChecked
(
self
.
trigger_mode
)
self
.
osc_daq
.
ch2Button
.
setChecked
(
self
.
trigger_source
)
self
.
osc_daq
.
fallingButton
.
setChecked
(
self
.
trigger_slope
)
# automatically connect
if
self
.
ip_address
is
not
None
:
self
.
addrValue
.
setText
(
self
.
ip_address
)
self
.
start
()
else
:
self
.
addrValue
.
setStyleSheet
(
"
color: darkorange
"
)
def
parse_args
(
self
):
parser
=
argparse
.
ArgumentParser
(
description
=
__doc__
)
parser
.
add_argument
(
'
--no-daq
'
,
dest
=
'
daq_mode
'
,
action
=
'
store_false
'
,
help
=
'
do not start in DAQ mode
'
)
parser
.
add_argument
(
'
-c
'
,
'
--connect_ip
'
,
type
=
str
,
help
=
'
connect IP address of RedPitaya
'
)
parser
.
add_argument
(
'
-i
'
,
'
--interval
'
,
type
=
int
,
default
=
100
,
help
=
'
interval for readout (in ms)
'
)
parser
.
add_argument
(
'
-s
'
,
'
--sample_size
'
,
type
=
int
,
default
=
4096
,
help
=
'
size of waveform sample
'
)
parser
.
add_argument
(
'
-p
'
,
'
--pretrigger_fraction
'
,
type
=
float
,
default
=
0.05
,
help
=
'
pretrigger fraction
'
)
parser
.
add_argument
(
'
-t
'
,
'
--trigger_level
'
,
type
=
int
,
default
=
500
,
help
=
'
trigger level in ADC counts
'
)
parser
.
add_argument
(
'
--trigger_slope
'
,
type
=
str
,
choices
=
{
'
rising
'
,
'
falling
'
},
default
=
'
rising
'
,
help
=
'
trigger slope
'
)
parser
.
add_argument
(
'
--trigger_source
'
,
type
=
int
,
choices
=
{
1
,
2
},
default
=
1
,
help
=
'
trigger channel
'
)
parser
.
add_argument
(
'
--trigger_mode
'
,
type
=
str
,
choices
=
{
'
norm
'
,
'
auto
'
},
default
=
'
norm
'
,
help
=
'
trigger mode
'
)
args
=
parser
.
parse_args
()
# all relevant parameters are here
self
.
ip_address
=
args
.
connect_ip
self
.
daq_mode
=
args
.
daq_mode
self
.
sample_size
=
args
.
sample_size
self
.
pretrigger_fraction
=
args
.
pretrigger_fraction
self
.
readInterval
=
args
.
interval
# used to control oscilloscope display
#
self
.
trigger_level
=
args
.
trigger_level
self
.
trigger_source
=
args
.
trigger_source
-
1
self
.
trigger_mode
=
0
if
args
.
trigger_mode
==
'
norm
'
else
1
self
.
trigger_slope
=
0
if
args
.
trigger_slope
==
'
rising
'
else
1
# other parameters
def
get_physical_units
(
self
):
"""
get physical units corresponding to ADC units and channel numbers
"""
# Voltages: 4096 channels/Volt
self
.
adc_unit
=
1000
/
4096
# time per sample at sampling rate of 125 MHz / decimation_factor
self
.
time_bin
=
0.008
# !gq end
def
start
(
self
):
self
.
socket
.
connectToHost
(
self
.
addrValue
.
text
(),
1001
)
# connect to port 1001 (mcpha_server on RP)
self
.
startTimer
.
start
(
1000
)
# shorter time-out
self
.
connectButton
.
setText
(
"
Disconnect
"
)
self
.
connectButton
.
clicked
.
disconnect
()
self
.
connectButton
.
clicked
.
connect
(
self
.
stop
)
def
stop
(
self
):
self
.
osc_daq
.
stop
()
self
.
gen
.
stop
()
self
.
readTimer
.
stop
()
self
.
startTimer
.
stop
()
self
.
loop
.
quit
()
self
.
socket
.
abort
()
self
.
connectButton
.
setText
(
"
Connect
"
)
self
.
connectButton
.
clicked
.
disconnect
()
self
.
connectButton
.
clicked
.
connect
(
self
.
start
)
self
.
addrValue
.
setStyleSheet
(
"
color: red
"
)
self
.
log
.
print
(
"
IO stopped
"
)
self
.
idle
=
True
def
closeEvent
(
self
,
event
):
self
.
stop
()
def
start_timeout
(
self
):
self
.
log
.
print
(
"
error: connection timeout
"
)
self
.
stop
()
def
display_error
(
self
):
self
.
log
.
print
(
"
error: %s
"
%
self
.
socket
.
errorString
())
self
.
stop
()
def
connected
(
self
):
# coming here when connection is established
self
.
startTimer
.
stop
()
self
.
log
.
print
(
"
IO started
"
)
self
.
addrValue
.
setStyleSheet
(
"
color: green
"
)
self
.
tabWidget
.
setCurrentIndex
(
self
.
tabindex_osc
)
# initialize variables for readout
self
.
idle
=
False
self
.
osc_waiting
=
False
self
.
daq_waiting
=
False
self
.
osc_reset
=
False
self
.
osc_start
=
False
self
.
state
=
0
self
.
set_rate
(
self
.
rateValue
.
currentIndex
())
self
.
set_negator
(
0
,
self
.
neg1Check
.
isChecked
())
self
.
set_negator
(
1
,
self
.
neg2Check
.
isChecked
())
#
# finally, start readout timer calling update_oscDisplay()
self
.
readTimer
.
start
(
self
.
readInterval
)
#### start oscilloscope in DAQ mode
if
self
.
daq_mode
:
self
.
osc_daq
()
# __call__ methood of osc_daq class
def
command
(
self
,
code
,
number
,
value
):
self
.
socket
.
write
(
struct
.
pack
(
"
<Q
"
,
code
<<
56
|
number
<<
52
|
(
int
(
value
)
&
0xFFFFFFFFFFFFF
)))
def
read_data
(
self
,
data
):
view
=
data
.
view
(
np
.
uint8
)
size
=
view
.
size
while
self
.
socket
.
state
()
==
QAbstractSocket
.
ConnectedState
and
self
.
socket
.
bytesAvailable
()
<
size
:
self
.
loop
.
exec_
()
if
self
.
socket
.
bytesAvailable
()
<
size
:
return
False
else
:
view
[:]
=
np
.
frombuffer
(
self
.
socket
.
read
(
size
),
np
.
uint8
)
return
True
def
update_oscDisplay
(
self
):
"""
get new data from RP and update oscilloscope display (triggered by QTimer:readTimer)
"""
if
not
self
.
osc_waiting
:
return
if
self
.
osc_reset
:
self
.
reset_osc
()
if
self
.
osc_start
:
self
.
start_osc
()
# check status
self
.
read_status
()
if
not
self
.
read_data
(
self
.
status
):
self
.
log
.
print
(
"
failed to read status
"
)
return
if
not
self
.
status
[
8
]
&
1
:
self
.
command
(
20
,
0
,
0
)
# read oscilloscope data
if
self
.
read_data
(
self
.
osc_daq
.
buffer
):
self
.
osc_daq
.
update_osci_display
()
self
.
mark_reset_osc
()
self
.
mark_start_osc
()
else
:
self
.
log
.
print
(
"
failed to read oscilloscope data
"
)
return
def
run_oscDaq
(
self
):
"""
continuous data transfer from RedPitaya
"""
# depends on
# self.start_daq()
# osc.process_data()
# initialize trigger mode etc.
self
.
osc_daq
.
start_daq
()
#
self
.
reset_osc
()
self
.
start_osc
()
while
self
.
daq_waiting
:
# check status
self
.
read_status
()
if
not
self
.
read_data
(
self
.
status
):
self
.
log
.
print
(
"
failed to read status
"
)
return
if
not
self
.
status
[
8
]
&
1
:
self
.
command
(
20
,
0
,
0
)
# read oscilloscope data
if
self
.
read_data
(
self
.
osc_daq
.
buffer
):
self
.
reset_osc
()
self
.
osc_daq
.
process_data
()
self
.
start_osc
()
else
:
self
.
log
.
print
(
"
failed to read oscilloscope data
"
)
return
def
mark_start_osc
(
self
):
self
.
osc_start
=
True
# self.osc_waiting = True
def
mark_reset_osc
(
self
):
self
.
osc_reset
=
True
def
reset_osc
(
self
):
self
.
command
(
2
,
0
,
0
)
self
.
osc_reset
=
False
def
start_osc
(
self
):
self
.
command
(
19
,
0
,
0
)
self
.
osc_start
=
False
def
read_status
(
self
):
self
.
command
(
11
,
0
,
0
)
def
stop_osc
(
self
):
self
.
reset_osc
()
self
.
osc_waiting
=
False
self
.
daq_waiting
=
False
def
set_rate
(
self
,
index
):
# set RP decimation factor
self
.
command
(
4
,
0
,
rpControl
.
rates
[
index
])
def
set_negator
(
self
,
number
,
value
):
self
.
command
(
5
,
number
,
value
)
def
set_trg_source
(
self
,
number
):
self
.
command
(
13
,
number
,
0
)
def
set_trg_slope
(
self
,
value
):
self
.
command
(
14
,
0
,
value
)
def
set_trg_mode
(
self
,
value
):
self
.
command
(
15
,
0
,
value
)
def
set_trg_level
(
self
,
value
):
self
.
command
(
16
,
0
,
value
)
def
set_osc_pre
(
self
,
value
):
self
.
command
(
17
,
0
,
value
)
def
set_osc_tot
(
self
,
value
):
self
.
command
(
18
,
0
,
value
)
def
set_gen_fall
(
self
,
value
):
self
.
command
(
21
,
0
,
value
)
def
set_gen_rise
(
self
,
value
):
self
.
command
(
22
,
0
,
value
)
def
set_gen_rate
(
self
,
value
):
self
.
command
(
25
,
0
,
value
)
def
set_gen_dist
(
self
,
value
):
self
.
command
(
26
,
0
,
value
)
def
reset_gen
(
self
):
self
.
command
(
27
,
0
)
def
set_gen_bin
(
self
,
value
):
self
.
command
(
28
,
0
,
value
)
def
start_gen
(
self
):
self
.
command
(
29
,
0
,
1
)
def
stop_gen
(
self
):
self
.
command
(
30
,
0
,
0
)
class
LogDisplay
(
QWidget
,
Ui_LogDisplay
):
def
__init__
(
self
):
super
(
LogDisplay
,
self
).
__init__
()
self
.
setupUi
(
self
)
def
print
(
self
,
text
):
self
.
logViewer
.
appendPlainText
(
text
)
class
OscDAQ
(
QWidget
,
Ui_OscDisplay
):
def
__init__
(
self
,
rpControl
,
log
):
super
(
OscDAQ
,
self
).
__init__
()
self
.
setupUi
(
self
)
# initialize variables
self
.
rpControl
=
rpControl
self
.
log
=
log
self
.
l_tot
=
self
.
rpControl
.
sample_size
self
.
pre
=
self
.
rpControl
.
pretrigger_fraction
*
self
.
l_tot
self
.
buffer
=
np
.
zeros
(
self
.
l_tot
*
2
,
np
.
int16
)
# create figure
self
.
figure
=
Figure
()
if
sys
.
platform
!=
"
win32
"
:
self
.
figure
.
set_facecolor
(
"
none
"
)
# gq self.figure.subplots_adjust(left=0.18, bottom=0.08, right=0.98, top=0.95)
self
.
figure
.
subplots_adjust
(
left
=
0.10
,
bottom
=
0.08
,
right
=
0.90
,
top
=
0.92
)
self
.
canvas
=
FigureCanvas
(
self
.
figure
)
self
.
plotLayout
.
addWidget
(
self
.
canvas
)
self
.
ax
=
self
.
figure
.
add_subplot
(
111
)
self
.
ax
.
grid
()
self
.
ax
.
set_xlim
(
-
0.02
*
self
.
l_tot
,
1.02
*
self
.
l_tot
)
self
.
ax
.
set_ylim
(
-
4500
,
4500
)
self
.
xunit
=
"
[{:d} ns / sample]
"
.
format
(
4
*
8
)
self
.
ax
.
set_xlabel
(
"
sample number
"
+
self
.
xunit
)
self
.
yunit
=
"
[{:.3f} mV]
"
.
format
(
1.
/
4.096
)
self
.
ax
.
set_ylabel
(
"
ADC units
"
+
self
.
yunit
)
# self.ax.set_xlabel("sample number")
# self.ax.set_ylabel("ADC units")
# gq
self
.
ax_x2
=
self
.
ax
.
secondary_xaxis
(
'
top
'
,
functions
=
(
self
.
tbin2t
,
self
.
t2tbin
))
self
.
ax_x2
.
set_xlabel
(
"
time [µs]
"
,
color
=
'
grey
'
,
size
=
'
x-large
'
)
self
.
ax_y2
=
self
.
ax
.
secondary_yaxis
(
'
right
'
,
functions
=
(
self
.
adc2mV
,
self
.
mV2adc
))
self
.
ax_y2
.
set_ylabel
(
"
Voltage [mV]
"
,
color
=
'
grey
'
,
size
=
'
x-large
'
)
# gq end
self
.
osctxt
=
self
.
ax
.
text
(
0.1
,
0.96
,
'
'
,
transform
=
self
.
ax
.
transAxes
,
color
=
"
darkblue
"
,
alpha
=
0.7
)
x
=
np
.
arange
(
self
.
l_tot
)
(
self
.
curve2
,)
=
self
.
ax
.
plot
(
x
,
self
.
buffer
[
1
::
2
],
color
=
"
#00CCCC
"
,
label
=
"
chan 2
"
)
(
self
.
curve1
,)
=
self
.
ax
.
plot
(
x
,
self
.
buffer
[
0
::
2
],
color
=
"
#FFAA00
"
,
label
=
"
chan 1
"
)
self
.
ax
.
legend
(
handles
=
[
self
.
curve1
,
self
.
curve2
])
self
.
line
=
[
None
,
None
]
self
.
line
[
0
]
=
self
.
ax
.
axvline
(
self
.
pre
,
linestyle
=
"
dotted
"
)
self
.
line
[
1
]
=
self
.
ax
.
axhline
(
self
.
levelValue
.
value
(),
linestyle
=
"
dotted
"
)
self
.
canvas
.
draw
()
# create navigation toolbar
self
.
toolbar
=
NavigationToolbar
(
self
.
canvas
,
None
,
False
)
self
.
toolbar
.
layout
().
setSpacing
(
6
)
# remove subplots action
actions
=
self
.
toolbar
.
actions
()
self
.
toolbar
.
removeAction
(
actions
[
6
])
self
.
toolbar
.
removeAction
(
actions
[
7
])
# configure colors
self
.
plotLayout
.
addWidget
(
self
.
toolbar
)
palette
=
QPalette
(
self
.
ch1Label
.
palette
())
palette
.
setColor
(
QPalette
.
Window
,
QColor
(
"
#FFAA00
"
))
palette
.
setColor
(
QPalette
.
WindowText
,
QColor
(
"
black
"
))
self
.
ch1Label
.
setAutoFillBackground
(
True
)
self
.
ch1Label
.
setPalette
(
palette
)
self
.
ch1Value
.
setAutoFillBackground
(
True
)
self
.
ch1Value
.
setPalette
(
palette
)
palette
.
setColor
(
QPalette
.
Window
,
QColor
(
"
#00CCCC
"
))
palette
.
setColor
(
QPalette
.
WindowText
,
QColor
(
"
black
"
))
self
.
ch2Label
.
setAutoFillBackground
(
True
)
self
.
ch2Label
.
setPalette
(
palette
)
self
.
ch2Value
.
setAutoFillBackground
(
True
)
self
.
ch2Value
.
setPalette
(
palette
)
# configure controls
self
.
autoButton
.
toggled
.
connect
(
self
.
rpControl
.
set_trg_mode
)
self
.
ch2Button
.
toggled
.
connect
(
self
.
rpControl
.
set_trg_source
)
self
.
fallingButton
.
toggled
.
connect
(
self
.
rpControl
.
set_trg_slope
)
self
.
levelValue
.
valueChanged
.
connect
(
self
.
set_trg_level
)
self
.
startButton
.
clicked
.
connect
(
self
.
start
)
self
.
startDAQButton
.
clicked
.
connect
(
self
.
rpControl
.
run_oscDaq
)
self
.
saveButton
.
clicked
.
connect
(
self
.
save
)
self
.
loadButton
.
clicked
.
connect
(
self
.
load
)
self
.
canvas
.
mpl_connect
(
"
motion_notify_event
"
,
self
.
on_motion
)
def
__call__
(
self
):
# run in data acquisition mode
self
.
rpControl
.
run_oscDaq
()
# gq helper functions to convert acd channels and sampling time to physical units
def
tbin2t
(
self
,
tbin
):
"""
convert time bin to time in µs
"""
dfi
=
self
.
rpControl
.
rateValue
.
currentIndex
()
# current decimation factor index
df
=
4
if
dfi
==
-
1
else
rpControl
.
rates
[
dfi
]
# decimation factor
return
(
tbin
-
self
.
pre
)
*
self
.
rpControl
.
time_bin
*
df
def
t2tbin
(
self
,
t
):
"""
convert time in µs to time bin
"""
dfi
=
self
.
rpControl
.
rateValue
.
currentIndex
()
# current decimation factor index
df
=
4
if
dfi
==
-
1
else
rpControl
.
rates
[
dfi
]
# decimation factor
return
t
/
(
self
.
rpControl
.
time_bin
*
df
)
+
self
.
pre
def
adc2mV
(
self
,
c
):
"""
convert adc-count to Voltage in mV
"""
return
c
*
self
.
rpControl
.
adc_unit
def
mV2adc
(
self
,
v
):
"""
convert voltage in mV to adc-count
"""
return
v
/
self
.
rpControl
.
adc_unit
# gq end
def
setup_trigger
(
self
):
# extract trigger parameters from gui and send to server
self
.
trg_mode
=
int
(
self
.
autoButton
.
isChecked
())
self
.
trg_source
=
int
(
self
.
ch2Button
.
isChecked
())
self
.
trg_slope
=
int
(
self
.
fallingButton
.
isChecked
())
self
.
trg_level
=
self
.
levelValue
.
value
()
self
.
rpControl
.
set_trg_mode
(
self
.
trg_mode
)
self
.
rpControl
.
set_trg_source
(
self
.
trg_source
)
self
.
rpControl
.
set_trg_slope
(
self
.
trg_slope
)
self
.
rpControl
.
set_trg_level
(
self
.
trg_level
)
self
.
rpControl
.
set_osc_pre
(
self
.
pre
)
self
.
rpControl
.
set_osc_tot
(
self
.
l_tot
)
def
set_gui4start
(
self
):
self
.
startButton
.
setText
(
"
Start
"
)
self
.
startButton
.
setStyleSheet
(
""
)
self
.
startButton
.
clicked
.
disconnect
()
self
.
startButton
.
clicked
.
connect
(
self
.
start
)
self
.
startDAQButton
.
setEnabled
(
True
)
def
set_gui4stop
(
self
):
# set start and stop buttons
self
.
startButton
.
setText
(
"
Stop
"
)
self
.
startButton
.
setStyleSheet
(
"
color: red
"
)
self
.
startButton
.
clicked
.
disconnect
()
self
.
startButton
.
clicked
.
connect
(
self
.
stop
)
self
.
startDAQButton
.
setEnabled
(
False
)
def
start
(
self
):
"""
start oscilloscope display
"""
if
self
.
rpControl
.
idle
:
return
#
self
.
setup_trigger
()
self
.
set_gui4stop
()
#
self
.
rpControl
.
mark_reset_osc
()
self
.
rpControl
.
mark_start_osc
()
self
.
rpControl
.
osc_waiting
=
True
self
.
osctxt
.
set_text
(
''
)
self
.
log
.
print
(
"
oscilloscope started
"
)
def
start_daq
(
self
):
"""
start oscilloscope in daq mode
"""
if
self
.
rpControl
.
idle
:
return
# initialize daq statisics
self
.
NTrig
=
0
self
.
dN
=
0
self
.
Nprev
=
self
.
NTrig
self
.
T0
=
time
.
time
()
self
.
Tprev
=
self
.
T0
self
.
dT
=
0.
# set-up trigger and GUI Buttons
self
.
setup_trigger
()
self
.
set_gui4stop
()
self
.
rpControl
.
daq_waiting
=
True
self
.
log
.
print
(
"
daq started
"
)
def
stop
(
self
):
self
.
rpControl
.
stop_osc
()
self
.
set_gui4start
()
self
.
log
.
print
(
"
oscilloscope stopped
"
)
def
process_data
(
self
):
"""
dummy processing of recorded data:
- count number of Triggers and keep account of timing;
- graphical display of a subset of the data
"""
self
.
NTrig
+=
1
t
=
time
.
time
()
dt
=
t
-
self
.
Tprev
self
.
Tprev
=
t
self
.
dT
+=
dt
# reshape to an array (2, self.ltot)
data
=
self
.
buffer
.
reshape
(
2
,
self
.
l_tot
,
order
=
'
F
'
)
# chan1 = self.buffer[0::2]
# chan2 = self.buffer[1::2]
#
# do something with data (e.g. pass to sub-process via mpQueue)
#
# ....
#
# output status once per second
if
self
.
dT
>=
1.
:
dN
=
self
.
NTrig
-
self
.
Nprev
r
=
dN
/
self
.
dT
self
.
Nprev
=
self
.
NTrig
T_active
=
t
-
self
.
T0
self
.
dT
=
0.
status_txt
=
"
active: {:.1f}s trigger rate: {:.2f} Hz, data rate: {:.4g} MB/s
"
.
format
(
T_active
,
r
,
r
*
self
.
l_tot
*
4e-6
)
print
(
status_txt
,
end
=
'
\r
'
)
self
.
osctxt
.
set_text
(
status_txt
)
# update graph on display
self
.
curve1
.
set_ydata
(
data
[
0
])
self
.
curve2
.
set_ydata
(
data
[
1
])
self
.
xunit
=
"
[{:d} ns / sample]
"
.
format
(
8
*
rpControl
.
rates
[
self
.
rpControl
.
rateValue
.
currentIndex
()])
self
.
ax
.
set_xlabel
(
"
sample number
"
+
self
.
xunit
)
self
.
canvas
.
draw
()
def
update_osci_display
(
self
):
self
.
curve1
.
set_ydata
(
self
.
buffer
[
0
::
2
])
self
.
curve2
.
set_ydata
(
self
.
buffer
[
1
::
2
])
self
.
xunit
=
"
[{:d} ns / sample]
"
.
format
(
8
*
rpControl
.
rates
[
self
.
rpControl
.
rateValue
.
currentIndex
()])
self
.
ax
.
set_xlabel
(
"
sample number
"
+
self
.
xunit
)
self
.
canvas
.
draw
()
def
set_trg_level
(
self
,
value
):
self
.
line
[
1
].
set_ydata
([
value
,
value
])
self
.
canvas
.
draw
()
self
.
rpControl
.
set_trg_level
(
value
)
def
on_motion
(
self
,
event
):
if
event
.
inaxes
!=
self
.
ax
:
return
x
=
int
(
event
.
xdata
+
0.5
)
if
x
<
0
:
x
=
0
if
x
>=
self
.
l_tot
:
x
=
self
.
l_tot
-
1
y1
=
self
.
curve1
.
get_ydata
(
True
)[
x
]
y2
=
self
.
curve2
.
get_ydata
(
True
)[
x
]
self
.
timeValue
.
setText
(
"
%d
"
%
x
)
self
.
ch1Value
.
setText
(
"
%d
"
%
y1
)
self
.
ch2Value
.
setText
(
"
%d
"
%
y2
)
def
save
(
self
):
try
:
dialog
=
QFileDialog
(
self
,
"
Save osc file
"
,
path
,
"
*.osc
"
)
dialog
.
setDefaultSuffix
(
"
osc
"
)
name
=
"
oscillogram-%s.osc
"
%
time
.
strftime
(
"
%Y%m%d-%H%M%S
"
)
dialog
.
selectFile
(
name
)
dialog
.
setAcceptMode
(
QFileDialog
.
AcceptSave
)
if
dialog
.
exec
()
==
QDialog
.
Accepted
:
name
=
dialog
.
selectedFiles
()
self
.
buffer
.
tofile
(
name
[
0
])
self
.
log
.
print
(
"
histogram %d saved to file %s
"
%
((
self
.
number
+
1
),
name
[
0
]))
except
:
self
.
log
.
print
(
"
error: %s
"
%
sys
.
exc_info
()[
1
])
def
load
(
self
):
try
:
dialog
=
QFileDialog
(
self
,
"
Load osc file
"
,
path
,
"
*.osc
"
)
dialog
.
setDefaultSuffix
(
"
osc
"
)
dialog
.
setAcceptMode
(
QFileDialog
.
AcceptOpen
)
if
dialog
.
exec
()
==
QDialog
.
Accepted
:
name
=
dialog
.
selectedFiles
()
self
.
buffer
[:]
=
np
.
fromfile
(
name
[
0
],
np
.
int16
)
self
.
update
()
except
:
self
.
log
.
print
(
"
error: %s
"
%
sys
.
exc_info
()[
1
])
class
GenDisplay
(
QWidget
,
Ui_GenDisplay
):
def
__init__
(
self
,
rpControl
,
log
):
super
(
GenDisplay
,
self
).
__init__
()
self
.
setupUi
(
self
)
# initialize variables
self
.
rpControl
=
rpControl
self
.
log
=
log
self
.
bins
=
4096
self
.
buffer
=
np
.
zeros
(
self
.
bins
,
np
.
uint32
)
for
i
in
range
(
16
):
# initialize with delta-functions
self
.
buffer
[(
i
+
1
)
*
256
-
1
]
=
1
# create figure
self
.
figure
=
Figure
()
if
sys
.
platform
!=
"
win32
"
:
self
.
figure
.
set_facecolor
(
"
none
"
)
# !gq self.figure.subplots_adjust(left=0.18, bottom=0.08, right=0.98, top=0.95)
self
.
figure
.
subplots_adjust
(
left
=
0.08
,
bottom
=
0.08
,
right
=
0.98
,
top
=
0.92
)
self
.
canvas
=
FigureCanvas
(
self
.
figure
)
self
.
plotLayout
.
addWidget
(
self
.
canvas
)
self
.
ax
=
self
.
figure
.
add_subplot
(
111
)
self
.
ax
.
grid
()
self
.
ax
.
set_ylabel
(
"
counts
"
)
self
.
xunit
=
"
[0.122 mV /channel]
"
self
.
ax
.
set_xlabel
(
"
channel number
"
+
self
.
xunit
)
# self.ax.set_xlabel("channel number")
# !gq
self
.
ax_x2
=
self
.
ax
.
secondary_xaxis
(
'
top
'
,
functions
=
(
self
.
adc2mV
,
self
.
mV2adc
))
self
.
ax_x2
.
set_xlabel
(
"
Voltage [mV]
"
,
color
=
'
grey
'
)
x
=
np
.
arange
(
self
.
bins
)
(
self
.
curve
,)
=
self
.
ax
.
plot
(
x
,
self
.
buffer
,
drawstyle
=
"
steps-mid
"
,
color
=
"
#FFAA00
"
)
# create navigation toolbar
self
.
toolbar
=
NavigationToolbar
(
self
.
canvas
,
None
,
False
)
self
.
toolbar
.
layout
().
setSpacing
(
6
)
# remove subplots action
actions
=
self
.
toolbar
.
actions
()
self
.
toolbar
.
removeAction
(
actions
[
6
])
self
.
toolbar
.
removeAction
(
actions
[
7
])
self
.
logCheck
=
QCheckBox
(
"
log scale
"
)
self
.
logCheck
.
setChecked
(
False
)
self
.
toolbar
.
addSeparator
()
self
.
toolbar
.
addWidget
(
self
.
logCheck
)
self
.
plotLayout
.
addWidget
(
self
.
toolbar
)
# configure controls
actions
[
0
].
triggered
.
disconnect
()
actions
[
0
].
triggered
.
connect
(
self
.
home
)
self
.
logCheck
.
toggled
.
connect
(
self
.
set_scale
)
self
.
startButton
.
clicked
.
connect
(
self
.
start
)
self
.
loadButton
.
clicked
.
connect
(
self
.
load
)
# gq
def
adc2mV
(
self
,
c
):
"""
convert adc-count to Voltage in mV
!!! there is a factor of two betwenn channel definition here and in hstDisplay !
"""
return
c
*
self
.
rpControl
.
adc_unit
/
2
def
mV2adc
(
self
,
v
):
"""
convert voltage in mV to adc-count
"""
return
v
*
self
.
rpControl
.
adc_unit
*
2
# gq end
def
start
(
self
):
if
self
.
rpControl
.
idle
:
return
self
.
rpControl
.
set_gen_fall
(
self
.
fallValue
.
value
())
self
.
rpControl
.
set_gen_rise
(
self
.
riseValue
.
value
())
self
.
rpControl
.
set_gen_rate
(
self
.
rateValue
.
value
()
*
1000
)
self
.
rpControl
.
set_gen_dist
(
self
.
poissonButton
.
isChecked
())
for
value
in
np
.
arange
(
self
.
bins
,
dtype
=
np
.
uint64
)
<<
32
|
self
.
buffer
:
self
.
rpControl
.
set_gen_bin
(
value
)
self
.
rpControl
.
start_gen
()
self
.
startButton
.
setText
(
"
Stop
"
)
self
.
startButton
.
clicked
.
disconnect
()
self
.
startButton
.
clicked
.
connect
(
self
.
stop
)
self
.
log
.
print
(
"
generator started
"
)
def
stop
(
self
):
self
.
rpControl
.
stop_gen
()
self
.
startButton
.
setText
(
"
Start
"
)
self
.
startButton
.
clicked
.
disconnect
()
self
.
startButton
.
clicked
.
connect
(
self
.
start
)
self
.
log
.
print
(
"
generator stopped
"
)
def
home
(
self
):
self
.
set_scale
(
self
.
logCheck
.
isChecked
())
def
set_scale
(
self
,
checked
):
self
.
toolbar
.
home
()
self
.
toolbar
.
update
()
if
checked
:
self
.
ax
.
set_ylim
(
1
,
1e10
)
self
.
ax
.
set_yscale
(
"
log
"
)
else
:
self
.
ax
.
set_ylim
(
auto
=
True
)
self
.
ax
.
set_yscale
(
"
linear
"
)
self
.
ax
.
relim
()
self
.
ax
.
autoscale_view
(
scalex
=
True
,
scaley
=
True
)
self
.
canvas
.
draw
()
def
load
(
self
):
try
:
dialog
=
QFileDialog
(
self
,
"
Load gen file
"
,
path
,
"
*.gen
"
)
dialog
.
setDefaultSuffix
(
"
gen
"
)
dialog
.
setAcceptMode
(
QFileDialog
.
AcceptOpen
)
if
dialog
.
exec
()
==
QDialog
.
Accepted
:
name
=
dialog
.
selectedFiles
()
self
.
buffer
[:]
=
np
.
loadtxt
(
name
[
0
],
np
.
uint32
)
self
.
curve
.
set_ydata
(
self
.
buffer
)
self
.
ax
.
relim
()
self
.
ax
.
autoscale_view
(
scalex
=
False
,
scaley
=
True
)
self
.
canvas
.
draw
()
except
:
self
.
log
.
print
(
"
error: %s
"
%
sys
.
exc_info
()[
1
])
def
runQt
():
# start Qt5
app
=
QApplication
(
sys
.
argv
)
dpi
=
app
.
primaryScreen
().
logicalDotsPerInch
()
matplotlib
.
rcParams
[
"
figure.dpi
"
]
=
dpi
application
=
rpControl
()
application
.
show
()
sys
.
exit
(
app
.
exec_
())
if
__name__
==
'
__main__
'
:
# --------------------------------------------
# run the application with Qt5 interface
runQt
()
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment