]> defiant.homedns.org Git - pyshared.git/blob - i2c.py
i2c: use python low level api instead of c calls
[pyshared.git] / i2c.py
1 #!/usr/bin/env python
2 # -*- coding: iso-8859-15 -*-
3
4 import threading
5 import inspect
6 import os
7 import logging
8 import struct
9 import fcntl
10 from time import sleep
11
# Module-wide logger; i2c.__init__ uses it to report a long wait for the bus.
logger = logging.getLogger(__name__)

# Character device that every i2c instance opens.
I2C_FILENAME = "/dev/i2c-2"

# When truthy, i2c.__init__ records who acquired the bus and warns when a
# caller had to wait for more than 10 polling rounds.
DEBUG = 0
15
class i2c:
    """Exclusive handle on the I2C character device ``I2C_FILENAME``.

    Only one instance per process may own the device at a time: the
    constructor polls a class-wide flag until the previous owner has called
    close().  The opened descriptor is additionally locked with
    ``flock(LOCK_EX)``.

    Instances can be used as context managers; leaving the ``with`` block
    calls close().
    """

    I2C_SLAVE = 0x0703  # Use this slave address
    # Truthy while some instance owns the device; cleared by close().
    __single = None
    # Serialises constructors that are waiting for the device.
    __lock = threading.Lock()
    # inspect.stack() record of the caller that created the current owner
    # (only maintained while DEBUG is truthy).
    __parent_owner = None

    def __init__(self, addr):
        """Wait for the device to become free, then open and configure it.

        addr -- slave address; it is shifted right by one bit before being
                handed to the I2C_SLAVE ioctl (presumably callers pass the
                8-bit form of the address -- confirm against callers).

        Raises OSError/IOError if the device cannot be opened, locked or
        configured.  In that case ownership is released again so that later
        constructors do not wait forever.
        """
        # Set first so that close() and __del__ are safe even if the setup
        # below fails half-way.
        self.dev = None
        with i2c.__lock:
            count = 0
            while i2c.__single:
                count += 1
                sleep(0.001)
            if DEBUG:
                # Looked up once, after the wait: inspect.stack() is far too
                # expensive to call on every 1 ms polling round.
                caller = inspect.stack()[1]
                if count > 10:
                    owner = i2c.__parent_owner
                    if owner is None:
                        # Owner was created while DEBUG was still off.
                        parent_owner = "unknown"
                    else:
                        parent_owner = "%s (%d), %s()" % (owner[1], owner[2], owner[3])
                    logger.warning("Error: (%s) I2C blocked %fs by %s!", caller[3], count*0.001, parent_owner)
                i2c.__parent_owner = caller
            i2c.__single = True
        try:
            # os.open raises OSError on failure; it never returns a negative
            # descriptor, so no explicit "< 0" check is needed.
            self.dev = os.open(I2C_FILENAME, os.O_RDWR)
            fcntl.flock(self.dev, fcntl.LOCK_EX)
            fcntl.ioctl(self.dev, i2c.I2C_SLAVE, addr >> 1)
        except BaseException:
            if self.dev is None:
                # Nothing was opened, so close() would be a no-op; release
                # the ownership flag by hand.
                i2c.__single = None
            else:
                self.close()
            raise

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the device; exceptions from the ``with`` body propagate."""
        self.close()
        return False

    def write(self, s):
        """Write the byte string *s* to the device.

        Raises IOError (after closing the device) if fewer than len(s)
        bytes were written.
        """
        num_write = os.write(self.dev, s)
        if num_write != len(s):
            self.close()
            raise IOError("write: %d" % (num_write))

    def read(self, num):
        """Read exactly *num* bytes from the device and return them.

        Raises IOError (after closing the device) if a different number of
        bytes was returned.
        """
        buf = os.read(self.dev, num)
        if len(buf) != num:
            self.close()
            raise IOError("read: %d" % (len(buf)))
        return buf

    def close(self):
        """Close the descriptor and release ownership of the device.

        Safe to call more than once, and safe on an instance whose
        constructor failed before a descriptor was opened.
        """
        fd = getattr(self, "dev", None)
        # Compare against None: descriptor 0 is a valid, falsy value.
        if fd is None:
            return
        self.dev = None
        try:
            os.close(fd)
        finally:
            # Release ownership even if os.close() itself failed, otherwise
            # every later constructor would spin forever.
            i2c.__single = None

    def __del__(self):
        """Make sure the device is released when the object is collected."""
        self.close()
62
63
def i2c_write_reg(addr, reg, buf):
    """Write *buf* to register *reg* of the device at *addr*.

    addr -- slave address, passed unchanged to i2c()
    reg  -- register number; must fit in one unsigned byte
    buf  -- byte string sent directly after the register byte

    Raises struct.error if *reg* does not fit in a byte, and whatever
    i2c() / i2c.write() raise on I/O failure.
    """
    # Build the message before taking the bus, so that a bad register value
    # cannot leave the device held.
    payload = struct.pack(">B", reg) + buf
    dev = i2c(addr)
    try:
        dev.write(payload)
    finally:
        # Always release the bus, also when the write raised.
        dev.close()
69
70
def i2c_read_reg(addr, reg, num=1):
    """Read *num* bytes starting at register *reg* of the device at *addr*.

    The register number is written as a single byte, then *num* bytes are
    read back through the same i2c instance.

    addr -- slave address, passed unchanged to i2c()
    reg  -- register number; must fit in one unsigned byte
    num  -- number of bytes to read (default 1)

    Returns the bytes read.  Raises struct.error if *reg* does not fit in a
    byte, and whatever i2c() / i2c.write() / i2c.read() raise on I/O failure.
    """
    # Build the request before taking the bus, so that a bad register value
    # cannot leave the device held.
    request = struct.pack(">B", reg)
    dev = i2c(addr)
    try:
        dev.write(request)
        return dev.read(num)
    finally:
        # Always release the bus, also when the transfer raised.
        dev.close()
78
79
if __name__ == "__main__":
    # Command-line helper: send one unsigned byte (first argument) followed
    # by a big-endian signed 16-bit value (second argument) to the device
    # at address 0x50.
    import sys

    if len(sys.argv) < 3:
        sys.exit("usage: %s <byte> <int16>" % sys.argv[0])

    # Parse and pack the arguments before opening the device, so that bad
    # input never leaves the bus held.
    s = struct.pack(">Bh", int(sys.argv[1]), int(sys.argv[2]))
    dev = i2c(0x50)
    try:
        dev.write(s)
    finally:
        dev.close()