#!/usr/bin/env python ## system-config-printer ## Copyright (C) 2008 Red Hat, Inc. ## Copyright (C) 2008 Till Kamppeter ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or ## (at your option) any later version. ## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## You should have received a copy of the GNU General Public License ## along with this program; if not, write to the Free Software ## Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. import urllib, httplib, platform, threading, tempfile, traceback import os, sys from xml.etree.ElementTree import XML def normalize_space (text): result = text.strip () result = result.replace ('\n', ' ') i = result.find (' ') while i != -1: result = result.replace (' ', ' ') i = result.find (' ') return result class QueryThread (threading.Thread): def __init__ (self, parent, parameters, callback, user_data=None): threading.Thread.__init__ (self) self.parent = parent self.parameters = parameters self.callback = callback self.user_data = user_data self.setDaemon (True) def run (self): # CGI script to be executed query_command = "/query.cgi" # Headers for the post request headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain"} params = ("%s&uilanguage=%s&locale=%s" % (urllib.urlencode (self.parameters), self.parent.language[0], self.parent.language[0])) print "http://%s%s?%s" % (self.parent.base_url, query_command, params) # Send request result = None status = 1 try: conn = httplib.HTTPConnection(self.parent.base_url) conn.request("POST", query_command, params, headers) resp = conn.getresponse() status = resp.status if status == 200: result = resp.read() conn.close() except: result = sys.exc_info () if status == 200: status = 0 if self.callback != None: self.callback (status, self.user_data, result) class OpenPrinting: def __init__(self, language=None): if language == None: import locale try: language = locale.getlocale(locale.LC_MESSAGES) except locale.Error, e: language = 'C' self.language = language # XXX Read configuration file. self.base_url = "www.openprinting.org" # Restrictions on driver choices XXX Parameters to be taken from # config file self.onlyfree = 1 self.onlymanufacturer = 0 def cancelOperation(self, handle): # Just prevent the callback. try: handle.callback = None except: pass def webQuery(self, parameters, callback, user_data=None): """ webQuery(parameters, callback, user_data) -> integer Run a web query for a driver. @type parameters: dict @param parameters: URL parameters @type callback: function @param callback: callback function, taking (integer, user_data, string) parameters with the first parameter being the status code, zero for success @return: query handle """ the_thread = QueryThread (self, parameters, callback, user_data) the_thread.start() return the_thread def searchPrinters(self, make, callback, user_data=None): """ searchPrinters(make, callback, user_data) -> integer Search for printers by a manufacturer. @type make: string @param make: printer manufacturer @type callback: function @param callback: callback function, taking (integer, user_data, string) parameters with the first parameter being the status code, zero for success @return: query handle """ def parse_result (status, data, result): (callback, user_data) = data if status != 0: callback (status, user_data, result) return status = 0 printers = {} try: root = XML (result) # We store the printers as a dict of: # foomatic_id: displayname for printer in root.findall ("printer"): id = printer.find ("id") make = printer.find ("make") model = printer.find ("model") if id != None and make != None and model != None: idtxt = id.text maketxt = make.text modeltxt = model.text if idtxt and maketxt and modeltxt: printers[idtxt] = maketxt + " " + modeltxt except: status = 1 printers = sys.exc_info () try: callback (status, user_data, printers) except: (type, value, tb) = sys.exc_info () tblast = traceback.extract_tb (tb, limit=None) if len (tblast): tblast = tblast[:len (tblast) - 1] extxt = traceback.format_exception_only (type, value) for line in traceback.format_tb(tb): print (line.strip ()) print (extxt[0].strip ()) # Common parameters for the request params = { 'type': 'printers', 'make': make, 'format': 'xml' } return self.webQuery(params, parse_result, (callback, user_data)) def listDrivers(self, model, callback, user_data=None): """ listDrivers(model, callback, user_data) -> integer Obtain a list of printer drivers. @type model: string @param model: foomatic printer model string @type callback: function @param callback: callback function, taking (integer, user_data, string) parameters with the first parameter being the status code, zero for success @return: query handle """ def parse_result (status, data, result): (callback, user_data) = data if status != 0: callback (status, user_data, result) try: root = XML (result) drivers = {} # We store the drivers as a dict of: # foomatic_id: # { 'name': name, # 'url': url, # 'supplier': supplier, # 'license': short license string e.g. GPLv2, # 'licensetext': license text (HTML), # 'freesoftware': Boolean, # 'patents': Boolean, # 'shortdescription': short description, # 'recommended': Boolean, # 'functionality': # { 'text': integer percentage, # 'lineart': integer percentage, # 'graphics': integer percentage, # 'photo': integer percentage, # 'speed': integer percentage, # } # 'packages' (optional): # { arch: # { file: # { 'url': url, # 'realversion': upstream version string, # 'version': packaged version string, # 'release': package release string # } # } # } # 'ppds' (optional): # URL string list # } # There is more information in the raw XML, but this # can be added to the Python structure as needed. for driver in root.findall ('driver'): id = driver.attrib.get ('id') if id == None: continue dict = {} for attribute in ['name', 'url', 'supplier', 'license', 'shortdescription' ]: element = driver.find (attribute) if element != None: dict[attribute] = normalize_space (element.text) element = driver.find ('licensetext') if element != None: dict['licensetext'] = element.text for boolean in ['freesoftware', 'recommended', 'patents']: dict[boolean] = driver.find (boolean) != None if not dict.has_key ('name') or not dict.has_key ('url'): continue container = driver.find ('functionality') if container != None: functionality = {} for attribute in ['text', 'lineart', 'graphics', 'photo', 'speed']: element = container.find (attribute) if element != None: functionality[attribute] = element.text if functionality: dict[container.tag] = functionality packages = {} container = driver.find ('packages') if container != None: for arch in container.getchildren (): rpms = {} for package in arch.findall ('package'): rpm = {} for attribute in ['realversion','version', 'release', 'url']: element = package.find (attribute) if element != None: rpm[attribute] = element.text rpms[package.attrib['file']] = rpm packages[arch.tag] = rpms if packages: dict['packages'] = packages ppds = [] container = driver.find ('ppds') if container != None: for each in container.getchildren (): ppds.append (each.text) if ppds: dict['ppds'] = ppds drivers[id] = dict callback (0, user_data, drivers) except: callback (1, user_data, sys.exc_info ()) params = { 'type': 'drivers', 'moreinfo': '1', 'showprinterid': '1', 'onlyppdfiles': '1', 'onlynewestdriverpackages': '1', 'architectures': platform.machine(), 'noobsoletes': '1', 'onlyfree': str (self.onlyfree), 'onlymanufacturer': str (self.onlymanufacturer), 'printer': model, 'format': 'xml'} return self.webQuery(params, parse_result, (callback, user_data)) if __name__ == "__main__": import gtk, pprint gtk.gdk.threads_init () class QueryApp: def __init__(self): self.openprinting = OpenPrinting() self.main = gtk.Dialog ("OpenPrinting query application", None, gtk.DIALOG_MODAL | gtk.DIALOG_NO_SEPARATOR, (gtk.STOCK_CLOSE, gtk.RESPONSE_CLOSE, "Search", 10, "List", 20)) self.main.set_border_width (6) self.main.vbox.set_spacing (2) vbox = gtk.VBox (False, 6) self.main.vbox.pack_start (vbox, True, True, 0) vbox.set_border_width (6) self.entry = gtk.Entry () vbox.pack_start (self.entry, False, False, 6) sw = gtk.ScrolledWindow () self.tv = gtk.TextView () sw.add (self.tv) vbox.pack_start (sw, True, True, 6) self.main.connect ("response", self.response) self.main.show_all () def response (self, dialog, response): if (response == gtk.RESPONSE_CLOSE or response == gtk.RESPONSE_DELETE_EVENT): gtk.main_quit () if response == 10: # Run a query. self.openprinting.searchPrinters (self.entry.get_text (), self.search_printers_callback) if response == 20: self.openprinting.listDrivers (self.entry.get_text (), self.list_drivers_callback) def search_printers_callback (self, status, user_data, printers): if status != 0: raise printers[1] text = "" for printer in printers.values (): text += printer + "\n" gtk.gdk.threads_enter () self.tv.get_buffer ().set_text (text) gtk.gdk.threads_leave () def list_drivers_callback (self, status, user_data, drivers): if status != 0: raise drivers[1] text = pprint.pformat (drivers) gtk.gdk.threads_enter () self.tv.get_buffer ().set_text (text) gtk.gdk.threads_leave () def query_callback (self, status, user_data, result): gtk.gdk.threads_enter () self.tv.get_buffer ().set_text (str (result)) file ("result.xml", "w").write (str (result)) gtk.gdk.threads_leave () q = QueryApp() gtk.main ()