diff options
Diffstat (limited to 'pybluez/advanced')
| -rw-r--r-- | pybluez/advanced/inquiry-with-rssi.py | 148 | ||||
| -rw-r--r-- | pybluez/advanced/l2-mtu.py | 60 | ||||
| -rw-r--r-- | pybluez/advanced/l2-unreliable-client.py | 76 | ||||
| -rw-r--r-- | pybluez/advanced/l2-unreliable-server.py | 34 | ||||
| -rw-r--r-- | pybluez/advanced/read-local-bdaddr.py | 35 | ||||
| -rw-r--r-- | pybluez/advanced/write-inquiry-scan.py | 90 |
6 files changed, 443 insertions, 0 deletions
diff --git a/pybluez/advanced/inquiry-with-rssi.py b/pybluez/advanced/inquiry-with-rssi.py new file mode 100644 index 0000000..814f7ed --- /dev/null +++ b/pybluez/advanced/inquiry-with-rssi.py @@ -0,0 +1,148 @@ +# performs a simple device inquiry, followed by a remote name request of each +# discovered device + +import os +import sys +import struct +import bluetooth._bluetooth as bluez + +def printpacket(pkt): + for c in pkt: + sys.stdout.write("%02x " % struct.unpack("B",c)[0]) + print + + +def read_inquiry_mode(sock): + """returns the current mode, or -1 on failure""" + # save current filter + old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14) + + # Setup socket filter to receive only events related to the + # read_inquiry_mode command + flt = bluez.hci_filter_new() + opcode = bluez.cmd_opcode_pack(bluez.OGF_HOST_CTL, + bluez.OCF_READ_INQUIRY_MODE) + bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT) + bluez.hci_filter_set_event(flt, bluez.EVT_CMD_COMPLETE); + bluez.hci_filter_set_opcode(flt, opcode) + sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, flt ) + + # first read the current inquiry mode. + bluez.hci_send_cmd(sock, bluez.OGF_HOST_CTL, + bluez.OCF_READ_INQUIRY_MODE ) + + pkt = sock.recv(255) + + status,mode = struct.unpack("xxxxxxBB", pkt) + if status != 0: mode = -1 + + # restore old filter + sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, old_filter ) + return mode + +def write_inquiry_mode(sock, mode): + """returns 0 on success, -1 on failure""" + # save current filter + old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14) + + # Setup socket filter to receive only events related to the + # write_inquiry_mode command + flt = bluez.hci_filter_new() + opcode = bluez.cmd_opcode_pack(bluez.OGF_HOST_CTL, + bluez.OCF_WRITE_INQUIRY_MODE) + bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT) + bluez.hci_filter_set_event(flt, bluez.EVT_CMD_COMPLETE); + bluez.hci_filter_set_opcode(flt, opcode) + sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, flt ) + + # send the command! + bluez.hci_send_cmd(sock, bluez.OGF_HOST_CTL, + bluez.OCF_WRITE_INQUIRY_MODE, struct.pack("B", mode) ) + + pkt = sock.recv(255) + + status = struct.unpack("xxxxxxB", pkt)[0] + + # restore old filter + sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, old_filter ) + if status != 0: return -1 + return 0 + +def device_inquiry_with_with_rssi(sock): + # save current filter + old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14) + + # perform a device inquiry on bluetooth device #0 + # The inquiry should last 8 * 1.28 = 10.24 seconds + # before the inquiry is performed, bluez should flush its cache of + # previously discovered devices + flt = bluez.hci_filter_new() + bluez.hci_filter_all_events(flt) + bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT) + sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, flt ) + + duration = 4 + max_responses = 255 + cmd_pkt = struct.pack("BBBBB", 0x33, 0x8b, 0x9e, duration, max_responses) + bluez.hci_send_cmd(sock, bluez.OGF_LINK_CTL, bluez.OCF_INQUIRY, cmd_pkt) + + results = [] + + done = False + while not done: + pkt = sock.recv(255) + ptype, event, plen = struct.unpack("BBB", pkt[:3]) + if event == bluez.EVT_INQUIRY_RESULT_WITH_RSSI: + pkt = pkt[3:] + nrsp = struct.unpack("B", pkt[0])[0] + for i in range(nrsp): + addr = bluez.ba2str( pkt[1+6*i:1+6*i+6] ) + rssi = struct.unpack("b", pkt[1+13*nrsp+i])[0] + results.append( ( addr, rssi ) ) + print "[%s] RSSI: [%d]" % (addr, rssi) + elif event == bluez.EVT_INQUIRY_COMPLETE: + done = True + elif event == bluez.EVT_CMD_STATUS: + status, ncmd, opcode = struct.unpack("BBH", pkt[3:7]) + if status != 0: + print "uh oh..." + printpacket(pkt[3:7]) + done = True + else: + print "unrecognized packet type 0x%02x" % ptype + + + # restore old filter + sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, old_filter ) + + return results + +dev_id = 0 +try: + sock = bluez.hci_open_dev(dev_id) +except: + print "error accessing bluetooth device..." + sys.exit(1) + +try: + mode = read_inquiry_mode(sock) +except Exception, e: + print "error reading inquiry mode. " + print "Are you sure this a bluetooth 1.2 device?" + print e + sys.exit(1) +print "current inquiry mode is %d" % mode + +if mode != 1: + print "writing inquiry mode..." + try: + result = write_inquiry_mode(sock, 1) + except Exception, e: + print "error writing inquiry mode. Are you sure you're root?" + print e + sys.exit(1) + if result != 0: + print "error while setting inquiry mode" + print "result: %d" % result + +device_inquiry_with_with_rssi(sock) diff --git a/pybluez/advanced/l2-mtu.py b/pybluez/advanced/l2-mtu.py new file mode 100644 index 0000000..1ffe04c --- /dev/null +++ b/pybluez/advanced/l2-mtu.py @@ -0,0 +1,60 @@ +import sys +import struct +import bluetooth + +def usage(): + print "usage: l2-mtu < server | client > [options]" + print "" + print "l2-mtu server to start in server mode" + print "l2-mtu client <addr> to start in client mode and connect to addr" + sys.exit(2) + +if len(sys.argv) < 2: usage() + +mode = sys.argv[1] +if mode not in [ "client", "server" ]: usage() + +if mode == "server": + server_sock=bluetooth.BluetoothSocket( bluetooth.L2CAP ) + server_sock.bind(("",0x1001)) + server_sock.listen(1) + while True: + print "waiting for incoming connection" + client_sock,address = server_sock.accept() + print "Accepted connection from %s" % str(address) + + bluetooth.set_l2cap_mtu( client_sock, 65535 ) + + print "waiting for data" + total = 0 + while True: + try: + data = client_sock.recv(65535) + except bluetooth.BluetoothError, e: + break + if len(data) == 0: break + print "received packet of size %d" % len(data) + + client_sock.close() + + print "connection closed" + + server_sock.close() +else: + sock=bluetooth.BluetoothSocket(bluetooth.L2CAP) + + bt_addr = sys.argv[2] + print "trying to connect to %s:1001" % bt_addr + port = 0x1001 + sock.connect((bt_addr, port)) + + print "connected. Adjusting link parameters." + bluetooth.set_l2cap_mtu( sock, 65535 ) + + totalsent = 0 + for i in range(1, 65535, 100): + pkt = "0" * i + sent = sock.send(pkt) + print "sent packet of size %d (tried %d)" % (sent, len(pkt)) + + sock.close() diff --git a/pybluez/advanced/l2-unreliable-client.py b/pybluez/advanced/l2-unreliable-client.py new file mode 100644 index 0000000..5b10a48 --- /dev/null +++ b/pybluez/advanced/l2-unreliable-client.py @@ -0,0 +1,76 @@ +import sys +import fcntl +import struct +import array +import bluetooth +import bluetooth._bluetooth as bt # low level bluetooth wrappers. + +def __get_acl_conn_handle(sock, addr): + hci_fd = sock.fileno() + reqstr = struct.pack( "6sB17s", bt.str2ba(addr), bt.ACL_LINK, "\0" * 17) + request = array.array( "c", reqstr ) + fcntl.ioctl( hci_fd, bt.HCIGETCONNINFO, request, 1 ) + handle = struct.unpack("8xH14x", request.tostring())[0] + return handle + +def write_flush_timeout( addr, timeout ): + hci_sock = bt.hci_open_dev() + # get the ACL connection handle to the remote device + handle = __get_acl_conn_handle(hci_sock, addr) + pkt = struct.pack("HH", handle, bt.htobs(timeout)) + response = bt.hci_send_req(hci_sock, bt.OGF_HOST_CTL, + 0x0028, bt.EVT_CMD_COMPLETE, 3, pkt) + status = struct.unpack("B", response[0])[0] + rhandle = struct.unpack("H", response[1:3])[0] + assert rhandle == handle + assert status == 0 + +def read_flush_timeout( addr ): + hci_sock = bt.hci_open_dev() + # get the ACL connection handle to the remote device + handle = __get_acl_conn_handle(hci_sock, addr) + pkt = struct.pack("H", handle) + response = bt.hci_send_req(hci_sock, bt.OGF_HOST_CTL, + 0x0027, bt.EVT_CMD_COMPLETE, 5, pkt) + status = struct.unpack("B", response[0])[0] + rhandle = struct.unpack("H", response[1:3])[0] + assert rhandle == handle + assert status == 0 + fto = struct.unpack("H", response[3:5])[0] + return fto + +# Create the client socket +sock=bluetooth.BluetoothSocket(bluetooth.L2CAP) + +if len(sys.argv) < 4: + print "usage: l2capclient.py <addr> <timeout> <num_packets>" + print " address - device that l2-unreliable-server is running on" + print " timeout - wait timeout * 0.625ms before dropping unACK'd packets" + print " num_packets - number of 627-byte packets to send on connect" + sys.exit(2) + +bt_addr=sys.argv[1] +timeout = int(sys.argv[2]) +num_packets = int(sys.argv[3]) + +print "trying to connect to %s:1001" % bt_addr +port = 0x1001 +sock.connect((bt_addr, port)) + +print "connected. Adjusting link parameters." +print "current flush timeout is %d ms" % read_flush_timeout( bt_addr ) +try: + write_flush_timeout( bt_addr, timeout ) +except bt.error, e: + print "error setting flush timeout. are you sure you're superuser?" + print e + sys.exit(1) +print "new flush timeout is %d ms" % read_flush_timeout( bt_addr ) + +totalsent = 0 +for i in range(num_packets): + pkt = "0" * 672 + totalsent += sock.send(pkt) +print "sent %d bytes total" % totalsent + +sock.close() diff --git a/pybluez/advanced/l2-unreliable-server.py b/pybluez/advanced/l2-unreliable-server.py new file mode 100644 index 0000000..d827775 --- /dev/null +++ b/pybluez/advanced/l2-unreliable-server.py @@ -0,0 +1,34 @@ +import sys +import bluetooth + +if len(sys.argv) < 2: + print "usage: l2-unreliable-server" + sys.exit(2) + +timeout = int(sys.argv[1]) +assert timeout >= 0 + +server_sock=bluetooth.BluetoothSocket( bluetooth.L2CAP ) +server_sock.bind(("",0x1001)) +server_sock.listen(1) +while True: + print "waiting for incoming connection" + client_sock,address = server_sock.accept() + print "Accepted connection from %s" % str(address) + + print "waiting for data" + total = 0 + while True: + try: + data = client_sock.recv(1024) + except bluetooth.BluetoothError, e: + break + if len(data) == 0: break + total += len(data) + print "total byte read: %d" % total + + client_sock.close() + + print "connection closed" + +server_sock.close() diff --git a/pybluez/advanced/read-local-bdaddr.py b/pybluez/advanced/read-local-bdaddr.py new file mode 100644 index 0000000..410a681 --- /dev/null +++ b/pybluez/advanced/read-local-bdaddr.py @@ -0,0 +1,35 @@ +import os +import sys +import struct +import bluetooth._bluetooth as _bt + +def read_local_bdaddr(hci_sock): + old_filter = hci_sock.getsockopt( _bt.SOL_HCI, _bt.HCI_FILTER, 14) + flt = _bt.hci_filter_new() + opcode = _bt.cmd_opcode_pack(_bt.OGF_INFO_PARAM, + _bt.OCF_READ_BD_ADDR) + _bt.hci_filter_set_ptype(flt, _bt.HCI_EVENT_PKT) + _bt.hci_filter_set_event(flt, _bt.EVT_CMD_COMPLETE); + _bt.hci_filter_set_opcode(flt, opcode) + hci_sock.setsockopt( _bt.SOL_HCI, _bt.HCI_FILTER, flt ) + + _bt.hci_send_cmd(hci_sock, _bt.OGF_INFO_PARAM, _bt.OCF_READ_BD_ADDR ) + + pkt = hci_sock.recv(255) + + status,raw_bdaddr = struct.unpack("xxxxxxB6s", pkt) + assert status == 0 + + t = [ "%X" % ord(b) for b in raw_bdaddr ] + t.reverse() + bdaddr = ":".join(t) + + # restore old filter + hci_sock.setsockopt( _bt.SOL_HCI, _bt.HCI_FILTER, old_filter ) + return bdaddr + +if __name__ == "__main__": + dev_id = 0 + hci_sock = _bt.hci_open_dev(dev_id) + bdaddr = read_local_bdaddr(hci_sock) + print bdaddr diff --git a/pybluez/advanced/write-inquiry-scan.py b/pybluez/advanced/write-inquiry-scan.py new file mode 100644 index 0000000..54793b7 --- /dev/null +++ b/pybluez/advanced/write-inquiry-scan.py @@ -0,0 +1,90 @@ +import os +import sys +import struct +import bluetooth._bluetooth as bluez + +def read_inquiry_scan_activity(sock): + """returns the current inquiry scan interval and window, + or -1 on failure""" + # save current filter + old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14) + + # Setup socket filter to receive only events related to the + # read_inquiry_mode command + flt = bluez.hci_filter_new() + opcode = bluez.cmd_opcode_pack(bluez.OGF_HOST_CTL, + bluez.OCF_READ_INQ_ACTIVITY) + bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT) + bluez.hci_filter_set_event(flt, bluez.EVT_CMD_COMPLETE); + bluez.hci_filter_set_opcode(flt, opcode) + sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, flt ) + + # first read the current inquiry mode. + bluez.hci_send_cmd(sock, bluez.OGF_HOST_CTL, + bluez.OCF_READ_INQ_ACTIVITY ) + + pkt = sock.recv(255) + + status,interval,window = struct.unpack("!xxxxxxBHH", pkt) + interval = bluez.btohs(interval) + interval = (interval >> 8) | ( (interval & 0xFF) << 8 ) + window = (window >> 8) | ( (window & 0xFF) << 8 ) + if status != 0: mode = -1 + + # restore old filter + sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, old_filter ) + return interval, window + +def write_inquiry_scan_activity(sock, interval, window): + """returns 0 on success, -1 on failure""" + # save current filter + old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14) + + # Setup socket filter to receive only events related to the + # write_inquiry_mode command + flt = bluez.hci_filter_new() + opcode = bluez.cmd_opcode_pack(bluez.OGF_HOST_CTL, + bluez.OCF_WRITE_INQ_ACTIVITY) + bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT) + bluez.hci_filter_set_event(flt, bluez.EVT_CMD_COMPLETE); + bluez.hci_filter_set_opcode(flt, opcode) + sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, flt ) + + # send the command! + bluez.hci_send_cmd(sock, bluez.OGF_HOST_CTL, + bluez.OCF_WRITE_INQ_ACTIVITY, struct.pack("HH", + interval, window) ) + + pkt = sock.recv(255) + + status = struct.unpack("xxxxxxB", pkt)[0] + + # restore old filter + sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, old_filter ) + if status != 0: return -1 + return 0 + +dev_id = 0 +try: + sock = bluez.hci_open_dev(dev_id) +except: + print "error accessing bluetooth device..." + sys.exit(1) + +try: + interval, window = read_inquiry_scan_activity(sock) +except Exception, e: + print "error reading inquiry scan activity. " + print e + sys.exit(1) +print "current inquiry scan interval: %d (0x%X) window: %d (0x%X)" % \ + (interval, interval, window, window) + +if len(sys.argv) == 3: + interval = int(sys.argv[1]) + window = int(sys.argv[2]) + print "target interval: %d window %d" % (interval, window) + write_inquiry_scan_activity(sock, interval, window) + interval, window = read_inquiry_scan_activity(sock) + print "current inquiry scan interval: %d (0x%X) window: %d (0x%X)" % \ + (interval, interval, window, window) |
