Asterisk - The Open Source Telephony Project  21.4.1
swagger_model.py
1 
2 # Asterisk -- An open source telephony toolkit.
3 #
4 # Copyright (C) 2013, Digium, Inc.
5 #
6 # David M. Lee, II <dlee@digium.com>
7 #
8 # See http://www.asterisk.org for more information about
9 # the Asterisk project. Please do not directly contact
10 # any of the maintainers of this project for assistance;
11 # the project provides a web site, mailing lists and IRC
12 # channels for your use.
13 #
14 # This program is free software, distributed under the terms of
15 # the GNU General Public License Version 2. See the LICENSE file
16 # at the top of the source tree.
17 #
18 
19 """Swagger data model objects.
20 
21 These objects should map directly to the Swagger api-docs, without a lot of
22 additional fields. In the process of translation, it should also validate the
23 model for consistency against the Swagger spec (i.e., fail if fields are
24 missing, or have incorrect values).
25 
26 See https://github.com/wordnik/swagger-core/wiki/API-Declaration for the spec.
27 """
28 
29 from __future__ import print_function
30 import json
31 import os.path
32 import pprint
33 import re
34 import sys
35 import traceback
36 
# We don't fully support Swagger 1.2, but we need it for subtyping.
# The load() methods in this module raise SwaggerError for any
# swaggerVersion that is not listed here.
SWAGGER_VERSIONS = ["1.1", "1.2"]

# Type names treated as primitives: SwaggerType.load() sets is_primitive
# when a type's singular name appears in this list.
SWAGGER_PRIMITIVES = [
    'void',
    'string',
    'boolean',
    'number',
    'int',
    'long',
    'double',
    'float',
    'Date',
]
51 
52 
class Stringify(object):
    """Mix-in that gives model classes a repr showing their attributes.

    The repr is the class followed by a safe repr of the instance __dict__.
    """

    def __repr__(self):
        attributes = pprint.saferepr(self.__dict__)
        return "%s(%s)" % (self.__class__, attributes)
58 
59 
def compare_versions(lhs, rhs):
    '''Compare two dotted version strings component by component.

    Each dot-separated component is converted to an integer, so only purely
    numeric versions are supported (anything else raises ValueError from
    int()).

    For reference:
      1.0 == 1.0
      1.0 < 1.0.1
      1.2 < 1.10

    @param lhs Left hand side of the comparison
    @param rhs Right hand side of the comparison
    @return -1 if lhs < rhs
    @return 0 if lhs == rhs
    @return 1 if lhs > rhs
    '''
    left_parts = list(map(int, lhs.split('.')))
    right_parts = list(map(int, rhs.split('.')))
    if left_parts < right_parts:
        return -1
    if left_parts > right_parts:
        return 1
    return 0
80 
81 
class ParsingContext(object):
    """Context information for parsing.

    Holds the Swagger version in effect and a stack (list of strings)
    describing where in the documents the parser currently is.

    This object is immutable. To change contexts (like adding an item to the
    stack), use the next() and next_stack() functions to build a new one.
    """

    def __init__(self, swagger_version, stack):
        """Ctor.

        @param swagger_version: Swagger version string, or None if unknown.
        @param stack: List of strings locating the current object.
        """
        self.__swagger_version = swagger_version
        self.__stack = stack

    def __repr__(self):
        return "ParsingContext(swagger_version=%s, stack=%s)" % (
            self.swagger_version, self.stack)

    def get_swagger_version(self):
        """Returns the Swagger version stored in this context."""
        return self.__swagger_version

    def get_stack(self):
        """Returns the location stack stored in this context."""
        return self.__stack

    # Read-only views of the private fields.
    swagger_version = property(get_swagger_version)

    stack = property(get_stack)

    def version_less_than(self, ver):
        """Returns True if this context's Swagger version is older than ver.

        @param ver: Version string to compare against.
        """
        return compare_versions(self.swagger_version, ver) < 0

    def next_stack(self, json, id_field):
        """Returns a new item pushed to the stack.

        @param json: Current JSON object.
        @param id_field: Field identifying this object.
        @return New context with additional item in the stack.
        @raise SwaggerError: If id_field is not present in json.
        """
        if id_field not in json:
            raise SwaggerError("Missing id_field: %s" % id_field, self)
        # Build a new list so this context's stack is left untouched.
        new_stack = self.stack + ['%s=%s' % (id_field, str(json[id_field]))]
        return ParsingContext(self.swagger_version, new_stack)

    def next(self, version=None, stack=None):
        """Returns a new context, overriding the version and/or the stack.

        @param version: New Swagger version; None keeps the current one.
        @param stack: New stack; None keeps the current one.
        @return New ParsingContext.
        """
        if version is None:
            # The attribute is swagger_version; there is no 'version'.
            version = self.swagger_version
        if stack is None:
            stack = self.stack
        return ParsingContext(version, stack)
