Okay, we have success, using the LGPL 3+ licensed
python-dali repository as of commit
82fcf1e72aa37e1a83282f0f516f74559d5320e7 with the following script (
created by the project owner!) placed into the
examples sub-directory on a Raspberry Pi (2B in my case) connected to a LED-Warrior14(-01) at the default I2C address of 0x23 as a file which I chose to call
commission.py and with the user being a member of the
i2c group and having the
python3-smbus package installed in the system:
Code: Select all
#!/usr/bin/env python3
# Sample code for CodeMercs LW14 with python-dali on Raspberry Pi I2C
from dali.command import Command
from dali.gear.general import EnableDeviceType, Off
from dali.address import GearBroadcast
from dali.sequences import sleep as seq_sleep
from dali.sequences import progress as seq_progress
from dali.exceptions import CommunicationError, UnsupportedFrameTypeError
from dali.sequences import Commissioning
import dali.frame
import time
import logging
import struct
struct_signature = struct.Struct(">HHH")
class I2C_RPI:
def __init__(self, address):
import smbus
self.bus = smbus.SMBus(1)
self.address = address
def ping(self):
self.bus.write_quick(self.address)
def write_byte(self, register, data):
self.bus.write_byte_data(self.address, register, data)
def read_byte(self, register):
return self.bus.read_byte_data(self.address, register)
def write_bytes(self, register, data):
self.bus.write_i2c_block_data(self.address, register, data)
def read_bytes(self, register, length):
return bytes(self.bus.read_i2c_block_data(
self.address, register, length))
class LW14:
def __init__(self, i2c, logger=None):
self.bus = i2c
self.log = logger or logging.getLogger(__name__)
# Simple presence check — raises IOError if device absent
self.bus.ping()
# Read and check signature
vendor, product, version = struct_signature.unpack(
self.bus.read_bytes(0xf0, 6))
self.log.debug("LW14 vendor: %d", vendor)
self.log.debug("LW14 product: %d", product)
self.log.debug("LW14 version: %x", version)
if product != 14:
raise Exception(f"Unsupported product ID {product}; expected 14")
if version < 0x2000:
raise Exception(f"Unsupported LW14 version {version:02x}; minimum "
f"version required is 2000")
# Clear "telegram received", "overrun", "valid reply" bits
self.bus.read_byte(0x01)
# Read status register to clear the "frame error" bit
self._read_status()
# Check that no unexpected status bits are set
status = self._read_status()
if status & 0x3b:
raise Exception("Unexpected bits set in status register")
@staticmethod
def _wrap(command):
response = yield command
return response
def _send_frame(self, frame, wait_for_response=False):
if len(frame) not in (16, 24):
raise UnsupportedFrameTypeError
# Wait for device to be ready
status = 0x40
while status & 0x40:
status = self._read_status()
if status & 0x80:
# Bus fault — we can't continue
raise CommunicationError(
"Bus fault while waiting to transmit")
if status & 0x0b:
# "Valid reply" or one of the "telegram byte(s) received" bits
# is still set — read command register to clear
self.log.debug("Reading command reg to clear old reply")
self.bus.read_byte(0x01)
if status & 0x40:
# Still busy — wait a bit before polling again
time.sleep(0.002)
log.debug("Status before sending frame is %x", status)
# Write frame to command register; the LW14 will transmit as
# soon as the bus is free, according to the priority in the
# config register
self.bus.write_bytes(0x01, frame.as_byte_sequence)
if wait_for_response:
frame_error = False
log.debug("Waiting for response")
# Poll the status register. We are waiting for bit 0x04
# ("Reply timeframe") and bit 0x40 ("Busy") to become unset.
status = 0x44
while status & 0x44:
status = self._read_status()
if status & 0x80:
# Bus fault — we can't continue
raise CommunicationError(
"Bus fault while waiting for response")
if status & 0x10:
# Frame error
log.debug("Frame error bit set while waiting for response")
frame_error = True
if status & 0x44:
# Still waiting
time.sleep(0.0002)
log.debug("Finished waiting, status is %x", status)
if frame_error:
return dali.frame.BackwardFrameError(0)
# If "valid reply" is set, check that it was a single
# byte. If so, it's a response. Otherwise it's an error.
if status & 0x08:
while status & 0x03 == 0:
# Poll until the reply is fully received and its length
# is available
time.sleep(0.001)
status = self._read_status()
if status & 0x10:
# Frame error
log.debug("Frame error bit set while reading response")
return dali.frame.BackwardFrameError(0)
if (status & 0x03) == 1:
return dali.frame.BackwardFrame(self.bus.read_byte(0x01))
raise CommunicationError(
"Forward frame received while waiting for backward frame")
def _read_status(self):
return self.bus.read_byte(0x00)
def _set_priority(self, priority):
self.log.debug("Setting priority %d", priority)
self.bus.write_byte(0x02, priority)
def send(self, command, priority=2, progress=None):
# If "command" is not a sequence, wrap it in a trivial sequence
seq = self._wrap(command) if isinstance(command, Command) else command
self._set_priority(priority)
response = None
while True:
try:
cmd = seq.send(response)
except StopIteration as r:
return r.value
if isinstance(cmd, seq_sleep):
log.debug("Waiting for %f seconds", cmd.delay)
time.sleep(cmd.delay)
elif isinstance(cmd, seq_progress):
if progress:
progress(cmd)
else:
self.log.debug("Sending %s", cmd)
if cmd.devicetype != 0:
self._send_frame(EnableDeviceType(cmd.devicetype).frame)
self._set_priority(1)
if cmd.sendtwice:
self._send_frame(cmd.frame)
self._set_priority(1)
response_frame = self._send_frame(
cmd.frame, cmd.response is not None)
self._set_priority(1)
response = None
if cmd.response:
response = cmd.response(response_frame)
self.log.debug("Response is %s", response)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
log = logging.getLogger()
d = LW14(I2C_RPI(0x23))
# Turn everything on while we work...
d.send(Off(GearBroadcast()))
# Assign addresses to all attached control gear
d.send(Commissioning(readdress=True), priority=3, progress=log.info)
I was able to reliably (re)set all the short addresses for the devices on my DALI system and assign a random short-address starting at
0 and increasing monotonically for each and all of them. The behaviour should there be a collision between two of the sixty four possible devices in the 2^24 possible random addresses they might choose to use is not clear to me but the chance of it happening more than once seems vanishingly small and fixable by running it again. So this seems a viable starting point for setting up a system where you don't want to have to set a short address by connecting to each device individually ...