Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed linting errors reported by pyflakes and pycodestyle #51

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 67 additions & 99 deletions hx711.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import threading



class HX711:

def __init__(self, dout, pd_sck, gain=128):
Expand All @@ -16,7 +15,7 @@ def __init__(self, dout, pd_sck, gain=128):
# Mutex for reading from the HX711, in case multiple threads in client
# software try to access get values from the class at the same time.
self.readLock = threading.Lock()

GPIO.setmode(GPIO.BCM)
GPIO.setup(self.PD_SCK, GPIO.OUT)
GPIO.setup(self.DOUT, GPIO.IN)
Expand All @@ -42,29 +41,25 @@ def __init__(self, dout, pd_sck, gain=128):
# Think about whether this is necessary.
time.sleep(1)


def convertFromTwosComplement24bit(self, inputValue):
return -(inputValue & 0x800000) + (inputValue & 0x7fffff)


def is_ready(self):
return GPIO.input(self.DOUT) == 0


def set_gain(self, gain):
if gain is 128:
if gain == 128:
self.GAIN = 1
elif gain is 64:
elif gain == 64:
self.GAIN = 3
elif gain is 32:
elif gain == 32:
self.GAIN = 2

GPIO.output(self.PD_SCK, False)

# Read out a set of raw bytes and throw it away.
self.readRawBytes()


def get_gain(self):
if self.GAIN == 1:
return 128
Expand All @@ -75,36 +70,33 @@ def get_gain(self):

# Shouldn't get here.
return 0


def readNextBit(self):
# Clock HX711 Digital Serial Clock (PD_SCK). DOUT will be
# ready 1us after PD_SCK rising edge, so we sample after
# lowering PD_SCL, when we know DOUT will be stable.
GPIO.output(self.PD_SCK, True)
GPIO.output(self.PD_SCK, False)
value = GPIO.input(self.DOUT)

# Convert Boolean to int and return it.
return int(value)
# Clock HX711 Digital Serial Clock (PD_SCK). DOUT will be
# ready 1us after PD_SCK rising edge, so we sample after
# lowering PD_SCL, when we know DOUT will be stable.
GPIO.output(self.PD_SCK, True)
GPIO.output(self.PD_SCK, False)
value = GPIO.input(self.DOUT)

# Convert Boolean to int and return it.
return int(value)

def readNextByte(self):
byteValue = 0

# Read bits and build the byte from top, or bottom, depending
# on whether we are in MSB or LSB bit mode.
for x in range(8):
if self.bit_format == 'MSB':
byteValue <<= 1
byteValue |= self.readNextBit()
else:
byteValue >>= 1
byteValue |= self.readNextBit() * 0x80

# Return the packed byte.
return byteValue

byteValue = 0

# Read bits and build the byte from top, or bottom, depending
# on whether we are in MSB or LSB bit mode.
for x in range(8):
if self.bit_format == 'MSB':
byteValue <<= 1
byteValue |= self.readNextBit()
else:
byteValue >>= 1
byteValue |= self.readNextBit() * 0x80

# Return the packed byte.
return byteValue

def readRawBytes(self):
# Wait for and get the Read Lock, incase another thread is already
Expand All @@ -113,47 +105,45 @@ def readRawBytes(self):

# Wait until HX711 is ready for us to read a sample.
while not self.is_ready():
pass
pass

# Read three bytes of data from the HX711.
firstByte = self.readNextByte()
firstByte = self.readNextByte()
secondByte = self.readNextByte()
thirdByte = self.readNextByte()
thirdByte = self.readNextByte()

# HX711 Channel and gain factor are set by number of bits read
# after 24 data bits.
for i in range(self.GAIN):
# Clock a bit out of the HX711 and throw it away.
self.readNextBit()
# Clock a bit out of the HX711 and throw it away.
self.readNextBit()

# Release the Read Lock, now that we've finished driving the HX711
# serial interface.
self.readLock.release()
self.readLock.release()

# Depending on how we're configured, return an orderd list of raw byte
# values.
if self.byte_format == 'LSB':
return [thirdByte, secondByte, firstByte]
return [thirdByte, secondByte, firstByte]
else:
return [firstByte, secondByte, thirdByte]

return [firstByte, secondByte, thirdByte]

def read_long(self):
# Get a sample from the HX711 in the form of raw bytes.
dataBytes = self.readRawBytes()


if self.DEBUG_PRINTING:
print(dataBytes,)

# Join the raw bytes into a single 24bit 2s complement value.
twosComplementValue = ((dataBytes[0] << 16) |
(dataBytes[1] << 8) |
(dataBytes[1] << 8) |
dataBytes[2])

if self.DEBUG_PRINTING:
print("Twos: 0x%06x" % twosComplementValue)

# Convert from 24bit twos-complement to a signed value.
signedIntValue = self.convertFromTwosComplement24bit(twosComplementValue)

Expand All @@ -163,7 +153,6 @@ def read_long(self):
# Return the sample value we've read from the HX711.
return int(signedIntValue)


def read_average(self, times=3):
# Make sure we've been asked to take a rational amount of samples.
if times <= 0:
Expand All @@ -178,16 +167,18 @@ def read_average(self, times=3):
if times < 5:
return self.read_median(times)

# If we're taking a lot of samples, we'll collect them in a list, remove
# the outliers, then take the mean of the remaining set.
# If we're taking a lot of samples, we'll collect them in a
# list, remove the outliers, then take the mean of the
# remaining set.
valueList = []