128 
129 
class SwaggerError(Exception):
    """Raised when an error is encountered mapping the JSON objects into the
    model.

    The message, context and cause are stored, in that order, in args.
    """

    def __init__(self, msg, context, cause=None):
        """Ctor.

        @param msg: String message for the error.
        @param context: ParsingContext object
        @param cause: Optional exception that caused this one.
        """
        # super() must name this class; naming Exception would skip
        # Exception itself in the method resolution order.
        super(SwaggerError, self).__init__(msg, context, cause)
143 
144 
class SwaggerPostProcessor(object):
    """Hook interface called by the model objects as they are loaded.

    Every hook here is a no-op. A processor can add fields to the model
    objects for additional information to use in the templates.
    """

    def process_resource_api(self, resource_api, context):
        """Hook for a ResourceApi object; does nothing by default.

        @param resource_api: ResourceApi object.
        @param context: Current context in the API.
        """

    def process_api(self, api, context):
        """Hook for an Api object; does nothing by default.

        @param api: Api object.
        @param context: Current context in the API.
        """

    def process_operation(self, operation, context):
        """Hook for an Operation object; does nothing by default.

        @param operation: Operation object.
        @param context: Current context in the API.
        """

    def process_parameter(self, parameter, context):
        """Hook for a Parameter object; does nothing by default.

        @param parameter: Parameter object.
        @param context: Current context in the API.
        """

    def process_model(self, model, context):
        """Hook for a Model object; does nothing by default.

        @param model: Model object.
        @param context: Current context in the API.
        """

    def process_property(self, property, context):
        """Hook for a Property object; does nothing by default.

        @param property: Property object.
        @param context: Current context in the API.
        """

    def process_type(self, swagger_type, context):
        """Hook for a SwaggerType object; does nothing by default.

        @param swagger_type: SwaggerType object.
        @param context: Current context in the API.
        """

    def process_resource_listing(self, resource_listing, context):
        """Hook for the overall ResourceListing; does nothing by default.

        @param resource_listing: ResourceListing object.
        @param context: Current context in the API.
        """
213 
214 
class AllowableRange(Stringify):
    """Model of a allowableValues of type RANGE

    See https://github.com/wordnik/swagger-core/wiki/datatypes#complex-types
    """

    def __init__(self, min_value, max_value):
        """Ctor.

        @param min_value: Lower bound of the range.
        @param max_value: Upper bound of the range.
        """
        self.min_value, self.max_value = min_value, max_value

    def to_wiki(self):
        """Returns a one-line description of the range."""
        template = "Allowed range: Min: {0}; Max: {1}"
        return template.format(self.min_value, self.max_value)
