import logging
import struct
import fcntl
-from ctypes import *
from time import sleep
DEBUG=0
logger = logging.getLogger(__name__)
class i2c:
- libc = CDLL("libc.so.6")
I2C_SLAVE = 0x0703 # Use this slave address
__single = None
__lock = threading.Lock()
logger.warning("Error: (%s) I2C blocked %fs by %s!", parent, count*0.001, parent_owner)
i2c.__parent_owner = inspect.stack()[1]
i2c.__single = True
- self.dev = i2c.libc.open(I2C_FILENAME, os.O_RDWR)
+ self.dev = os.open(I2C_FILENAME, os.O_RDWR)
if self.dev < 0:
raise IOError("open")
fcntl.flock(self.dev, fcntl.LOCK_EX)
- err = i2c.libc.ioctl(self.dev, i2c.I2C_SLAVE, addr>>1)
- if err < 0:
- raise IOError("ioctl")
+ fcntl.ioctl(self.dev, i2c.I2C_SLAVE, addr>>1)
def write(self, s):
- num_write = i2c.libc.write(self.dev, s, len(s))
+ num_write = os.write(self.dev, s)
if num_write != len(s):
self.close()
raise IOError("write: %d" % (num_write))
def read(self, num):
- buf = create_string_buffer(num)
- num_read = i2c.libc.read(self.dev, buf, num)
- if num_read != num:
+ buf = os.read(self.dev, num)
+ if len(buf) != num:
self.close()
- raise IOError("read: %d" % (num_read))
- return buf.raw
+ raise IOError("read: %d" % (len(buf)))
+ return buf
def close(self):
if self.dev:
- i2c.libc.close(self.dev)
+ os.close(self.dev)
self.dev = None
- #i2c.__parent_owner = None
i2c.__single = None
def __del__(self):
if __name__ == "__main__":
- import struct
import sys
dev = i2c(0x50)