-
Notifications
You must be signed in to change notification settings - Fork 811
/
interrupt.py
executable file
·235 lines (189 loc) · 7.31 KB
/
interrupt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# Copyright (c) 2017, Xilinx, Inc.
# SPDX-License-Identifier: BSD-3-Clause
import asyncio
import weakref
import warnings
from .pl import PL
from .ps import CPU_ARCH, ZU_ARCH, ZYNQ_ARCH
from .mmio import MMIO
from .uio import get_uio_device, UioController
def get_uio_irq(irq):
    """Look up the UIO device path bound to a physical interrupt line.

    ``/proc/interrupts`` is scanned for rows whose third-from-last column
    equals ``irq``; the device name in the last column of the final
    matching row is then resolved through ``get_uio_device``.

    Parameters
    ----------
    irq : int
        The desired physical interrupt line

    Returns
    -------
    str
        The result of ``get_uio_device`` for the matching device name
        (the path of the device in /dev), or None when no row of
        ``/proc/interrupts`` matches the requested IRQ.

    """
    wanted = str(irq)
    with open('/proc/interrupts', 'r') as table:
        # Rows with fewer than seven whitespace-separated columns are
        # never considered a match.
        matches = [
            fields[-1]
            for fields in map(str.split, table)
            if len(fields) >= 7 and fields[-3] == wanted
        ]
    if not matches:
        return None
    # When several rows match, the last one in the file wins.
    return get_uio_device(matches[-1])
class Interrupt(object):
    """Core wait-based interrupt API for end users

    Exposes a single coroutine, ``wait``, which completes once the
    interrupt signal goes high. If the Overlay is changed or re-downloaded
    this object is invalidated and waiting results in undefined behaviour.

    """

    def __init__(self, pinname):
        """Attach a new Interrupt object to the specified pin

        Parameters
        ----------
        pinname : string
            Fully qualified name of the pin in the block diagram, of the
            form ${cell}/${pin}.

        Raises
        ------
        ValueError
            If ``pinname`` is not a key of ``PL.interrupt_pins``, i.e. the
            pin cannot be found in the currently active Overlay.

        """
        if pinname not in PL.interrupt_pins:
            raise ValueError("No Pin of name {} found".format(pinname))
        pin_entry = PL.interrupt_pins[pinname]
        controller, self.number = _InterruptController.get_parent(pin_entry)
        # Only a weak reference to the parent is held; ``wait`` reports
        # an invalidated interrupt once the parent has been collected.
        self.parent = weakref.ref(controller)
        self.event = asyncio.Event()
        self.waiting = False

    async def wait(self):
        """Wait for the interrupt to be active

        The event is registered with the parent only when no earlier
        registration is still outstanding (``self.waiting`` is False).

        Raises
        ------
        RuntimeError
            If the parent is no longer alive, which happens when the
            Overlay has been changed since initialisation.

        """
        controller = self.parent()
        if controller is None:
            raise RuntimeError("Interrupt invalidated by Overlay change")
        already_registered = self.waiting
        if not already_registered:
            self.event.clear()
            controller.add_event(self.event, self.number)
            self.waiting = True
        await self.event.wait()
        self.waiting = False