226 
227 
229  """Model of a allowableValues of type LIST
230 
231  See https://github.com/wordnik/swagger-core/wiki/datatypes#complex-types
232  """
233  def __init__(self, values):
234  self.values = values
235 
236  def to_wiki(self):
237  return "Allowed values: {0}".format(", ".join(self.values))
238 
239 
def load_allowable_values(json, context):
    """Parse a JSON allowableValues object.

    This returns None, AllowableList or AllowableRange, depending on the
    valueType in the JSON. If the valueType is not recognized, a SwaggerError
    is raised.

    @param json: The allowableValues JSON object; may be None or empty.
    @param context: Current context in the API.
    @return None if json is empty, else an AllowableRange or AllowableList.
    @raise SwaggerError: If valueType is missing or unknown, if a RANGE has
                         neither min nor max, or if a LIST has no values.
    """
    if not json:
        return None

    if 'valueType' not in json:
        raise SwaggerError("Missing valueType field", context)

    value_type = json['valueType']

    if value_type == 'RANGE':
        # A range with only one bound is accepted; the absent bound is None.
        if 'min' not in json and 'max' not in json:
            raise SwaggerError("Missing fields min/max", context)
        return AllowableRange(json.get('min'), json.get('max'))
    if value_type == 'LIST':
        if 'values' not in json:
            raise SwaggerError("Missing field values", context)
        return AllowableList(json['values'])
    raise SwaggerError("Unknown valueType %s" % value_type, context)
