import inspect
import os
import logging
-from ctypes import *
+import struct
+import fcntl
from time import sleep
DEBUG=0
+I2C_FILENAME = "/dev/i2c-2"
logger = logging.getLogger(__name__)
class i2c:
- libc = CDLL("libc.so.6")
I2C_SLAVE = 0x0703 # Use this slave address
__single = None
__lock = threading.Lock()
logger.warning("Error: (%s) I2C blocked %fs by %s!", parent, count*0.001, parent_owner)
i2c.__parent_owner = inspect.stack()[1]
i2c.__single = True
- self.dev = i2c.libc.open("/dev/i2c-2", os.O_RDWR)
+ self.dev = os.open(I2C_FILENAME, os.O_RDWR)
if self.dev < 0:
raise IOError("open")
- err = i2c.libc.ioctl(self.dev, i2c.I2C_SLAVE, addr>>1)
- if err < 0:
- raise IOError("ioctl")
+ fcntl.flock(self.dev, fcntl.LOCK_EX)
+ fcntl.ioctl(self.dev, i2c.I2C_SLAVE, addr>>1)
def write(self, s):
- num_write = i2c.libc.write(self.dev, s, len(s))
+ num_write = os.write(self.dev, s)
if num_write != len(s):
self.close()
raise IOError("write: %d" % (num_write))
def read(self, num):
- buf = create_string_buffer(num)
- num_read = i2c.libc.read(self.dev, buf, num)
- if num_read != num:
+ buf = os.read(self.dev, num)
+ if len(buf) != num:
self.close()
- raise IOError("read: %d" % (num_read))
- return buf.raw
+ raise IOError("read: %d" % (len(buf)))
+ return buf
def close(self):
if self.dev:
- i2c.libc.close(self.dev)
+ os.close(self.dev)
self.dev = None
- #i2c.__parent_owner = None
i2c.__single = None
- def __del__(self):
- self.close()
+
+def i2c_write_reg(addr, reg, buf=""):
+ dev = i2c(addr)
+ s = struct.pack(">B", reg) + buf
+ dev.write(s)
+ dev.close()
+
+
+def i2c_read_reg(addr, reg, num=1):
+ i2c_write_reg(addr, reg)
+
+ dev = i2c(addr)
+ s = dev.read(num)
+ dev.close()
+ return s
+
if __name__ == "__main__":
- import struct
import sys
dev = i2c(0x50)