class _InterruptController(object):
    """Driver-side wrapper around an AXI interrupt controller

    End users are not expected to use this class directly - most uses
    will be via the register_interrupt API, which takes care of creating
    and registering _InterruptController instances.

    """

    # Controllers created for the bitstream identified by _last_timestamp
    _controllers = []
    # UIO controller objects created so far, keyed by raw IRQ number
    _uio_devices = {}
    # PL.timestamp observed on the most recent get_controller call
    _last_timestamp = None

    @staticmethod
    def get_controller(name):
        """Fetch, creating it on first use, the controller called ``name``

        All cached controllers are dropped first when ``PL.timestamp``
        differs from the one seen on the previous call, i.e. when the
        Overlay has been changed. Should not be accessed by user code.

        Parameters
        ----------
        name : str
            Name of the interrupt controller to return

        """
        current_stamp = PL.timestamp
        if _InterruptController._last_timestamp != current_stamp:
            _InterruptController._controllers.clear()
            _InterruptController._last_timestamp = current_stamp
        existing = next(
            (candidate for candidate in _InterruptController._controllers
             if candidate.name == name),
            None)
        if existing is not None:
            return existing
        created = _InterruptController(name)
        _InterruptController._controllers.append(created)
        return created

    @staticmethod
    def get_parent(entry):
        """Resolve the parent object and line index of an interrupt

        The parent is another interrupt controller when the entry names
        one, and a UIO interface (always with index 0) when the parent
        name is the empty string.

        Parameters
        ----------
        entry : dict
            The entry in the interrupt_pins dict for the pin

        Raises
        ------
        ValueError
            If no UIO device can be found for the entry's raw IRQ.

        """
        if 'parent' in entry:
            parent_name = entry['parent']
        else:
            parent_name = entry['controller']
        index = entry['index']
        if parent_name != "":
            return _InterruptController.get_controller(parent_name), index

        # An empty parent name means the interrupt goes through UIO
        raw_irq = entry['raw_irq']
        uio_cache = _InterruptController._uio_devices
        if raw_irq not in uio_cache:
            device_path = get_uio_irq(raw_irq)
            if device_path is None:
                raise ValueError('Could not find UIO device for interrupt pin '
                                 'for IRQ number {}'.format(raw_irq))
            uio_cache[raw_irq] = UioController(device_path)
        return uio_cache[raw_irq], 0

    def __init__(self, name):
        """Build the controller object for the IP called ``name``

        These objects are meant to be singletons, so this should never be
        called directly; use register_interrupt, or get_controller when
        direct access is required.

        Parameters
        ----------
        name : str
            Name of the interrupt controller to return

        """
        self.name = name
        base_address = PL.ip_dict[name]['phys_addr']
        self.mmio = MMIO(base_address, 32)
        # One list of pending events per interrupt line
        self.wait_handles = [[] for _ in range(32)]
        self.event_number = 0
        self.waiting = False

        # Disable Interrupt lines
        self.mmio.write(0x08, 0)

        controller_entry = PL.interrupt_controllers[name]
        self.parent, self.number = _InterruptController.get_parent(
            controller_entry)

    def set(self):
        """Event-like ``set`` hook. Should not be called by user code

        Looking like an event to the parent controller is what allows
        interrupt controllers to be chained. Every pending line is
        disabled and its registered events are signalled; the controller
        then re-adds itself to its parent if events are still outstanding.

        """
        # Pull pending interrupts
        pending = self.mmio.read(0x04)

        # Walk the pending mask from bit 0 upwards
        remaining = pending
        line = 0
        while remaining != 0:
            if remaining & 1:
                # Disable the interrupt
                self.mmio.write(0x14, 1 << line)
                # Swap in an empty list before signalling, so events
                # registered while signalling go to the new list
                fired, self.wait_handles[line] = self.wait_handles[line], []
                for handle in fired:
                    handle.set()
                self.event_number -= len(fired)
            remaining >>= 1
            line += 1

        # Acknowledge the interrupts
        self.mmio.write(0x0C, pending)
        if self.event_number:
            self.parent.add_event(self, self.number)

    def add_event(self, event, number):
        """Register an event against an interrupt line

        When the interrupt is active, all events are signalled and the
        interrupt line is disabled. End user classes should clear the
        interrupt before re-adding the event.

        Parameters
        ----------
        event : object
            Any object that provides a set method to notify of
            an active interrupt
        number : int
            Interrupt number to register event against

        """
        line_idle = not self.wait_handles[number]
        if line_idle:
            # First event on this line: switch the line on
            self.mmio.write(0x10, 1 << number)
        if not self.event_number:
            # First outstanding event overall: register with the parent
            self.parent.add_event(self, self.number)
        self.wait_handles[number].append(event)
        self.event_number += 1

        # Enable global interrupt
        self.mmio.write(0x1C, 0x00000003)