264 
265 
267  """Model of an operation's parameter.
268 
269  See https://github.com/wordnik/swagger-core/wiki/parameters
270  """
271 
272  required_fields = ['name', 'paramType', 'dataType']
273 
274  def __init__(self):
275  self.param_type = None
276  self.name = None
277  self.description = None
278  self.data_type = None
279  self.required = None
280  self.allowable_values = None
281  self.allow_multiple = None
282 
283  def load(self, parameter_json, processor, context):
284  context = context.next_stack(parameter_json, 'name')
285  validate_required_fields(parameter_json, self.required_fields, context)
286  self.name = parameter_json.get('name')
287  self.param_type = parameter_json.get('paramType')
288  self.description = parameter_json.get('description') or ''
289  self.data_type = parameter_json.get('dataType')
290  self.required = parameter_json.get('required') or False
291  self.default_value = parameter_json.get('defaultValue')
292  self.allowable_values = load_allowable_values(
293  parameter_json.get('allowableValues'), context)
294  self.allow_multiple = parameter_json.get('allowMultiple') or False
295  processor.process_parameter(self, context)
296  if parameter_json.get('allowedValues'):
297  raise SwaggerError(
298  "Field 'allowedValues' invalid; use 'allowableValues'",
299  context)
300  return self
301 
302  def is_type(self, other_type):
303  return self.param_type == other_type
304 
305 
307  """Model of an error response.
308 
309  See https://github.com/wordnik/swagger-core/wiki/errors
310  """
311 
312  required_fields = ['code', 'reason']
313 
314  def __init__(self):
315  self.code = None
316  self.reason = None
317 
318  def load(self, err_json, processor, context):
319  context = context.next_stack(err_json, 'code')
320  validate_required_fields(err_json, self.required_fields, context)
321  self.code = err_json.get('code')
322  self.reason = err_json.get('reason')
323  return self
324 
325 
327  """Model of a data type.
328  """
329 
330  def __init__(self):
331  self.name = None
332  self.is_discriminator = None
333  self.is_list = None
334  self.singular_name = None
335  self.lc_singular_name = None
336  self.is_primitive = None
337  self.is_binary = None
338 
339  def load(self, type_name, processor, context):
340  # Some common errors
341  if type_name == 'integer':
342  raise SwaggerError("The type for integer should be 'int'", context)
343 
344  self.name = type_name
345  type_param = get_list_parameter_type(self.name)
346  self.is_list = type_param is not None
347  if self.is_list:
348  self.singular_name = type_param
349  self.lc_singular_name = type_param.lower()
350  else:
351  self.singular_name = self.name
352  self.lc_singular_name = self.name.lower()
353  self.is_primitive = self.singular_name in SWAGGER_PRIMITIVES
354  self.is_binary = (self.singular_name == 'binary')
355  processor.process_type(self, context)
356  return self
357 
358 
360  """Model of an operation on an API
361 
362  See https://github.com/wordnik/swagger-core/wiki/API-Declaration#apis
363  """
364 
365  required_fields = ['httpMethod', 'nickname', 'responseClass', 'summary']
366 
367  def __init__(self):
368  self.http_method = None
369  self.nickname = None
370  self.nickname_lc = None
371  self.response_class = None
372  self.parameters = []
373  self.summary = None
374  self.notes = None
375  self.error_responses = []
376 
377  def load(self, op_json, processor, context):
378  context = context.next_stack(op_json, 'nickname')
379  validate_required_fields(op_json, self.required_fields, context)
380  self.http_method = op_json.get('httpMethod')
381  self.nickname = op_json.get('nickname')
382  self.nickname_lc = self.nickname.lower()
383  response_class = op_json.get('responseClass')
384  self.response_class = response_class and SwaggerType().load(
385  response_class, processor, context)
386 
387  # Specifying WebSocket URL's is our own extension
388  self.is_websocket = op_json.get('upgrade') == 'websocket'
389  self.is_req = not self.is_websocket
390 
391  if self.is_websocket:
392  self.websocket_protocol = op_json.get('websocketProtocol')
393  if self.http_method != 'GET':
394  raise SwaggerError(
395  "upgrade: websocket is only valid on GET operations",
396  context)
397 
398  params_json = op_json.get('parameters') or []
399  self.parameters = [
400  Parameter().load(j, processor, context) for j in params_json]
401  self.query_parameters = [
402  p for p in self.parameters if p.is_type('query')]
403  self.has_query_parameters = self.query_parameters and True
404  self.path_parameters = [
405  p for p in self.parameters if p.is_type('path')]
406  self.has_path_parameters = self.path_parameters and True
407  self.header_parameters = [
408  p for p in self.parameters if p.is_type('header')]
409  self.has_header_parameters = self.header_parameters and True
410  self.has_parameters = self.has_query_parameters or \
412  self.is_binary_response = self.response_class.is_binary
413 
414  # Body param is different, since there's at most one
415  self.body_parameter = [
416  p for p in self.parameters if p.is_type('body')]
417  if len(self.body_parameter) > 1:
418  raise SwaggerError("Cannot have more than one body param", context)
419  self.body_parameter = self.body_parameter and self.body_parameter[0]
420  self.has_body_parameter = self.body_parameter and True
421 
422  self.summary = op_json.get('summary')
423  self.notes = op_json.get('notes')
424  err_json = op_json.get('errorResponses') or []
425  self.error_responses = [
426  ErrorResponse().load(j, processor, context) for j in err_json]
427  self.has_error_responses = self.error_responses != []
428  processor.process_operation(self, context)
429  return self
430 
431 
class Api(Stringify):
    """Model of a single API in an API declaration.

    See https://github.com/wordnik/swagger-core/wiki/API-Declaration
    """

    required_fields = ['path', 'operations']

    def __init__(self):
        """Creates an empty API; load() fills in the fields."""
        self.path = self.description = None
        self.operations = []

    def load(self, api_json, processor, context):
        """Populates this API from its JSON description.

        @param api_json: JSON object describing the API.
        @param processor: Post processor; process_api() is called.
        @param context: Current context in the API.
        @return self
        @raise SwaggerError: If 'path' or 'operations' is missing.
        """
        context = context.next_stack(api_json, 'path')
        validate_required_fields(api_json, self.required_fields, context)
        self.path = api_json.get('path')
        self.description = api_json.get('description')

        loaded_operations = []
        for operation_json in api_json.get('operations'):
            operation = Operation().load(operation_json, processor, context)
            loaded_operations.append(operation)
        self.operations = loaded_operations

        # True as soon as one operation is a WebSocket upgrade.
        found_websocket = False
        for operation in loaded_operations:
            if operation.is_websocket:
                found_websocket = True
                break
        self.has_websocket = found_websocket

        processor.process_api(self, context)
        return self