for x in range(times):
valueList += [self.read_long()]

valueList.sort()

# We'll be trimming 20% of outlier samples from top and bottom of collected set.
# We'll be trimming 20% of outlier samples from top and bottom
# of collected set.
trimAmount = int(len(valueList) * 0.2)

# Trim the edge case values.
Expand All @@ -196,43 +187,39 @@ def read_average(self, times=3):
# Return the mean of remaining samples.
return sum(valueList) / len(valueList)


# A median-based read method, might help when getting random value spikes
# for unknown or CPU-related reasons
def read_median(self, times=3):
if times <= 0:
raise ValueError("HX711::read_median(): times must be greater than zero!")

# If times == 1, just return a single reading.
if times == 1:
return self.read_long()
if times <= 0:
raise ValueError("HX711::read_median(): times must be greater than zero!")

valueList = []
# If times == 1, just return a single reading.
if times == 1:
return self.read_long()

for x in range(times):
valueList += [self.read_long()]
valueList = []

valueList.sort()
for x in range(times):
valueList += [self.read_long()]

# If times is odd we can just take the centre value.
if (times & 0x1) == 0x1:
return valueList[len(valueList) // 2]
else:
# If times is even we have to take the arithmetic mean of
# the two middle values.
midpoint = len(valueList) / 2
return sum(valueList[midpoint:midpoint+2]) / 2.0
valueList.sort()

# If times is odd we can just take the centre value.
if (times & 0x1) == 0x1:
return valueList[len(valueList) // 2]
else:
# If times is even we have to take the arithmetic mean of
# the two middle values.
midpoint = len(valueList) / 2
return sum(valueList[midpoint:midpoint+2]) / 2.0

# Compatibility function, uses channel A version
def get_value(self, times=3):
return self.get_value_A(times)


def get_value_A(self, times=3):
return self.read_median(times) - self.get_offset_A()


def get_value_B(self, times=3):
# for channel B, we need to set_gain(32)
g = self.get_gain()
Expand All @@ -245,7 +232,6 @@ def get_value_B(self, times=3):
def get_weight(self, times=3):
return self.get_weight_A(times)


def get_weight_A(self, times=3):
value = self.get_value_A(times)
value = value / self.REFERENCE_UNIT
Expand All @@ -256,30 +242,27 @@ def get_weight_B(self, times=3):
value = value / self.REFERENCE_UNIT_B
return value


# Sets tare for channel A for compatibility purposes
def tare(self, times=15):
return self.tare_A(times)



def tare_A(self, times=15):
# Backup REFERENCE_UNIT value
backupReferenceUnit = self.get_reference_unit_A()
self.set_reference_unit_A(1)

value = self.read_average(times)

if self.DEBUG_PRINTING:
print("Tare A value:", value)

self.set_offset_A(value)

# Restore the reference unit, now that we've got our offset.
self.set_reference_unit_A(backupReferenceUnit)

return value


def tare_B(self, times=15):
# Backup REFERENCE_UNIT value
backupReferenceUnit = self.get_reference_unit_B()
Expand All @@ -293,17 +276,15 @@ def tare_B(self, times=15):

if self.DEBUG_PRINTING:
print("Tare B value:", value)

self.set_offset_B(value)

# Restore gain/channel/reference unit settings.
self.set_gain(backupGain)
self.set_reference_unit_B(backupReferenceUnit)

return value

return value


def set_reading_format(self, byte_format="LSB", bit_format="MSB"):
if byte_format == "LSB":
self.byte_format = byte_format
Expand All @@ -319,9 +300,6 @@ def set_reading_format(self, byte_format="LSB", bit_format="MSB"):
else:
raise ValueError("Unrecognised bitformat: \"%s\"" % bit_format)




# sets offset for channel A for compatibility reasons
def set_offset(self, offset):
self.set_offset_A(offset)
Expand All @@ -341,12 +319,9 @@ def get_offset_A(self):
def get_offset_B(self):
return self.OFFSET_B



def set_reference_unit(self, reference_unit):
self.set_reference_unit_A(reference_unit)


def set_reference_unit_A(self, reference_unit):
# Make sure we aren't asked to use an invalid reference unit.
if reference_unit == 0:
Expand All @@ -355,7 +330,6 @@ def set_reference_unit_A(self, reference_unit):

self.REFERENCE_UNIT = reference_unit


def set_reference_unit_B(self, reference_unit):
# Make sure we aren't asked to use an invalid reference unit.
if reference_unit == 0:
Expand All @@ -364,19 +338,15 @@ def set_reference_unit_B(self, reference_unit):

self.REFERENCE_UNIT_B = reference_unit


def get_reference_unit(self):
return get_reference_unit_A()
return self.get_reference_unit_A()


def get_reference_unit_A(self):
return self.REFERENCE_UNIT


def get_reference_unit_B(self):
return self.REFERENCE_UNIT_B



def power_down(self):
# Wait for and get the Read Lock, incase another thread is already
# driving the HX711 serial interface.
Expand All @@ -392,8 +362,7 @@ def power_down(self):

# Release the Read Lock, now that we've finished driving the HX711
# serial interface.
self.readLock.release()

self.readLock.release()

def power_up(self):
# Wait for and get the Read Lock, incase another thread is already
Expand All @@ -417,7 +386,6 @@ def power_up(self):
if self.get_gain() != 128:
self.readRawBytes()


def reset(self):
self.power_down()
self.power_up()
Expand Down