diff --git a/examples/lora_rangefinder/functions.py b/examples/lora_rangefinder/functions.py index 5251ca3c3541f9c421ebcf66d69d7b095f74efcd..db8ceed431bd6b88aa53aaef814947f661125ea6 100644 --- a/examples/lora_rangefinder/functions.py +++ b/examples/lora_rangefinder/functions.py @@ -1,54 +1,76 @@ -from re import L import axp202 -import sdcard import ssd1306 import sx127x import os, sys import machine +from gps import GPStime, getGPS from machine import RTC +from math import radians, cos, sin, asin, sqrt +import time rtc = RTC() +def Convert(numbers): + return [ -i for i in numbers] - -def power(on=True) +def power(): + axp = axp202.PMU(address=axp202.AXP192_SLAVE_ADDRESS) axp.setLDO2Voltage(3300) # T-Beam LORA VDD 3v3 axp.setLDO3Voltage(3300) # T-Beam GPS VDD 3v3 - axp = axp202.PMU(address=axp202.AXP192_SLAVE_ADDRESS) - if on == True: - axp.enablePower(axp202.AXP192_LDO3) - axp.enablePower(axp202.AXP192_LDO2) - else: - axp.disablePower(axp202.AXP192_LDO3) - axp.disablePower(axp202.AXP192_LDO2) + axp.enablePower(axp202.AXP192_LDO3) + axp.enablePower(axp202.AXP192_LDO2) def displayfix(long, lat, alt, time, hs = 0): pass def updateOLED(): + pass - -def writefix(long, lat, alt, time, hs = 0): +def log(rx, fix, dist): file = 'gps.txt' dir = os.listdir() if file not in dir: f = open(file, 'w') else: f= open(file, 'a') - f.write(long+','+lat+','+alt+','+time) + line1 = 'Time: ' + str(time.time()) + " Dist: " + dist + line2 = str(rx) + line3 = str(fix) + + f.write(line1 + '\n') + f.write('remote: ' + line2 + '\n') + f.write('local: ' + line3 + '\n') + f.write('\n') f.close() +def haversine(lon1, lat1, lon2, lat2): + """ + Calculate the great circle distance in kilometers between two points + on the earth (specified in decimal degrees) + """ + # convert decimal degrees to radians + lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2]) + + # haversine formula + dlon = lon2 - lon1 + dlat = lat2 - lat1 + a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2 + c = 2 * asin(sqrt(a)) + r = 6378100 # Radius of earth in kilometers. Use 3956 for miles. Determines return value units. + return c * r + -def joiner(): - hs.haversine(loc1,loc2,unit=Unit.METERS) -def setTime(i): - rtc.datetime((int(i['year']), int(i['month']), int(i['day']), - None, int(i['hour']), int(i['minute']), - int(i['second']), None)) - return rtc.datetime() +def setTime(): + global gpsTime + print("Getting GPS Time...") + getGPS(outtime = 120) + rtc.datetime((2022, 8, 2, 0, GPStime['hours'], GPStime['minutes'], GPStime['seconds'], 0)) + print("GPS Time Obtained: " + str(rtc.datetime())) + + def bootup(): - power(True) - setTime(gpsTime()) - return rtc + power() + setTime() + diff --git a/examples/lora_rangefinder/gps.py b/examples/lora_rangefinder/gps.py index c87361229b0476d9ea387c92e6d950d2ab877e07..c579d35d224f0c69b2f1b35f9ba133678c621717 100644 --- a/examples/lora_rangefinder/gps.py +++ b/examples/lora_rangefinder/gps.py @@ -4,8 +4,7 @@ import time, utime import machine from machine import Pin, UART, SoftI2C from ssd1306 import SSD1306_I2C -from haversine import Unit -import haversine as hs +#from haversine import Unit i2c = SoftI2C(scl=Pin(22), sda=Pin(21), freq=10000) #initializing the I2C method for ESP32 oled = SSD1306_I2C(128, 64, i2c) @@ -14,52 +13,52 @@ GPS_RX_PIN = 34 GPS_TX_PIN = 12 gpsModule = machine.UART(2, rx=GPS_RX_PIN, tx=GPS_TX_PIN, baudrate=9600, bits=8, parity=None, stop=1) + print(gpsModule) buff = bytearray(255) TIMEOUT = False FIX_STATUS = False +#GPStime = "" +GPStime = {'hours': 0, 'minutes': 0, 'seconds': 0} latitude = "" longitude = "" altitude = "" satellites = "" -GPStime = "" +HDOP = "" -b'$GPGGA,121235.00,3833.54539,N,08628.63027,W,1,04,3.66,174.5,M,-32.7,M,,*62\r\n' +#b'$GPGGA,121235.00,3833.54539,N,08628.63027,W,1,04,3.66,174.5,M,-32.7,M,,*62\r\n' -def getGPS(gpsModule): - global FIX_STATUS, TIMEOUT, latitude, longitude, altitude, satellites, GPStime - timeout = time.time() + 8 +def getGPS(gpsModule = gpsModule, outtime = 8): + global FIX_STATUS, TIMEOUT, latitude, longitude, altitude, satellites, GPStime, HDOP + timeout = time.time() + outtime while True: gpsModule.readline() buff = str(gpsModule.readline()) parts = buff.split(',') - if (parts[0] == "b'$GPGGA" and len(parts) == 15): if(parts[1] and parts[2] and parts[3] and parts[4] and parts[5] and parts[6] and parts[7]): - print(buff) - latitude = convertToDegree(parts[2]) - if (parts[3] == 'S'): - latitude = str(-float(latitude)) - longitude = convertToDegree(parts[4]) - if (parts[5] == 'W'): - latitude = str(-float(latitude)) +# print(buff) + parts[2] = convertToDegree(parts[2]) + parts[4] = convertToDegree(parts[4]) altitude = parts[6] satellites = parts[7] - GPStime = parts[1][0:2] + ":" + parts[1][2:4] + ":" + parts[1][4:6] + HDOP = parts[8] + #GPStime = parts[1][0:2] + ":" + parts[1][2:4] + ":" + parts[1][4:6] + GPStime['hours'] = int(parts[1][0:2]) + GPStime['minutes'] = int(parts[1][2:4]) + GPStime['seconds'] = int(parts[1][4:6]) FIX_STATUS = True return parts break - if (time.time() > timeout): - TIMEOUT = True - return None + print('gps timeout') break - utime.sleep_ms(500) + time.sleep_ms(500) def gpsTime(): # $GPZDA,hhmmss.ss,xx,xx,xxxx,xx,xx\r\n timeGot = False - while timeGot = False: + while timeGot == False: gpsModule.readline() buff = str(gpsModule.readline()) parts = buff.split(',') @@ -74,8 +73,9 @@ def gpsTime(): gpsTime['day'].append(parts[2]) gpsTime['month'].append(parts[3]) gpsTime['year'].append(parts[4]) + timeGot = True else: - pass + time.sleep_ms(500) return gpsTime @@ -85,20 +85,4 @@ def convertToDegree(RawDegrees): nexttwodigits = RawAsFloat - float(firstdigits*100) Converted = float(firstdigits + nexttwodigits/60.0) Converted = '{0:.6f}'.format(Converted) - return str(Converted) - - - -def gps(): - fix = bool(False) - results = [latitude, longitude, altitude, satellites, GPStime, fix] - while True: - getGPS(gpsModule) - if (FIX_STATUS == True): - results = [latitude, longitude, altitude, satellites, GPStime, True] - return results - if(TIMEOUT == True): - results = ['0.0','0.0','0.0','0','', False] - TIMEOUT = False - return results - + return str(Converted) \ No newline at end of file diff --git a/examples/lora_rangefinder/haversine.py b/examples/lora_rangefinder/haversine.py deleted file mode 100644 index 56ac3fd418676f9e656cd85e1a113c04aed3b5d5..0000000000000000000000000000000000000000 --- a/examples/lora_rangefinder/haversine.py +++ /dev/null @@ -1,210 +0,0 @@ -from math import radians, cos, sin, asin, sqrt, degrees, pi, atan2 -from enum import Enum -from typing import Union, Tuple - - -# mean earth radius - https://en.wikipedia.org/wiki/Earth_radius#Mean_radius -_AVG_EARTH_RADIUS_KM = 6371.0088 - - -class Unit(Enum): - """ - Enumeration of supported units. - The full list can be checked by iterating over the class; e.g. - the expression `tuple(Unit)`. - """ - - KILOMETERS = 'km' - METERS = 'm' - MILES = 'mi' - NAUTICAL_MILES = 'nmi' - FEET = 'ft' - INCHES = 'in' - RADIANS = 'rad' - DEGREES = 'deg' - - -class Direction(Enum): - """ - Enumeration of supported directions. - The full list can be checked by iterating over the class; e.g. - the expression `tuple(Direction)`. - Angles expressed in radians. - """ - - NORTH = 0 - NORTHEAST = pi * 0.25 - EAST = pi * 0.5 - SOUTHEAST = pi * 0.75 - SOUTH = pi - SOUTHWEST = pi * 1.25 - WEST = pi * 1.5 - NORTHWEST = pi * 1.75 - - -# Unit values taken from http://www.unitconversion.org/unit_converter/length.html -_CONVERSIONS = { - Unit.KILOMETERS: 1.0, - Unit.METERS: 1000.0, - Unit.MILES: 0.621371192, - Unit.NAUTICAL_MILES: 0.539956803, - Unit.FEET: 3280.839895013, - Unit.INCHES: 39370.078740158, - Unit.RADIANS: 1/_AVG_EARTH_RADIUS_KM, - Unit.DEGREES: (1/_AVG_EARTH_RADIUS_KM)*(180.0/pi) -} - - -def get_avg_earth_radius(unit): - unit = Unit(unit) - return _AVG_EARTH_RADIUS_KM * _CONVERSIONS[unit] - - -def _normalize(lat: float, lon: float) -> Tuple[float, float]: - """ - Normalize point to [-90, 90] latitude and [-180, 180] longitude. - """ - lat = (lat + 90) % 360 - 90 - if lat > 90: - lat = 180 - lat - lon += 180 - lon = (lon + 180) % 360 - 180 - return lat, lon - - -def _ensure_lat_lon(lat: float, lon: float): - """ - Ensure that the given latitude and longitude have proper values. An exception is raised if they are not. - """ - if lat < -90 or lat > 90: - raise ValueError(f"Latitude {lat} is out of range [-90, 90]") - if lon < -180 or lon > 180: - raise ValueError(f"Longitude {lon} is out of range [-180, 180]") - - -def haversine(point1, point2, unit=Unit.KILOMETERS, normalize=False): - """ Calculate the great-circle distance between two points on the Earth surface. - Takes two 2-tuples, containing the latitude and longitude of each point in decimal degrees, - and, optionally, a unit of length. - :param point1: first point; tuple of (latitude, longitude) in decimal degrees - :param point2: second point; tuple of (latitude, longitude) in decimal degrees - :param unit: a member of haversine.Unit, or, equivalently, a string containing the - initials of its corresponding unit of measurement (i.e. miles = mi) - default 'km' (kilometers). - :param normalize: if True, normalize the points to [-90, 90] latitude and [-180, 180] longitude. - Example: ``haversine((45.7597, 4.8422), (48.8567, 2.3508), unit=Unit.METERS)`` - Precondition: ``unit`` is a supported unit (supported units are listed in the `Unit` enum) - :return: the distance between the two points in the requested unit, as a float. - The default returned unit is kilometers. The default unit can be changed by - setting the unit parameter to a member of ``haversine.Unit`` - (e.g. ``haversine.Unit.INCHES``), or, equivalently, to a string containing the - corresponding abbreviation (e.g. 'in'). All available units can be found in the ``Unit`` enum. - """ - - # unpack latitude/longitude - lat1, lng1 = point1 - lat2, lng2 = point2 - - # normalize points or ensure they are proper lat/lon, i.e., in [-90, 90] and [-180, 180] - if normalize: - lat1, lng1 = _normalize(lat1, lng1) - lat2, lng2 = _normalize(lat2, lng2) - else: - _ensure_lat_lon(lat1, lng1) - _ensure_lat_lon(lat2, lng2) - - # convert all latitudes/longitudes from decimal degrees to radians - lat1 = radians(lat1) - lng1 = radians(lng1) - lat2 = radians(lat2) - lng2 = radians(lng2) - - # calculate haversine - lat = lat2 - lat1 - lng = lng2 - lng1 - d = sin(lat * 0.5) ** 2 + cos(lat1) * cos(lat2) * sin(lng * 0.5) ** 2 - - return 2 * get_avg_earth_radius(unit) * asin(sqrt(d)) - - -def haversine_vector(array1, array2, unit=Unit.KILOMETERS, comb=False, normalize=False): - ''' - The exact same function as "haversine", except that this - version replaces math functions with numpy functions. - This may make it slightly slower for computing the haversine - distance between two points, but is much faster for computing - the distance between two vectors of points due to vectorization. - ''' - try: - import numpy - except ModuleNotFoundError: - return 'Error, unable to import Numpy,\ - consider using haversine instead of haversine_vector.' - - # ensure arrays are numpy ndarrays - if not isinstance(array1, numpy.ndarray): - array1 = numpy.array(array1) - if not isinstance(array2, numpy.ndarray): - array2 = numpy.array(array2) - - # ensure will be able to iterate over rows by adding dimension if needed - if array1.ndim == 1: - array1 = numpy.expand_dims(array1, 0) - if array2.ndim == 1: - array2 = numpy.expand_dims(array2, 0) - - # Asserts that both arrays have same dimensions if not in combination mode - if not comb: - if array1.shape != array2.shape: - raise IndexError( - "When not in combination mode, arrays must be of same size. If mode is required, use comb=True as argument.") - - # normalize points or ensure they are proper lat/lon, i.e., in [-90, 90] and [-180, 180] - if normalize: - array1 = numpy.array([_normalize(p[0], p[1]) for p in array1]) - array2 = numpy.array([_normalize(p[0], p[1]) for p in array2]) - else: - [_ensure_lat_lon(p[0], p[1]) for p in array1] - [_ensure_lat_lon(p[0], p[1]) for p in array2] - - # unpack latitude/longitude - lat1, lng1 = array1[:, 0], array1[:, 1] - lat2, lng2 = array2[:, 0], array2[:, 1] - - # convert all latitudes/longitudes from decimal degrees to radians - lat1 = numpy.radians(lat1) - lng1 = numpy.radians(lng1) - lat2 = numpy.radians(lat2) - lng2 = numpy.radians(lng2) - - # If in combination mode, turn coordinates of array1 into column vectors for broadcasting - if comb: - lat1 = numpy.expand_dims(lat1, axis=0) - lng1 = numpy.expand_dims(lng1, axis=0) - lat2 = numpy.expand_dims(lat2, axis=1) - lng2 = numpy.expand_dims(lng2, axis=1) - - # calculate haversine - lat = lat2 - lat1 - lng = lng2 - lng1 - d = (numpy.sin(lat * 0.5) ** 2 - + numpy.cos(lat1) * numpy.cos(lat2) * numpy.sin(lng * 0.5) ** 2) - - return 2 * get_avg_earth_radius(unit) * numpy.arcsin(numpy.sqrt(d)) - - -def inverse_haversine(point, distance, direction: Union[Direction, float], unit=Unit.KILOMETERS): - - lat, lng = point - lat, lng = map(radians, (lat, lng)) - d = distance - r = get_avg_earth_radius(unit) - brng = direction.value if isinstance(direction, Direction) else direction - - return_lat = asin(sin(lat) * cos(d / r) + cos(lat) - * sin(d / r) * cos(brng)) - return_lng = lng + atan2(sin(brng) * sin(d / r) * - cos(lat), cos(d / r) - sin(lat) * sin(return_lat)) - - return_lat, return_lng = map(degrees, (return_lat, return_lng)) - return return_lat, return_lng \ No newline at end of file diff --git a/examples/lora_rangefinder/lora.py b/examples/lora_rangefinder/lora.py index 8d6eda5284987f324d55f79208b91c34e359cedf..ee558329456dc1e1b751d5db035164725b4841c7 100644 --- a/examples/lora_rangefinder/lora.py +++ b/examples/lora_rangefinder/lora.py @@ -2,19 +2,19 @@ from machine import Pin, SPI from sx127x import SX127x import sys from sx127x import SX127x -from time import sleep +import time lora_default = { 'frequency': 418500000, 'frequency_offset':0, 'tx_power_level': 1, 'signal_bandwidth': 250e3, - 'spreading_factor': 7, - 'coding_rate': 5, + 'spreading_factor': 9, + 'coding_rate': 8, 'preamble_length': 8, 'implicitHeader': False, - 'sync_word': 0x33, - 'enable_CRC': True, + 'sync_word': 0x12, + 'enable_CRC': False, 'invert_IQ': False, 'debug': False, } @@ -30,26 +30,20 @@ lora_spi = SPI( lora = SX127x(lora_spi, pins=lora_pins, parameters=lora_default) def loraTX(payload): - print('TX: {}'.format(payload)) - lora1.println(payload) + print('TX >>> {}'.format(payload)) + lora.println(payload) def loraRX(): + if lora.receivedPacket(): + curTime = time.time() try: payload = lora.readPayload().decode() rssi = lora.packetRssi() snr = lora.packetSnr() - print("RX: {} | RSSI: {}".format(payload, rssi, snr)) - result = [payload, rssi, snr] - return result + #print("RX: {} | RSSI: {} | SNR: {}".format(payload, rssi, snr)) + parts = payload.split() + rx = {'time': curTime, 'team': parts[0],'name': parts[1],'long': parts[3],'lat': parts[2], 'alt': parts[4],'sats': parts[5], 'err': parts[6], 'seq': parts[7], 'rssi': rssi, 'snr': snr} + return rx except Exception as e: - print(e) - - -counter = 0 -while True: - payload = 'Hello ({0})'.format(counter) - print('TX: {}'.format(payload)) - lora1.println(payload) - counter += 1 - sleep(3) \ No newline at end of file + print(e) \ No newline at end of file diff --git a/examples/lora_rangefinder/main.py b/examples/lora_rangefinder/main.py index 92c0571c296cb036a0faeaed6f22bd220ca13f99..e29192d690297b2d66d8be57c57026ea415868c0 100644 --- a/examples/lora_rangefinder/main.py +++ b/examples/lora_rangefinder/main.py @@ -1,77 +1,98 @@ -import functions -import uasyncio +from functions import bootup, haversine, rtc, log +import _thread import random import time import lora +from gps import getGPS, gpsModule +from machine import UART, Pin, I2C +import math +import time +import ssd1306 +# using default address 0x3C +i2c = I2C(sda=Pin(21), scl=Pin(22)) +display = ssd1306.SSD1306_I2C(128, 64, i2c) - -fix = [0.0,0.0,0.0,0,'',False] -staleTime = 60 +fix = {'time': 0, 'long':0.0, 'lat':0.0, 'alt':0.0, 'sats':0, 'err':0} +maxStale = 60 team = 'green' name = 'fox' seq = 0 +lastTX = 0 +rx = {'time': 0, 'team':'', 'name':'','long':'','lat':'', 'alt':'','sats':'', 'err':'', 'seq':'', 'rssi':'', 'snr':''} -async def staleGPS(): - while True: - if 0 <= staleTime <= 60: - time.sleep(1) - staleTime -= 1 - else: - fix[5] = False - - - -async def lora(): # green,fox,40.73777627962344,-74.05247938671874,170.4,12,5,269 - while True: - if random.randint(1, 1000) is 1 and fix[6] is True: - elements = [team, name, str(fix[0]), str(fix[1]), - str(fix[2]), str(fix[3]), str(fix[4]), - str(fix[5]), str(seq) ] - payload = ','.join(elements) - loraTX(payload) - else: - result = loraRX() - - - +def get_sec(time_str): + """Get seconds from time.""" + time = rtc.datetime() + h = time[0][3] + m = time[0][4] + s = time[0][5] + return int(h) * 3600 + int(m) * 60 + int(s) -async def getGPS(): +def gps(): + global fix while True: - g = gps() - if g[6]: - fix[0] = g[0] - fix[1] = g[1] - fix[2] = g[2] - fix[3] = g[3] - fix[4] = g[4] - fix[5] = g[5] - return + parts = getGPS() + if type(parts) is list and float(parts[8]) <= 10.0: + fix['time'] = time.time() + fix['lat'] = parts[2] + fix['long'] = '-' + parts[4] + fix['sats'] = parts[7] + fix['err'] = parts[8] + fix['alt'] = parts[9] + #fix['stale'] = time.time() + print('LAT: ' + str(fix['lat']) + ' LONG: ' + str(fix['long']) + ' ALT: ' + str(fix['alt']) + ' ERR: ' + str(fix['err'])) else: - time.sleep_ms(100) - - -uasyncio.run(main(Pin(1), Pin(2))) - - - - - - - - - + time.sleep(1) def main(): - - - - - - - - -main() \ No newline at end of file + global fix, rx, lastTX, maxStale, seq + while True: + time.sleep_ms(70) + curTime = time.time() + if (curTime % 2) == 0: + display.fill_rect(0, 121, 6, 127, 1) + display.show() + else: + display.fill_rect(0, 121, 6, 127, 1) + display.show() + if curTime - lastTX >= 10 and curTime - fix['time'] <= 30: + seq += 1 + elements = [team, name, str(fix['lat']), str(fix['long']), str(fix['alt']), str(fix['sats']), str(fix['err']), str(seq)] + payload = ' '.join(elements) + lora.loraTX(payload) + lastTX = time.time() + if rx := lora.loraRX(): + #display.rect(0,0,127,63, 1) + print("RX >>> " + str(rx)) + if curTime - fix['time'] <= 10: + #lpos = (float(fix['lat']), float(fix['long'])) + #rpos = (float(rx['lat']), float(rx['long'])) + dist = haversine(float(fix['long']), float(fix['lat']), float(rx['long']), float(rx['lat'])) + print(dist) + print("log!") + display.fill(0) + display.text(rx['name'] + ' seq:' + rx['seq'], 0, 0, 1) + display.text('d:' + str(round(dist, 0)) + 'M ' + 'e:' + rx['err'], 0, 9, 1) + #display.text('y' + rx['lat'][0:6] + 'x' + rx['long'][0:6], 2, 12, 1) + #display.text(' err:' + rx['err'], 0, 18, 1) + display.text('rssi:' + str(rx['rssi']) + ' snr:' + str(rx['snr']), 0, 18, 1 ) + display.fill_rect(0, 26, 127, 63, 1) + display.text(name, 0,26,0) + display.text('lat: ' + fix['lat'], 0, 33, 0) + display.text('long: ' + fix['long'], 0, 42, 0) + display.text('sats:' + fix['sats'] + ' e:' + fix['err'], 0, 51, 0) + display.show() + log(rx, fix, str(round(dist, 0))) + + + #print('.', end='\r') + print(str(time.time()), end = '\r') + +bootup() +_thread.start_new_thread(gps, ()) +_thread.start_new_thread(main, ()) +#_thread.start_new_thread(logger, ()) \ No newline at end of file diff --git a/examples/lora_rangefinder/sdcard.py b/examples/lora_rangefinder/sdcard.py deleted file mode 100644 index 93e644a3512f57a34b21e028264b9a9c19c12a1c..0000000000000000000000000000000000000000 --- a/examples/lora_rangefinder/sdcard.py +++ /dev/null @@ -1,292 +0,0 @@ -""" -MicroPython driver for SD cards using SPI bus. -Requires an SPI bus and a CS pin. Provides readblocks and writeblocks -methods so the device can be mounted as a filesystem. -Example usage on pyboard: - import pyb, sdcard, os - sd = sdcard.SDCard(pyb.SPI(1), pyb.Pin.board.X5) - pyb.mount(sd, '/sd2') - os.listdir('/') -Example usage on ESP8266: - import machine, sdcard, os - sd = sdcard.SDCard(machine.SPI(1), machine.Pin(15)) - os.mount(sd, '/sd') - os.listdir('/') -""" - -from micropython import const -import time - - -_CMD_TIMEOUT = const(100) - -_R1_IDLE_STATE = const(1 << 0) -# R1_ERASE_RESET = const(1 << 1) -_R1_ILLEGAL_COMMAND = const(1 << 2) -# R1_COM_CRC_ERROR = const(1 << 3) -# R1_ERASE_SEQUENCE_ERROR = const(1 << 4) -# R1_ADDRESS_ERROR = const(1 << 5) -# R1_PARAMETER_ERROR = const(1 << 6) -_TOKEN_CMD25 = const(0xFC) -_TOKEN_STOP_TRAN = const(0xFD) -_TOKEN_DATA = const(0xFE) - - -class SDCard: - def __init__(self, spi, cs, baudrate=1320000): - self.spi = spi - self.cs = cs - - self.cmdbuf = bytearray(6) - self.dummybuf = bytearray(512) - self.tokenbuf = bytearray(1) - for i in range(512): - self.dummybuf[i] = 0xFF - self.dummybuf_memoryview = memoryview(self.dummybuf) - - # initialise the card - self.init_card(baudrate) - - def init_spi(self, baudrate): - try: - master = self.spi.MASTER - except AttributeError: - # on ESP8266 - self.spi.init(baudrate=baudrate, phase=0, polarity=0) - else: - # on pyboard - self.spi.init(master, baudrate=baudrate, phase=0, polarity=0) - - def init_card(self, baudrate): - - # init CS pin - self.cs.init(self.cs.OUT, value=1) - - # init SPI bus; use low data rate for initialisation - self.init_spi(100000) - - # clock card at least 100 cycles with cs high - for i in range(16): - self.spi.write(b"\xff") - - # CMD0: init card; should return _R1_IDLE_STATE (allow 5 attempts) - for _ in range(5): - if self.cmd(0, 0, 0x95) == _R1_IDLE_STATE: - break - else: - raise OSError("no SD card") - - # CMD8: determine card version - r = self.cmd(8, 0x01AA, 0x87, 4) - if r == _R1_IDLE_STATE: - self.init_card_v2() - elif r == (_R1_IDLE_STATE | _R1_ILLEGAL_COMMAND): - self.init_card_v1() - else: - raise OSError("couldn't determine SD card version") - - # get the number of sectors - # CMD9: response R2 (R1 byte + 16-byte block read) - if self.cmd(9, 0, 0, 0, False) != 0: - raise OSError("no response from SD card") - csd = bytearray(16) - self.readinto(csd) - if csd[0] & 0xC0 == 0x40: # CSD version 2.0 - self.sectors = ((csd[8] << 8 | csd[9]) + 1) * 1024 - elif csd[0] & 0xC0 == 0x00: # CSD version 1.0 (old, <=2GB) - c_size = (csd[6] & 0b11) << 10 | csd[7] << 2 | csd[8] >> 6 - c_size_mult = (csd[9] & 0b11) << 1 | csd[10] >> 7 - read_bl_len = csd[5] & 0b1111 - capacity = (c_size + 1) * (2 ** (c_size_mult + 2)) * (2**read_bl_len) - self.sectors = capacity // 512 - else: - raise OSError("SD card CSD format not supported") - # print('sectors', self.sectors) - - # CMD16: set block length to 512 bytes - if self.cmd(16, 512, 0) != 0: - raise OSError("can't set 512 block size") - - # set to high data rate now that it's initialised - self.init_spi(baudrate) - - def init_card_v1(self): - for i in range(_CMD_TIMEOUT): - self.cmd(55, 0, 0) - if self.cmd(41, 0, 0) == 0: - # SDSC card, uses byte addressing in read/write/erase commands - self.cdv = 512 - # print("[SDCard] v1 card") - return - raise OSError("timeout waiting for v1 card") - - def init_card_v2(self): - for i in range(_CMD_TIMEOUT): - time.sleep_ms(50) - self.cmd(58, 0, 0, 4) - self.cmd(55, 0, 0) - if self.cmd(41, 0x40000000, 0) == 0: - self.cmd(58, 0, 0, -4) # 4-byte response, negative means keep the first byte - ocr = self.tokenbuf[0] # get first byte of response, which is OCR - if not ocr & 0x40: - # SDSC card, uses byte addressing in read/write/erase commands - self.cdv = 512 - else: - # SDHC/SDXC card, uses block addressing in read/write/erase commands - self.cdv = 1 - # print("[SDCard] v2 card") - return - raise OSError("timeout waiting for v2 card") - - def cmd(self, cmd, arg, crc, final=0, release=True, skip1=False): - self.cs(0) - - # create and send the command - buf = self.cmdbuf - buf[0] = 0x40 | cmd - buf[1] = arg >> 24 - buf[2] = arg >> 16 - buf[3] = arg >> 8 - buf[4] = arg - buf[5] = crc - self.spi.write(buf) - - if skip1: - self.spi.readinto(self.tokenbuf, 0xFF) - - # wait for the response (response[7] == 0) - for i in range(_CMD_TIMEOUT): - self.spi.readinto(self.tokenbuf, 0xFF) - response = self.tokenbuf[0] - if not (response & 0x80): - # this could be a big-endian integer that we are getting here - # if final<0 then store the first byte to tokenbuf and discard the rest - if final < 0: - self.spi.readinto(self.tokenbuf, 0xFF) - final = -1 - final - for j in range(final): - self.spi.write(b"\xff") - if release: - self.cs(1) - self.spi.write(b"\xff") - return response - - # timeout - self.cs(1) - self.spi.write(b"\xff") - return -1 - - def readinto(self, buf): - self.cs(0) - - # read until start byte (0xff) - for i in range(_CMD_TIMEOUT): - self.spi.readinto(self.tokenbuf, 0xFF) - if self.tokenbuf[0] == _TOKEN_DATA: - break - time.sleep_ms(1) - else: - self.cs(1) - raise OSError("timeout waiting for response") - - # read data - mv = self.dummybuf_memoryview - if len(buf) != len(mv): - mv = mv[: len(buf)] - self.spi.write_readinto(mv, buf) - - # read checksum - self.spi.write(b"\xff") - self.spi.write(b"\xff") - - self.cs(1) - self.spi.write(b"\xff") - - def write(self, token, buf): - self.cs(0) - - # send: start of block, data, checksum - self.spi.read(1, token) - self.spi.write(buf) - self.spi.write(b"\xff") - self.spi.write(b"\xff") - - # check the response - if (self.spi.read(1, 0xFF)[0] & 0x1F) != 0x05: - self.cs(1) - self.spi.write(b"\xff") - return - - # wait for write to finish - while self.spi.read(1, 0xFF)[0] == 0: - pass - - self.cs(1) - self.spi.write(b"\xff") - - def write_token(self, token): - self.cs(0) - self.spi.read(1, token) - self.spi.write(b"\xff") - # wait for write to finish - while self.spi.read(1, 0xFF)[0] == 0x00: - pass - - self.cs(1) - self.spi.write(b"\xff") - - def readblocks(self, block_num, buf): - nblocks = len(buf) // 512 - assert nblocks and not len(buf) % 512, "Buffer length is invalid" - if nblocks == 1: - # CMD17: set read address for single block - if self.cmd(17, block_num * self.cdv, 0, release=False) != 0: - # release the card - self.cs(1) - raise OSError(5) # EIO - # receive the data and release card - self.readinto(buf) - else: - # CMD18: set read address for multiple blocks - if self.cmd(18, block_num * self.cdv, 0, release=False) != 0: - # release the card - self.cs(1) - raise OSError(5) # EIO - offset = 0 - mv = memoryview(buf) - while nblocks: - # receive the data and release card - self.readinto(mv[offset : offset + 512]) - offset += 512 - nblocks -= 1 - if self.cmd(12, 0, 0xFF, skip1=True): - raise OSError(5) # EIO - - def writeblocks(self, block_num, buf): - nblocks, err = divmod(len(buf), 512) - assert nblocks and not err, "Buffer length is invalid" - if nblocks == 1: - # CMD24: set write address for single block - if self.cmd(24, block_num * self.cdv, 0) != 0: - raise OSError(5) # EIO - - # send the data - self.write(_TOKEN_DATA, buf) - else: - # CMD25: set write address for first block - if self.cmd(25, block_num * self.cdv, 0) != 0: - raise OSError(5) # EIO - # send the data - offset = 0 - mv = memoryview(buf) - while nblocks: - self.write(_TOKEN_CMD25, mv[offset : offset + 512]) - offset += 512 - nblocks -= 1 - self.write_token(_TOKEN_STOP_TRAN) - - def ioctl(self, op, arg): - if op == 4: # get number of blocks - return self.sectors - if op == 5: # get block size in bytes - return 512 \ No newline at end of file