456 
457 
def get_list_parameter_type(type_string):
    """Returns the type parameter if the given type_string is List[].

    @param type_string: Type string to parse
    @returns Type parameter of the list, or None if not a List.
    """
    # Raw string: '\[' in a plain literal is an invalid escape sequence.
    list_match = re.match(r'^List\[(.*)\]$', type_string)
    return list_match.group(1) if list_match else None
466 
467 
469  """Model of a Swagger property.
470 
471  See https://github.com/wordnik/swagger-core/wiki/datatypes
472  """
473 
474  required_fields = ['type']
475 
476  def __init__(self, name):
477  self.name = name
478  self.type = None
479  self.description = None
480  self.required = None
481 
482  def load(self, property_json, processor, context):
483  validate_required_fields(property_json, self.required_fields, context)
484  # Bit of a hack, but properties do not self-identify
485  context = context.next_stack({'name': self.name}, 'name')
486  self.description = property_json.get('description') or ''
487  self.required = property_json.get('required') or False
488 
489  type = property_json.get('type')
490  self.type = type and SwaggerType().load(type, processor, context)
491 
492  processor.process_property(self, context)
493  return self
494 
495 
497  """Model of a Swagger model.
498 
499  See https://github.com/wordnik/swagger-core/wiki/datatypes
500  """
501 
502  required_fields = ['description', 'properties']
503 
504  def __init__(self):
505  self.id = None
506  self.id_lc = None
507  self.subtypes = []
508  self.__subtype_types = []
509  self.notes = None
510  self.description = None
511  self.__properties = None
512  self.__discriminator = None
513  self.__extends_type = None
514 
515  def load(self, id, model_json, processor, context):
516  context = context.next_stack(model_json, 'id')
517  validate_required_fields(model_json, self.required_fields, context)
518  # The duplication of the model's id is required by the Swagger spec.
519  self.id = model_json.get('id')
520  self.id_lc = self.id.lower()
521  if id != self.id:
522  raise SwaggerError("Model id doesn't match name", context)
523  self.subtypes = model_json.get('subTypes') or []
524  if self.subtypes and context.version_less_than("1.2"):
525  raise SwaggerError("Type extension support added in Swagger 1.2",
526  context)
527  self.description = model_json.get('description')
528  props = model_json.get('properties').items() or []
529  self.__properties = [
530  Property(k).load(j, processor, context) for (k, j) in props]
531  self.__properties = sorted(self.__properties, key=lambda p: p.name)
532 
533  discriminator = model_json.get('discriminator')
534 
535  if discriminator:
536  if context.version_less_than("1.2"):
537  raise SwaggerError("Discriminator support added in Swagger 1.2",
538  context)
539 
540  discr_props = [p for p in self.__properties if p.name == discriminator]
541  if not discr_props:
542  raise SwaggerError(
543  "Discriminator '%s' does not name a property of '%s'" % (
544  discriminator, self.id),
545  context)
546 
547  self.__discriminator = discr_props[0]
548 
549  self.model_json = json.dumps(model_json,
550  indent=2, separators=(',', ': '))
551 
552  processor.process_model(self, context)
553  return self
554 
555  def extends(self):
556  return self.__extends_type and self.__extends_type.id
557 
558  def extends_lc(self):
559  return self.__extends_type and self.__extends_type.id_lc
560 
561  def set_extends_type(self, extends_type):
562  self.__extends_type = extends_type
563 
564  def set_subtype_types(self, subtype_types):
565  self.__subtype_types = subtype_types
566 
567  def discriminator(self):
568  """Returns the discriminator, digging through base types if needed.
569  """
570  return self.__discriminator or \
571  self.__extends_type and self.__extends_type.discriminator()
572 
573  def properties(self):
574  base_props = []
575  if self.__extends_type:
576  base_props = self.__extends_type.properties()
577  return base_props + self.__properties
578 
579  def has_properties(self):
580  return len(self.properties()) > 0
581 
582  def all_subtypes(self):
583  """Returns the full list of all subtypes, including sub-subtypes.
584  """
585  res = self.__subtype_types + \
586  [subsubtypes for subtype in self.__subtype_types
587  for subsubtypes in subtype.all_subtypes()]
588  return sorted(res, key=lambda m: m.id)
589 
590  def has_subtypes(self):
591  """Returns True if type has any subtypes.
592  """
593  return len(self.subtypes) > 0
594 
595 
597  """Model class for an API Declaration.
598 
599  See https://github.com/wordnik/swagger-core/wiki/API-Declaration
600  """
601 
602  required_fields = [
603  'swaggerVersion', '_author', '_copyright', 'apiVersion', 'basePath',
604  'resourcePath', 'apis', 'models'
605  ]
606 
607  def __init__(self):
608  self.swagger_version = None
609  self.author = None
610  self.copyright = None
611  self.api_version = None
612  self.base_path = None
613  self.resource_path = None
614  self.apis = []
615  self.models = []
616 
617  def load_file(self, api_declaration_file, processor):
618  context = ParsingContext(None, [api_declaration_file])
619  try:
620  return self.__load_file(api_declaration_file, processor, context)
621  except SwaggerError:
622  raise
623  except Exception as e:
624  print("Error: ", traceback.format_exc(), file=sys.stderr)
625  raise SwaggerError(
626  "Error loading %s" % api_declaration_file, context, e)
627 
628  def __load_file(self, api_declaration_file, processor, context):
629  with open(api_declaration_file) as fp:
630  self.load(json.load(fp), processor, context)
631 
632  expected_resource_path = '/api-docs/' + \
633  os.path.basename(api_declaration_file) \
634  .replace(".json", ".{format}")
635 
636  if self.resource_path != expected_resource_path:
637  print("%s != %s" % (self.resource_path, expected_resource_path),
638  file=sys.stderr)
639  raise SwaggerError("resourcePath has incorrect value", context)
640 
641  return self
642 
643  def load(self, api_decl_json, processor, context):
644  """Loads a resource from a single Swagger resource.json file.
645  """
646  # If the version doesn't match, all bets are off.
647  self.swagger_version = api_decl_json.get('swaggerVersion')
648  context = context.next(version=self.swagger_version)
649  if not self.swagger_version in SWAGGER_VERSIONS:
650  raise SwaggerError(
651  "Unsupported Swagger version %s" % self.swagger_version, context)
652 
653  validate_required_fields(api_decl_json, self.required_fields, context)
654 
655  self.author = api_decl_json.get('_author')
656  self.copyright = api_decl_json.get('_copyright')
657  self.api_version = api_decl_json.get('apiVersion')
658  self.base_path = api_decl_json.get('basePath')
659  self.resource_path = api_decl_json.get('resourcePath')
660  self.requires_modules = api_decl_json.get('requiresModules') or []
661  api_json = api_decl_json.get('apis') or []
662  self.apis = [
663  Api().load(j, processor, context) for j in api_json]
664  paths = set()
665  for api in self.apis:
666  if api.path in paths:
667  raise SwaggerError("API with duplicated path: %s" % api.path, context)
668  paths.add(api.path)
669  self.has_websocket = any(api.has_websocket for api in self.apis)
670  models = api_decl_json.get('models').items() or []
671  self.models = [Model().load(id, json, processor, context)
672  for (id, json) in models]
673  self.models = sorted(self.models, key=lambda m: m.id)
674  # Now link all base/extended types
675  model_dict = dict((m.id, m) for m in self.models)
676  for m in self.models:
677  def link_subtype(name):
678  res = model_dict.get(name)
679  if not res:
680  raise SwaggerError("%s has non-existing subtype %s",
681  m.id, name)
682  res.set_extends_type(m)
683  return res;
684  if m.subtypes:
685  m.set_subtype_types([
686  link_subtype(subtype) for subtype in m.subtypes])
687  return self
688 
689 
691  """Model of an API listing in the resources.json file.
692  """
693 
694  required_fields = ['path', 'description']
695 
696  def __init__(self):
697  self.path = None
698  self.description = None
699  self.api_declaration = None
700 
701  def load(self, api_json, processor, context):
702  context = context.next_stack(api_json, 'path')
703  validate_required_fields(api_json, self.required_fields, context)
704  self.path = api_json['path'].replace('{format}', 'json')
705  self.description = api_json['description']
706 
707  if not self.path or self.path[0] != '/':
708  raise SwaggerError("Path must start with /", context)
709  processor.process_resource_api(self, context)
710  return self
711 
712  def load_api_declaration(self, base_dir, processor):
713  self.file = (base_dir + self.path)
714  self.api_declaration = ApiDeclaration().load_file(self.file, processor)
715  processor.process_resource_api(self, [self.file])
716 
717 
719  """Model of Swagger's resources.json file.
720  """
721 
722  required_fields = ['apiVersion', 'basePath', 'apis']
723 
724  def __init__(self):
725  self.swagger_version = None
726  self.api_version = None
727  self.base_path = None
728  self.apis = None
729 
730  def load_file(self, resource_file, processor):
731  context = ParsingContext(None, [resource_file])
732  try:
733  return self.__load_file(resource_file, processor, context)
734  except SwaggerError:
735  raise
736  except Exception as e:
737  print("Error: ", traceback.format_exc(), file=sys.stderr)
738  raise SwaggerError(
739  "Error loading %s" % resource_file, context, e)
740 
741  def __load_file(self, resource_file, processor, context):
742  with open(resource_file) as fp:
743  return self.load(json.load(fp), processor, context)
744 
745  def load(self, resources_json, processor, context):
746  # If the version doesn't match, all bets are off.
747  self.swagger_version = resources_json.get('swaggerVersion')
748  if not self.swagger_version in SWAGGER_VERSIONS:
749  raise SwaggerError(
750  "Unsupported Swagger version %s" % self.swagger_version, context)
751 
752  validate_required_fields(resources_json, self.required_fields, context)
753  self.api_version = resources_json['apiVersion']
754  self.base_path = resources_json['basePath']
755  apis_json = resources_json['apis']
756  self.apis = [
757  ResourceApi().load(j, processor, context) for j in apis_json]
758  processor.process_resource_listing(self, context)
759  return self
760 
761 
def validate_required_fields(json, required_fields, context):
    """Verifies that a JSON object contains every required field.

    Raises a SwaggerError naming all of the missing fields if any are
    absent; otherwise returns None.

    @param json: JSON object to check.
    @param required_fields: List of required fields.
    @param context: Current context in the API.
    """
    missing_fields = []
    for field in required_fields:
        if field not in json:
            missing_fields.append(field)

    if not missing_fields:
        return

    raise SwaggerError(
        "Missing fields: %s" % ', '.join(missing_fields), context)
def next_stack(self, json, id_field)
def __load_file(self, resource_file, processor, context)
def process_model(self, model, context)
def process_api(self, api, context)
def process_operation(self, operation, context)
def load(self, resources_json, processor, context)
def process_property(self, property, context)
def process_resource_api(self, resource_api, context)
static int load_file(const char *filename, char **ret)
Read a TEXT file into a string and return the length.
def process_type(self, swagger_type, context)
def __load_file(self, api_declaration_file, processor, context)
def load(self, api_decl_json, processor, context)
def process_parameter(self, parameter, context)
def process_resource_listing(self, resource_listing, context)