summaryrefslogtreecommitdiff
path: root/pybluez/advanced
diff options
context:
space:
mode:
authorTim Redfern <tim@eclectronics.org>2011-12-19 18:20:33 +0000
committerTim Redfern <tim@eclectronics.org>2011-12-19 18:20:33 +0000
commite9a73bbb3c14af340999f70146747787785f4fee (patch)
treea125452f7d641673286542497da051b810427880 /pybluez/advanced
initial commit
Diffstat (limited to 'pybluez/advanced')
-rw-r--r--pybluez/advanced/inquiry-with-rssi.py148
-rw-r--r--pybluez/advanced/l2-mtu.py60
-rw-r--r--pybluez/advanced/l2-unreliable-client.py76
-rw-r--r--pybluez/advanced/l2-unreliable-server.py34
-rw-r--r--pybluez/advanced/read-local-bdaddr.py35
-rw-r--r--pybluez/advanced/write-inquiry-scan.py90
6 files changed, 443 insertions, 0 deletions
diff --git a/pybluez/advanced/inquiry-with-rssi.py b/pybluez/advanced/inquiry-with-rssi.py
new file mode 100644
index 0000000..814f7ed
--- /dev/null
+++ b/pybluez/advanced/inquiry-with-rssi.py
@@ -0,0 +1,148 @@
+# performs a simple device inquiry, followed by a remote name request of each
+# discovered device
+
+import os
+import sys
+import struct
+import bluetooth._bluetooth as bluez
+
+def printpacket(pkt):
+ for c in pkt:
+ sys.stdout.write("%02x " % struct.unpack("B",c)[0])
+ print
+
+
+def read_inquiry_mode(sock):
+ """returns the current mode, or -1 on failure"""
+ # save current filter
+ old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14)
+
+ # Setup socket filter to receive only events related to the
+ # read_inquiry_mode command
+ flt = bluez.hci_filter_new()
+ opcode = bluez.cmd_opcode_pack(bluez.OGF_HOST_CTL,
+ bluez.OCF_READ_INQUIRY_MODE)
+ bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT)
+ bluez.hci_filter_set_event(flt, bluez.EVT_CMD_COMPLETE);
+ bluez.hci_filter_set_opcode(flt, opcode)
+ sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, flt )
+
+ # first read the current inquiry mode.
+ bluez.hci_send_cmd(sock, bluez.OGF_HOST_CTL,
+ bluez.OCF_READ_INQUIRY_MODE )
+
+ pkt = sock.recv(255)
+
+ status,mode = struct.unpack("xxxxxxBB", pkt)
+ if status != 0: mode = -1
+
+ # restore old filter
+ sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, old_filter )
+ return mode
+
+def write_inquiry_mode(sock, mode):
+ """returns 0 on success, -1 on failure"""
+ # save current filter
+ old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14)
+
+ # Setup socket filter to receive only events related to the
+ # write_inquiry_mode command
+ flt = bluez.hci_filter_new()
+ opcode = bluez.cmd_opcode_pack(bluez.OGF_HOST_CTL,
+ bluez.OCF_WRITE_INQUIRY_MODE)
+ bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT)
+ bluez.hci_filter_set_event(flt, bluez.EVT_CMD_COMPLETE);
+ bluez.hci_filter_set_opcode(flt, opcode)
+ sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, flt )
+
+ # send the command!
+ bluez.hci_send_cmd(sock, bluez.OGF_HOST_CTL,
+ bluez.OCF_WRITE_INQUIRY_MODE, struct.pack("B", mode) )
+
+ pkt = sock.recv(255)
+
+ status = struct.unpack("xxxxxxB", pkt)[0]
+
+ # restore old filter
+ sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, old_filter )
+ if status != 0: return -1
+ return 0
+
+def device_inquiry_with_with_rssi(sock):
+ # save current filter
+ old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14)
+
+ # perform a device inquiry on bluetooth device #0
+ # The inquiry should last 8 * 1.28 = 10.24 seconds
+ # before the inquiry is performed, bluez should flush its cache of
+ # previously discovered devices
+ flt = bluez.hci_filter_new()
+ bluez.hci_filter_all_events(flt)
+ bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT)
+ sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, flt )
+
+ duration = 4
+ max_responses = 255
+ cmd_pkt = struct.pack("BBBBB", 0x33, 0x8b, 0x9e, duration, max_responses)
+ bluez.hci_send_cmd(sock, bluez.OGF_LINK_CTL, bluez.OCF_INQUIRY, cmd_pkt)
+
+ results = []
+
+ done = False
+ while not done:
+ pkt = sock.recv(255)
+ ptype, event, plen = struct.unpack("BBB", pkt[:3])
+ if event == bluez.EVT_INQUIRY_RESULT_WITH_RSSI:
+ pkt = pkt[3:]
+ nrsp = struct.unpack("B", pkt[0])[0]
+ for i in range(nrsp):
+ addr = bluez.ba2str( pkt[1+6*i:1+6*i+6] )
+ rssi = struct.unpack("b", pkt[1+13*nrsp+i])[0]
+ results.append( ( addr, rssi ) )
+ print "[%s] RSSI: [%d]" % (addr, rssi)
+ elif event == bluez.EVT_INQUIRY_COMPLETE:
+ done = True
+ elif event == bluez.EVT_CMD_STATUS:
+ status, ncmd, opcode = struct.unpack("BBH", pkt[3:7])
+ if status != 0:
+ print "uh oh..."
+ printpacket(pkt[3:7])
+ done = True
+ else:
+ print "unrecognized packet type 0x%02x" % ptype
+
+
+ # restore old filter
+ sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, old_filter )
+
+ return results
+
+dev_id = 0
+try:
+ sock = bluez.hci_open_dev(dev_id)
+except:
+ print "error accessing bluetooth device..."
+ sys.exit(1)
+
+try:
+ mode = read_inquiry_mode(sock)
+except Exception, e:
+ print "error reading inquiry mode. "
+ print "Are you sure this a bluetooth 1.2 device?"
+ print e
+ sys.exit(1)
+print "current inquiry mode is %d" % mode
+
+if mode != 1:
+ print "writing inquiry mode..."
+ try:
+ result = write_inquiry_mode(sock, 1)
+ except Exception, e:
+ print "error writing inquiry mode. Are you sure you're root?"
+ print e
+ sys.exit(1)
+ if result != 0:
+ print "error while setting inquiry mode"
+ print "result: %d" % result
+
+device_inquiry_with_with_rssi(sock)
diff --git a/pybluez/advanced/l2-mtu.py b/pybluez/advanced/l2-mtu.py
new file mode 100644
index 0000000..1ffe04c
--- /dev/null
+++ b/pybluez/advanced/l2-mtu.py
@@ -0,0 +1,60 @@
+import sys
+import struct
+import bluetooth
+
+def usage():
+ print "usage: l2-mtu < server | client > [options]"
+ print ""
+ print "l2-mtu server to start in server mode"
+ print "l2-mtu client <addr> to start in client mode and connect to addr"
+ sys.exit(2)
+
+if len(sys.argv) < 2: usage()
+
+mode = sys.argv[1]
+if mode not in [ "client", "server" ]: usage()
+
+if mode == "server":
+ server_sock=bluetooth.BluetoothSocket( bluetooth.L2CAP )
+ server_sock.bind(("",0x1001))
+ server_sock.listen(1)
+ while True:
+ print "waiting for incoming connection"
+ client_sock,address = server_sock.accept()
+ print "Accepted connection from %s" % str(address)
+
+ bluetooth.set_l2cap_mtu( client_sock, 65535 )
+
+ print "waiting for data"
+ total = 0
+ while True:
+ try:
+ data = client_sock.recv(65535)
+ except bluetooth.BluetoothError, e:
+ break
+ if len(data) == 0: break
+ print "received packet of size %d" % len(data)
+
+ client_sock.close()
+
+ print "connection closed"
+
+ server_sock.close()
+else:
+ sock=bluetooth.BluetoothSocket(bluetooth.L2CAP)
+
+ bt_addr = sys.argv[2]
+ print "trying to connect to %s:1001" % bt_addr
+ port = 0x1001
+ sock.connect((bt_addr, port))
+
+ print "connected. Adjusting link parameters."
+ bluetooth.set_l2cap_mtu( sock, 65535 )
+
+ totalsent = 0
+ for i in range(1, 65535, 100):
+ pkt = "0" * i
+ sent = sock.send(pkt)
+ print "sent packet of size %d (tried %d)" % (sent, len(pkt))
+
+ sock.close()
diff --git a/pybluez/advanced/l2-unreliable-client.py b/pybluez/advanced/l2-unreliable-client.py
new file mode 100644
index 0000000..5b10a48
--- /dev/null
+++ b/pybluez/advanced/l2-unreliable-client.py
@@ -0,0 +1,76 @@
+import sys
+import fcntl
+import struct
+import array
+import bluetooth
+import bluetooth._bluetooth as bt # low level bluetooth wrappers.
+
+def __get_acl_conn_handle(sock, addr):
+ hci_fd = sock.fileno()
+ reqstr = struct.pack( "6sB17s", bt.str2ba(addr), bt.ACL_LINK, "\0" * 17)
+ request = array.array( "c", reqstr )
+ fcntl.ioctl( hci_fd, bt.HCIGETCONNINFO, request, 1 )
+ handle = struct.unpack("8xH14x", request.tostring())[0]
+ return handle
+
+def write_flush_timeout( addr, timeout ):
+ hci_sock = bt.hci_open_dev()
+ # get the ACL connection handle to the remote device
+ handle = __get_acl_conn_handle(hci_sock, addr)
+ pkt = struct.pack("HH", handle, bt.htobs(timeout))
+ response = bt.hci_send_req(hci_sock, bt.OGF_HOST_CTL,
+ 0x0028, bt.EVT_CMD_COMPLETE, 3, pkt)
+ status = struct.unpack("B", response[0])[0]
+ rhandle = struct.unpack("H", response[1:3])[0]
+ assert rhandle == handle
+ assert status == 0
+
+def read_flush_timeout( addr ):
+ hci_sock = bt.hci_open_dev()
+ # get the ACL connection handle to the remote device
+ handle = __get_acl_conn_handle(hci_sock, addr)
+ pkt = struct.pack("H", handle)
+ response = bt.hci_send_req(hci_sock, bt.OGF_HOST_CTL,
+ 0x0027, bt.EVT_CMD_COMPLETE, 5, pkt)
+ status = struct.unpack("B", response[0])[0]
+ rhandle = struct.unpack("H", response[1:3])[0]
+ assert rhandle == handle
+ assert status == 0
+ fto = struct.unpack("H", response[3:5])[0]
+ return fto
+
+# Create the client socket
+sock=bluetooth.BluetoothSocket(bluetooth.L2CAP)
+
+if len(sys.argv) < 4:
+ print "usage: l2capclient.py <addr> <timeout> <num_packets>"
+ print " address - device that l2-unreliable-server is running on"
+ print " timeout - wait timeout * 0.625ms before dropping unACK'd packets"
+ print " num_packets - number of 627-byte packets to send on connect"
+ sys.exit(2)
+
+bt_addr=sys.argv[1]
+timeout = int(sys.argv[2])
+num_packets = int(sys.argv[3])
+
+print "trying to connect to %s:1001" % bt_addr
+port = 0x1001
+sock.connect((bt_addr, port))
+
+print "connected. Adjusting link parameters."
+print "current flush timeout is %d ms" % read_flush_timeout( bt_addr )
+try:
+ write_flush_timeout( bt_addr, timeout )
+except bt.error, e:
+ print "error setting flush timeout. are you sure you're superuser?"
+ print e
+ sys.exit(1)
+print "new flush timeout is %d ms" % read_flush_timeout( bt_addr )
+
+totalsent = 0
+for i in range(num_packets):
+ pkt = "0" * 672
+ totalsent += sock.send(pkt)
+print "sent %d bytes total" % totalsent
+
+sock.close()
diff --git a/pybluez/advanced/l2-unreliable-server.py b/pybluez/advanced/l2-unreliable-server.py
new file mode 100644
index 0000000..d827775
--- /dev/null
+++ b/pybluez/advanced/l2-unreliable-server.py
@@ -0,0 +1,34 @@
+import sys
+import bluetooth
+
+if len(sys.argv) < 2:
+ print "usage: l2-unreliable-server"
+ sys.exit(2)
+
+timeout = int(sys.argv[1])
+assert timeout >= 0
+
+server_sock=bluetooth.BluetoothSocket( bluetooth.L2CAP )
+server_sock.bind(("",0x1001))
+server_sock.listen(1)
+while True:
+ print "waiting for incoming connection"
+ client_sock,address = server_sock.accept()
+ print "Accepted connection from %s" % str(address)
+
+ print "waiting for data"
+ total = 0
+ while True:
+ try:
+ data = client_sock.recv(1024)
+ except bluetooth.BluetoothError, e:
+ break
+ if len(data) == 0: break
+ total += len(data)
+ print "total byte read: %d" % total
+
+ client_sock.close()
+
+ print "connection closed"
+
+server_sock.close()
diff --git a/pybluez/advanced/read-local-bdaddr.py b/pybluez/advanced/read-local-bdaddr.py
new file mode 100644
index 0000000..410a681
--- /dev/null
+++ b/pybluez/advanced/read-local-bdaddr.py
@@ -0,0 +1,35 @@
+import os
+import sys
+import struct
+import bluetooth._bluetooth as _bt
+
+def read_local_bdaddr(hci_sock):
+ old_filter = hci_sock.getsockopt( _bt.SOL_HCI, _bt.HCI_FILTER, 14)
+ flt = _bt.hci_filter_new()
+ opcode = _bt.cmd_opcode_pack(_bt.OGF_INFO_PARAM,
+ _bt.OCF_READ_BD_ADDR)
+ _bt.hci_filter_set_ptype(flt, _bt.HCI_EVENT_PKT)
+ _bt.hci_filter_set_event(flt, _bt.EVT_CMD_COMPLETE);
+ _bt.hci_filter_set_opcode(flt, opcode)
+ hci_sock.setsockopt( _bt.SOL_HCI, _bt.HCI_FILTER, flt )
+
+ _bt.hci_send_cmd(hci_sock, _bt.OGF_INFO_PARAM, _bt.OCF_READ_BD_ADDR )
+
+ pkt = hci_sock.recv(255)
+
+ status,raw_bdaddr = struct.unpack("xxxxxxB6s", pkt)
+ assert status == 0
+
+ t = [ "%X" % ord(b) for b in raw_bdaddr ]
+ t.reverse()
+ bdaddr = ":".join(t)
+
+ # restore old filter
+ hci_sock.setsockopt( _bt.SOL_HCI, _bt.HCI_FILTER, old_filter )
+ return bdaddr
+
+if __name__ == "__main__":
+ dev_id = 0
+ hci_sock = _bt.hci_open_dev(dev_id)
+ bdaddr = read_local_bdaddr(hci_sock)
+ print bdaddr
diff --git a/pybluez/advanced/write-inquiry-scan.py b/pybluez/advanced/write-inquiry-scan.py
new file mode 100644
index 0000000..54793b7
--- /dev/null
+++ b/pybluez/advanced/write-inquiry-scan.py
@@ -0,0 +1,90 @@
+import os
+import sys
+import struct
+import bluetooth._bluetooth as bluez
+
+def read_inquiry_scan_activity(sock):
+ """returns the current inquiry scan interval and window,
+ or -1 on failure"""
+ # save current filter
+ old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14)
+
+ # Setup socket filter to receive only events related to the
+ # read_inquiry_mode command
+ flt = bluez.hci_filter_new()
+ opcode = bluez.cmd_opcode_pack(bluez.OGF_HOST_CTL,
+ bluez.OCF_READ_INQ_ACTIVITY)
+ bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT)
+ bluez.hci_filter_set_event(flt, bluez.EVT_CMD_COMPLETE);
+ bluez.hci_filter_set_opcode(flt, opcode)
+ sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, flt )
+
+ # first read the current inquiry mode.
+ bluez.hci_send_cmd(sock, bluez.OGF_HOST_CTL,
+ bluez.OCF_READ_INQ_ACTIVITY )
+
+ pkt = sock.recv(255)
+
+ status,interval,window = struct.unpack("!xxxxxxBHH", pkt)
+ interval = bluez.btohs(interval)
+ interval = (interval >> 8) | ( (interval & 0xFF) << 8 )
+ window = (window >> 8) | ( (window & 0xFF) << 8 )
+ if status != 0: mode = -1
+
+ # restore old filter
+ sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, old_filter )
+ return interval, window
+
+def write_inquiry_scan_activity(sock, interval, window):
+ """returns 0 on success, -1 on failure"""
+ # save current filter
+ old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14)
+
+ # Setup socket filter to receive only events related to the
+ # write_inquiry_mode command
+ flt = bluez.hci_filter_new()
+ opcode = bluez.cmd_opcode_pack(bluez.OGF_HOST_CTL,
+ bluez.OCF_WRITE_INQ_ACTIVITY)
+ bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT)
+ bluez.hci_filter_set_event(flt, bluez.EVT_CMD_COMPLETE);
+ bluez.hci_filter_set_opcode(flt, opcode)
+ sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, flt )
+
+ # send the command!
+ bluez.hci_send_cmd(sock, bluez.OGF_HOST_CTL,
+ bluez.OCF_WRITE_INQ_ACTIVITY, struct.pack("HH",
+ interval, window) )
+
+ pkt = sock.recv(255)
+
+ status = struct.unpack("xxxxxxB", pkt)[0]
+
+ # restore old filter
+ sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, old_filter )
+ if status != 0: return -1
+ return 0
+
+dev_id = 0
+try:
+ sock = bluez.hci_open_dev(dev_id)
+except:
+ print "error accessing bluetooth device..."
+ sys.exit(1)
+
+try:
+ interval, window = read_inquiry_scan_activity(sock)
+except Exception, e:
+ print "error reading inquiry scan activity. "
+ print e
+ sys.exit(1)
+print "current inquiry scan interval: %d (0x%X) window: %d (0x%X)" % \
+ (interval, interval, window, window)
+
+if len(sys.argv) == 3:
+ interval = int(sys.argv[1])
+ window = int(sys.argv[2])
+ print "target interval: %d window %d" % (interval, window)
+ write_inquiry_scan_activity(sock, interval, window)
+ interval, window = read_inquiry_scan_activity(sock)
+ print "current inquiry scan interval: %d (0x%X) window: %d (0x%X)" % \
+ (interval, interval, window, window)