diff --git a/examples/oled_animation_example.py b/examples/oled_animation_example.py index 50a0c7bdfa50344111c1cfc9c86968e5be436367..075438b7f0267f9678ffd42a3b0307f6d65264c7 100644 --- a/examples/oled_animation_example.py +++ b/examples/oled_animation_example.py @@ -1,7 +1,7 @@ import sys sys.path.insert(0, 'libraries') from machine import Pin, I2C -import ssd1306, framebuf +import ssd1306 # using default address 0x3C i2c = I2C(sda=Pin(21), scl=Pin(22)) diff --git a/libraries/sx127x.py b/libraries/sx127x.py new file mode 100644 index 0000000000000000000000000000000000000000..8e717562920b4093d18ba1fe1b211004b7c4de4d --- /dev/null +++ b/libraries/sx127x.py @@ -0,0 +1,499 @@ +from time import sleep, ticks_ms +from machine import SPI, Pin +from micropython import const +import gc + + +PA_OUTPUT_RFO_PIN = const(0) +PA_OUTPUT_PA_BOOST_PIN = const(1) + +# registers +REG_FIFO = const(0x00) +REG_OP_MODE = const(0x01) +REG_FRF_MSB = const(0x06) +REG_FRF_MID = const(0x07) +REG_FRF_LSB = const(0x08) +REG_PA_CONFIG = const(0x09) +REG_LNA = const(0x0C) +REG_FIFO_ADDR_PTR = const(0x0D) + +REG_FIFO_TX_BASE_ADDR = const(0x0E) +FifoTxBaseAddr = const(0x00) + +REG_FIFO_RX_BASE_ADDR = const(0x0F) +FifoRxBaseAddr = const(0x00) +REG_FIFO_RX_CURRENT_ADDR = const(0x10) +REG_IRQ_FLAGS_MASK = const(0x11) +REG_IRQ_FLAGS = const(0x12) +REG_RX_NB_BYTES = const(0x13) +REG_PKT_RSSI_VALUE = const(0x1A) +REG_PKT_SNR_VALUE = const(0x19) +REG_MODEM_CONFIG_1 = const(0x1D) +REG_MODEM_CONFIG_2 = const(0x1E) +REG_PREAMBLE_MSB = const(0x20) +REG_PREAMBLE_LSB = const(0x21) +REG_PAYLOAD_LENGTH = const(0x22) +REG_FIFO_RX_BYTE_ADDR = const(0x25) +REG_MODEM_CONFIG_3 = const(0x26) +REG_RSSI_WIDEBAND = const(0x2C) +REG_DETECTION_OPTIMIZE = const(0x31) +REG_DETECTION_THRESHOLD = const(0x37) +REG_SYNC_WORD = const(0x39) +REG_DIO_MAPPING_1 = const(0x40) +REG_VERSION = const(0x42) + +# invert IQ +REG_INVERTIQ = const(0x33) +RFLR_INVERTIQ_RX_MASK = const(0xBF) +RFLR_INVERTIQ_RX_OFF = const(0x00) +RFLR_INVERTIQ_RX_ON = const(0x40) +RFLR_INVERTIQ_TX_MASK = const(0xFE) +RFLR_INVERTIQ_TX_OFF = const(0x01) +RFLR_INVERTIQ_TX_ON = const(0x00) + +REG_INVERTIQ2 = const(0x3B) +RFLR_INVERTIQ2_ON = const(0x19) +RFLR_INVERTIQ2_OFF = const(0x1D) + +# modes +# bit 7: 1 => LoRa mode +MODE_LONG_RANGE_MODE = const(0x80) +MODE_SLEEP = const(0x00) +MODE_STDBY = const(0x01) +MODE_TX = const(0x03) +MODE_RX_CONTINUOUS = const(0x05) +MODE_RX_SINGLE = const(0x06) + +# PA config +PA_BOOST = const(0x80) + +# IRQ masks +IRQ_TX_DONE_MASK = const(0x08) +IRQ_PAYLOAD_CRC_ERROR_MASK = const(0x20) +IRQ_RX_DONE_MASK = const(0x40) +IRQ_RX_TIME_OUT_MASK = const(0x80) + +# Buffer size +MAX_PKT_LENGTH = const(255) + + +class SX127x: + + default_parameters = { + "frequency": 869525000, + "frequency_offset": 0, + "tx_power_level": 14, + "signal_bandwidth": 125e3, + "spreading_factor": 9, + "coding_rate": 5, + "preamble_length": 8, + "implicitHeader": False, + "sync_word": 0x12, + "enable_CRC": True, + "invert_IQ": False, + } + + def __init__(self, spi, pins, parameters={}): + self.spi = spi + self.pins = pins + self.parameters = parameters + + self.pin_ss = Pin(self.pins["ss"], Pin.OUT) + + self.lock = False + self.implicit_header_mode = None + + self.parameters = SX127x.default_parameters + if parameters: + self.parameters.update(parameters) + + # check version + version = None + for i in range(5): + version = self.readRegister(REG_VERSION) + if version: + break + # debug output + print("SX version: {}".format(version)) + + # put in LoRa and sleep mode + self.sleep() + # config + self.setFrequency(self.parameters["frequency"]) + self.setSignalBandwidth(self.parameters["signal_bandwidth"]) + + # set LNA boost + self.writeRegister(REG_LNA, self.readRegister(REG_LNA) | 0x03) + # set auto AGC + self.writeRegister(REG_MODEM_CONFIG_3, 0x04) + + self.setTxPower(self.parameters["tx_power_level"]) + self.implicitHeaderMode(self.parameters["implicitHeader"]) + self.setSpreadingFactor(self.parameters["spreading_factor"]) + self.setCodingRate(self.parameters["coding_rate"]) + self.setPreambleLength(self.parameters["preamble_length"]) + self.setSyncWord(self.parameters["sync_word"]) + self.enableCRC(self.parameters["enable_CRC"]) + self.invertIQ(self.parameters["invert_IQ"]) + + # set LowDataRateOptimize flag if symbol time > 16ms (default disable on reset) + # self.writeRegister(REG_MODEM_CONFIG_3, self.readRegister(REG_MODEM_CONFIG_3) & 0xF7) # default disable on reset + bw = self.parameters["signal_bandwidth"] + sf = self.parameters["spreading_factor"] + if 1000 / bw / 2 ** sf > 16: + self.writeRegister( + REG_MODEM_CONFIG_3, self.readRegister(REG_MODEM_CONFIG_3) | 0x08 + ) + + # set base addresses + self.writeRegister(REG_FIFO_TX_BASE_ADDR, FifoTxBaseAddr) + self.writeRegister(REG_FIFO_RX_BASE_ADDR, FifoRxBaseAddr) + + self.standby() + + def beginPacket(self, implicitHeaderMode=False): + self.standby() + self.implicitHeaderMode(implicitHeaderMode) + + # reset FIFO address and payload length + self.writeRegister(REG_FIFO_ADDR_PTR, FifoTxBaseAddr) + self.writeRegister(REG_PAYLOAD_LENGTH, 0) + + def endPacket(self): + # put in TX mode + self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_TX) + # wait for TX done, standby automatically on TX_DONE + while (self.readRegister(REG_IRQ_FLAGS) & IRQ_TX_DONE_MASK) == 0: + pass + # clear IRQ's + self.writeRegister(REG_IRQ_FLAGS, IRQ_TX_DONE_MASK) + + def write(self, buffer): + currentLength = self.readRegister(REG_PAYLOAD_LENGTH) + size = len(buffer) + + # check size + size = min(size, (MAX_PKT_LENGTH - FifoTxBaseAddr - currentLength)) + + # write data + for i in range(size): + self.writeRegister(REG_FIFO, buffer[i]) + + # update length + self.writeRegister(REG_PAYLOAD_LENGTH, currentLength + size) + return size + + def aquirelock(self, lock=False): + self.lock = False + + def println(self, message, implicitHeader=False, repeat=1): + # wait until RX_Done, lock and begin writing + self.aquirelock(True) + + if isinstance(message, str): + message = message.encode() + + self.beginPacket(implicitHeader) + self.write(message) + + for i in range(repeat): + self.endPacket() + + # unlock when done writing + self.aquirelock(False) + self.collectGarbage() + + def getIrqFlags(self): + irqFlags = self.readRegister(REG_IRQ_FLAGS) + self.writeRegister(REG_IRQ_FLAGS, irqFlags) + return irqFlags + + def packetRssi(self, rfi="hf"): + packet_rssi = self.readRegister(REG_PKT_RSSI_VALUE) + return packet_rssi - (157 if rfi == "hf" else 164) + + def packetSnr(self): + return (self.readRegister(REG_PKT_SNR_VALUE)) * 0.25 + + def standby(self): + self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_STDBY) + + def sleep(self): + self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_SLEEP) + + def setTxPower(self, level, outputPin=PA_OUTPUT_PA_BOOST_PIN): + self.parameters["tx_power_level"] = level + if outputPin == PA_OUTPUT_RFO_PIN: + # RFO + level = min(max(level, 0), 14) + self.writeRegister(REG_PA_CONFIG, 0x70 | level) + else: + # PA BOOST + level = min(max(level, 2), 17) + self.writeRegister(REG_PA_CONFIG, PA_BOOST | (level - 2)) + + def setFrequency(self, frequency): + # TODO min max limit + frequency = int(frequency) + self.parameters["frequency"] = frequency + frequency += self.parameters["frequency_offset"] + + frf = (frequency << 19) // 32000000 + self.writeRegister(REG_FRF_MSB, (frf >> 16) & 0xFF) + self.writeRegister(REG_FRF_MID, (frf >> 8) & 0xFF) + self.writeRegister(REG_FRF_LSB, (frf >> 0) & 0xFF) + + def setSpreadingFactor(self, sf): + sf = min(max(sf, 6), 12) + self.writeRegister(REG_DETECTION_OPTIMIZE, 0xC5 if sf == 6 else 0xC3) + self.writeRegister(REG_DETECTION_THRESHOLD, 0x0C if sf == 6 else 0x0A) + self.writeRegister( + REG_MODEM_CONFIG_2, + (self.readRegister(REG_MODEM_CONFIG_2) & 0x0F) | ((sf << 4) & 0xF0), + ) + + def setSignalBandwidth(self, sbw): + bins = ( + 7.8e3, + 10.4e3, + 15.6e3, + 20.8e3, + 31.25e3, + 41.7e3, + 62.5e3, + 125e3, + 250e3, + ) + bw = 9 + + if sbw < 10: + bw = sbw + else: + for i in range(len(bins)): + if sbw <= bins[i]: + bw = i + break + + self.writeRegister( + REG_MODEM_CONFIG_1, + (self.readRegister(REG_MODEM_CONFIG_1) & 0x0F) | (bw << 4), + ) + + def setCodingRate(self, denominator): + denominator = min(max(denominator, 5), 8) + cr = denominator - 4 + self.writeRegister( + REG_MODEM_CONFIG_1, + (self.readRegister(REG_MODEM_CONFIG_1) & 0xF1) | (cr << 1), + ) + + def setPreambleLength(self, length): + self.writeRegister(REG_PREAMBLE_MSB, (length >> 8) & 0xFF) + self.writeRegister(REG_PREAMBLE_LSB, (length >> 0) & 0xFF) + + def enableCRC(self, enable_CRC=False): + modem_config_2 = self.readRegister(REG_MODEM_CONFIG_2) + config = modem_config_2 | 0x04 if enable_CRC else modem_config_2 & 0xFB + self.writeRegister(REG_MODEM_CONFIG_2, config) + + def invertIQ(self, invertIQ): + self.parameters["invertIQ"] = invertIQ + if invertIQ: + self.writeRegister( + REG_INVERTIQ, + ( + ( + self.readRegister(REG_INVERTIQ) + & RFLR_INVERTIQ_TX_MASK + & RFLR_INVERTIQ_RX_MASK + ) + | RFLR_INVERTIQ_RX_ON + | RFLR_INVERTIQ_TX_ON + ), + ) + self.writeRegister(REG_INVERTIQ2, RFLR_INVERTIQ2_ON) + else: + self.writeRegister( + REG_INVERTIQ, + ( + ( + self.readRegister(REG_INVERTIQ) + & RFLR_INVERTIQ_TX_MASK + & RFLR_INVERTIQ_RX_MASK + ) + | RFLR_INVERTIQ_RX_OFF + | RFLR_INVERTIQ_TX_OFF + ), + ) + self.writeRegister(REG_INVERTIQ2, RFLR_INVERTIQ2_OFF) + + def setSyncWord(self, sw): + self.writeRegister(REG_SYNC_WORD, sw) + + def setChannel(self, parameters): + self.standby() + for key in parameters: + if key == "frequency": + self.setFrequency(parameters[key]) + continue + if key == "invert_IQ": + self.invertIQ(parameters[key]) + continue + if key == "tx_power_level": + self.setTxPower(parameters[key]) + continue + + def dumpRegisters(self): + # TODO end='' + for i in range(128): + print("0x{:02X}: {:02X}".format(i, self.readRegister(i)), end="") + if (i + 1) % 4 == 0: + print() + else: + print(" | ", end="") + + def implicitHeaderMode(self, implicitHeaderMode=False): + if ( + self.implicit_header_mode != implicitHeaderMode + ): # set value only if different. + self.implicit_header_mode = implicitHeaderMode + modem_config_1 = self.readRegister(REG_MODEM_CONFIG_1) + config = ( + modem_config_1 | 0x01 + if implicitHeaderMode + else modem_config_1 & 0xFE + ) + self.writeRegister(REG_MODEM_CONFIG_1, config) + + def receive(self, size=0): + self.implicitHeaderMode(size > 0) + if size > 0: + self.writeRegister(REG_PAYLOAD_LENGTH, size & 0xFF) + + # The last packet always starts at FIFO_RX_CURRENT_ADDR + # no need to reset FIFO_ADDR_PTR + self.writeRegister( + REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_CONTINUOUS + ) + + def listen(self, time=1000): + time = min(max(time, 0), 10000) + self.receive() + + start = ticks_ms() + while True: + if self.receivedPacket(): + return self.readPayload() + if ticks_ms() - start > time: + return None + + def onReceive(self, callback): + self.onReceive = callback + + if "dio_0" in self.pins: + self.pin_rx_done = Pin(self.pins["dio_0"], Pin.IN) + + if self.pin_rx_done: + if callback: + self.writeRegister(REG_DIO_MAPPING_1, 0x00) + self.pin_rx_done.irq( + trigger=Pin.IRQ_RISING, handler=self.handleOnReceive + ) + else: + pass + # TODO detach irq + + def handleOnReceive(self, event_source): + # lock until TX_Done + self.aquirelock(True) + irqFlags = self.getIrqFlags() + # RX_DONE only, irqFlags should be 0x40 + if irqFlags & IRQ_RX_DONE_MASK == IRQ_RX_DONE_MASK: + # automatically standby when RX_DONE + if self.onReceive: + payload = self.readPayload() + self.onReceive(self, payload) + + elif self.readRegister(REG_OP_MODE) != ( + MODE_LONG_RANGE_MODE | MODE_RX_SINGLE + ): + # no packet received. + # reset FIFO address / # enter single RX mode + self.writeRegister(REG_FIFO_ADDR_PTR, FifoRxBaseAddr) + self.writeRegister( + REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_SINGLE + ) + + self.aquirelock(False) # unlock in any case. + self.collectGarbage() + return True + + def receivedPacket(self, size=0): + irqFlags = self.getIrqFlags() + self.implicitHeaderMode(size > 0) + if size > 0: + self.writeRegister(REG_PAYLOAD_LENGTH, size & 0xFF) + + # if (irqFlags & IRQ_RX_DONE_MASK) and \ + # (irqFlags & IRQ_RX_TIME_OUT_MASK == 0) and \ + # (irqFlags & IRQ_PAYLOAD_CRC_ERROR_MASK == 0): + + if ( + irqFlags == IRQ_RX_DONE_MASK + ): # RX_DONE only, irqFlags should be 0x40 + # automatically standby when RX_DONE + return True + + elif self.readRegister(REG_OP_MODE) != ( + MODE_LONG_RANGE_MODE | MODE_RX_SINGLE + ): + # no packet received. + # reset FIFO address / # enter single RX mode + self.writeRegister(REG_FIFO_ADDR_PTR, FifoRxBaseAddr) + self.writeRegister( + REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_SINGLE + ) + + def readPayload(self): + # set FIFO address to current RX address + # fifo_rx_current_addr = self.readRegister(REG_FIFO_RX_CURRENT_ADDR) + self.writeRegister( + REG_FIFO_ADDR_PTR, self.readRegister(REG_FIFO_RX_CURRENT_ADDR) + ) + + # read packet length + packet_length = 0 + if self.implicit_header_mode: + packet_length = self.readRegister(REG_PAYLOAD_LENGTH) + else: + packet_length = self.readRegister(REG_RX_NB_BYTES) + + payload = bytearray() + for i in range(packet_length): + payload.append(self.readRegister(REG_FIFO)) + + self.collectGarbage() + return bytes(payload) + + def readRegister(self, address, byteorder="big", signed=False): + response = self.transfer(address & 0x7F) + return int.from_bytes(response, byteorder) + + def writeRegister(self, address, value): + self.transfer(address | 0x80, value) + + def transfer(self, address, value=0x00): + response = bytearray(1) + + self.pin_ss.value(0) + + self.spi.write(bytes([address])) + self.spi.write_readinto(bytes([value]), response) + + self.pin_ss.value(1) + + return response + + def collectGarbage(self): + gc.collect() + # print('[Mem aft - free: {} allocated: {}]'.format(gc.mem_free(), gc.mem_alloc())) \ No newline at end of file