Source code for yagocd.resources.pipeline

#!/usr/bin/env python
# -*- coding: utf-8 -*-

###############################################################################
# The MIT License
#
# Copyright (c) 2016 Grigory Chernyshev
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################
import json
import time
from distutils.version import LooseVersion

from easydict import EasyDict

from yagocd.resources import BaseManager, BaseNode
from yagocd.resources.material import ModificationEntity
from yagocd.resources.pipeline_config import PipelineConfigManager
from yagocd.resources.stage import StageInstance
from yagocd.util import since, YagocdUtil


@since('14.3.0')
[docs]class PipelineManager(BaseManager): """ The pipelines API allows users to view pipeline information and operate on it. `Official documentation. <https://api.go.cd/current/#pipelines>`_ `Pipeline groups documentation. <https://api.go.cd/current/#pipeline-groups>`_ :versionadded: 14.3.0. """ GROUPS_RESOURCE_PATH = '{base_api}/config/pipeline_groups' RESOURCE_PATH = '{base_api}/pipelines/{name}' VSM_RESOURCE_PATH = '{base_api}/pipelines/value_stream_map/{name}' def __iter__(self): """ Method add iterator protocol for the manager. :return: array of pipelines :rtype: list of yagocd.resources.pipeline.PipelineEntity """ return iter(self.list()) def __getitem__(self, name): """ Method add possibility to get pipeline by the name using dictionary like syntax. :param name: name of the pipeline :return: if found - pipeline :class:`yagocd.resources.pipeline.PipelineEntity`, otherwise ``None``. :rtype: yagocd.resources.pipeline.PipelineEntity """ return self.find(name=name) @since('14.3.0')
[docs] def list(self): """ List all available pipelines. :versionadded: 14.3.0. This method uses ``pipeline_groups`` API method call to list available pipelines. It also links them together, so later it's possible to refer to pipeline's descendants. :return: array of pipelines :rtype: list of yagocd.resources.pipeline.PipelineEntity """ response = self._session.get( path=self.GROUPS_RESOURCE_PATH.format(base_api=self.base_api), headers={'Accept': 'application/json'}, ) pipelines = list() for group in response.json(): for data in group['pipelines']: pipeline = PipelineEntity( session=self._session, data=data, group=group['name'] ) pipelines.append(pipeline) # build pipeline graph to link related nodes return YagocdUtil.build_graph( nodes=pipelines, dependencies=lambda parent: [material for material in parent.data.materials], compare=lambda candidate, child: candidate.description == child.data.name )
[docs] def find(self, name): """ Finds pipeline by it's name. :versionadded: 14.3.0. :param name: name of required pipeline. :return: if found - pipeline :class:`yagocd.resources.pipeline.PipelineEntity`, otherwise ``None``. :rtype: yagocd.resources.pipeline.PipelineEntity """ for pipeline in self.list(): if pipeline.data.name == name: return pipeline
[docs] def history(self, name, offset=0): """ The pipeline history allows users to list pipeline instances. Supports pagination using offset which tells the API how many instances to skip. :versionadded: 14.3.0. :param name: name of the pipeline. :param offset: number of pipeline instances to be skipped. :return: an array of pipeline instances :class:`yagocd.resources.pipeline.PipelineInstance`. :rtype: list of yagocd.resources.pipeline.PipelineInstance """ response = self._session.get( path=self._session.urljoin(self.RESOURCE_PATH, 'history', offset).format( base_api=self.base_api, name=name), headers={'Accept': 'application/json'}, ) instances = list() for instance in response.json().get('pipelines'): instances.append(PipelineInstance(session=self._session, data=instance)) return instances
[docs] def full_history(self, name): """ Method for accessing full history of specific pipeline. :versionadded: 14.3.0. It yields each instance and after one chunk is over moves to the next one. :param name: name of the pipeline. :return: an array of pipeline instances :class:`yagocd.resources.pipeline.PipelineInstance`. :rtype: list of yagocd.resources.pipeline.PipelineInstance """ offset = 0 instances = self.history(name, offset) while instances: for instance in instances: yield instance offset += len(instances) instances = self.history(name, offset)
[docs] def last(self, name): """ Get last pipeline instance. :versionadded: 14.3.0. :param name: name of the pipeline. :rtype: yagocd.resources.pipeline.PipelineInstance """ pipeline_history = self.history(name=name) if pipeline_history: return pipeline_history[0]
[docs] def get(self, name, counter): """ Gets pipeline instance object. :versionadded: 14.3.0. :param name: name of the pipeline. :param counter: pipeline counter. :return: A pipeline instance object :class:`yagocd.resources.pipeline.PipelineInstance`. :rtype: yagocd.resources.pipeline.PipelineInstance """ response = self._session.get( path=self._session.urljoin(self.RESOURCE_PATH, 'instance', counter).format( base_api=self.base_api, name=name), headers={'Accept': 'application/json'}, ) return PipelineInstance(session=self._session, data=response.json())
[docs] def status(self, name): """ The pipeline status allows users to check if the pipeline is paused, locked and schedulable. :versionadded: 14.3.0. :param name: name of the pipeline. :return: JSON containing information about pipeline state, wrapped in EasyDict class. """ response = self._session.get( path=self._session.urljoin(self.RESOURCE_PATH, 'status').format( base_api=self.base_api, name=name), headers={'Accept': 'application/json'}, ) return EasyDict(response.json())
[docs] def pause(self, name, cause): """ Pause the specified pipeline. :versionadded: 14.3.0. :param name: name of the pipeline. :param cause: reason for pausing the pipeline. """ self._session.post( path=self._session.urljoin(self.RESOURCE_PATH, 'pause').format( base_api=self.base_api, name=name), data={'pauseCause': cause}, headers={ 'Accept': 'application/json', 'Confirm': 'true' }, )
[docs] def unpause(self, name): """ Unpause the specified pipeline. :versionadded: 14.3.0. :param name: name of the pipeline. """ self._session.post( path=self._session.urljoin(self.RESOURCE_PATH, 'unpause').format( base_api=self.base_api, name=name), headers={ 'Accept': 'application/json', 'Confirm': 'true' }, )
[docs] def release_lock(self, name): """ Release a lock on a pipeline so that you can start up a new instance without having to wait for the earlier instance to finish. :versionadded: 14.3.0. :param name: name of the pipeline. :return: a text confirmation. """ response = self._session.post( path=self._session.urljoin(self.RESOURCE_PATH, 'releaseLock').format( base_api=self.base_api, name=name), headers={ 'Accept': 'application/json', 'Confirm': 'true' }, ) return response.text
[docs] def schedule(self, name, materials=None, variables=None, secure_variables=None): """ Scheduling allows user to trigger a specific pipeline. :versionadded: 14.3.0. :param name: name of the pipeline. :param materials: material revisions to use. :param variables: environment variables to set. :param secure_variables: secure environment variables to set. :return: a text confirmation. """ data = {'materials': materials, 'variables': variables, 'secure_variables': secure_variables} data = dict((k, v) for k, v in data.items() if v is not None) response = self._session.post( path=self._session.urljoin(self.RESOURCE_PATH, 'schedule').format( base_api=self.base_api, name=name), data=json.dumps(data), headers={ 'Accept': 'application/json', 'Content-Type': 'application/json', 'Confirm': 'true' }, ) return response.text
[docs] def schedule_with_instance( self, name, materials=None, variables=None, secure_variables=None, backoff=0.5, max_tries=20 ): """ Schedule pipeline and return instance. Credits of implementation comes to `gaqzi`: https://github.com/gaqzi/py-gocd/blob/master/gocd/api/pipeline.py#L122 :versionadded: 14.3.0. :warning: Replace this with whatever is the official way as soon as gocd#990 is fixed. \ https://github.com/gocd/gocd/issues/990 :param name: name of the pipeline. :param materials: material revisions to use. :param variables: environment variables to set. :param secure_variables: secure environment variables to set. :param backoff: time to wait before checking for new instance. :param max_tries: maximum tries to do. :return: possible triggered instance of pipeline. :rtype: yagocd.resources.pipeline.PipelineInstance """ last_instance = self.last(name) if last_instance: last_run_counter = last_instance.data.counter else: last_run_counter = -1 self.schedule(name=name, materials=materials, variables=variables, secure_variables=secure_variables) while max_tries > 0: candidate_instance = self.last(name) if candidate_instance and candidate_instance.data.counter > last_run_counter: return candidate_instance time.sleep(backoff) max_tries -= 1
[docs] def value_stream_map(self, name, counter): """ Method builds pipeline instance dependency graph. :param name: name of the pipeline. :param counter: pipeline counter. """ response = self._session.get( path=self._session.urljoin(self.VSM_RESOURCE_PATH, '{}.json'.format(counter)).format( base_api=self._session.base_api(api_path=''), name=name), headers={'Accept': 'application/json'}, ) data = EasyDict(response.json()) nodes = list() dependencies = dict() for level in data['levels']: for node_item in level.nodes: dependencies[node_item.id] = node_item.parents if node_item.node_type == 'DUMMY': # WTF?! continue elif node_item.node_type == 'PIPELINE': for instance in node_item.instances: pipeline_data = dict( id=node_item.id, name=node_item.name, counter=instance.counter, label=instance.label, type=node_item.node_type.capitalize(), stages=[] ) for stage in instance.stages: stage_data = dict( pipeline_name=node_item.name, pipeline_counter=instance.counter, name=stage.name, status=stage.status, counter=stage.locator.split('/')[-1] ) pipeline_data['stages'].append(stage_data) nodes.append(PipelineInstance(session=self._session, data=pipeline_data)) else: if LooseVersion(self._session.server_version) <= LooseVersion('16.5.0'): modifications = [m for m in node_item.instances] else: modifications = [m for sublist in node_item.material_revisions for m in sublist.modifications] for modification in modifications: modification['id'] = node_item.id modification['type'] = node_item.node_type.capitalize() nodes.append(ModificationEntity(session=self._session, data=modification)) return YagocdUtil.build_graph( nodes=nodes, dependencies=lambda parent: dependencies[parent.data.id], compare=lambda candidate, child: candidate == child.data.id )
[docs]class PipelineEntity(BaseNode): """ Class for the pipeline entity, which describes pipeline itself. Executing ``history`` will return pipeline instances. """ def __init__(self, session, data, group=None): super(PipelineEntity, self).__init__(session, data) self._group = group self._pipeline = PipelineManager(session=session) def __iter__(self): """ Method for iterating over all instances of a current pipeline. :return: an array of pipeline instances :class:`yagocd.resources.pipeline.PipelineInstance`. :rtype: list of yagocd.resources.pipeline.PipelineInstance """ return iter(self.full_history()) def __getitem__(self, counter): """ Method for accessing to specific pipeline instance in array-like manner. :param counter: pipeline counter. :return: A pipeline instance object :class:`yagocd.resources.pipeline.PipelineInstance`. :rtype: yagocd.resources.pipeline.PipelineInstance """ return self.get(counter=counter) @property def group(self): """ Name of the group pipeline belongs to. :return: group name. """ return self._group @staticmethod
[docs] def get_url(server_url, pipeline_name): """ Returns url for accessing pipeline entity. """ return "{server_url}/go/tab/pipeline/history/{pipeline_name}".format( server_url=server_url, pipeline_name=pipeline_name )
@property def url(self): """ Returns url for accessing pipeline entity. """ return self.get_url(server_url=self._session.server_url, pipeline_name=self.data.name) @property def config(self): """ Property for accessing pipeline configuration. :rtype: yagocd.resources.pipeline_config.PipelineConfigManager """ return PipelineConfigManager(session=self._session, pipeline_name=self.data.name)
[docs] def history(self, offset=0): """ The pipeline history allows users to list pipeline instances. :param offset: number of pipeline instances to be skipped. :return: an array of pipeline instances :class:`yagocd.resources.pipeline.PipelineInstance`. :rtype: list of yagocd.resources.pipeline.PipelineInstance """ return self._pipeline.history(name=self.data.name, offset=offset)
[docs] def full_history(self): """ Method for accessing full history of specific pipeline. It yields each instance and after one chunk is over moves to the next one. :return: an array of pipeline instances :class:`yagocd.resources.pipeline.PipelineInstance`. :rtype: list of yagocd.resources.pipeline.PipelineInstance """ return self._pipeline.full_history(name=self.data.name)
[docs] def last(self): """ Get last pipeline instance. :rtype: yagocd.resources.pipeline.PipelineInstance """ return self._pipeline.last(name=self.data.name)
[docs] def get(self, counter): """ Gets pipeline instance object. :param counter pipeline counter: :return: A pipeline instance object :class:`yagocd.resources.pipeline.PipelineInstance`. :rtype: yagocd.resources.pipeline.PipelineInstance """ return self._pipeline.get(name=self.data.name, counter=counter)
[docs] def status(self): """ The pipeline status allows users to check if the pipeline is paused, locked and schedulable. :return: JSON containing information about pipeline state, wrapped in EasyDict class. """ return self._pipeline.status(name=self.data.name)
[docs] def pause(self, cause): """ Pause the current pipeline. :param cause: reason for pausing the pipeline. """ self._pipeline.pause(name=self.data.name, cause=cause)
[docs] def unpause(self): """ Unpause the specified pipeline. """ self._pipeline.unpause(name=self.data.name)
[docs] def release_lock(self): """ Release a lock on a pipeline so that you can start up a new instance without having to wait for the earlier instance to finish. :return: a text confirmation. """ return self._pipeline.release_lock(name=self.data.name)
[docs] def schedule(self, materials=None, variables=None, secure_variables=None): """ Scheduling allows user to trigger a specific pipeline. :param materials: material revisions to use. :param variables: environment variables to set. :param secure_variables: secure environment variables to set. :return: a text confirmation. """ return self._pipeline.schedule( name=self.data.name, materials=materials, variables=variables, secure_variables=secure_variables )
[docs] def schedule_with_instance( self, materials=None, variables=None, secure_variables=None, backoff=0.5, max_tries=20 ): """ Schedule pipeline and return instance. Credits of implementation comes to `gaqzi`: https://github.com/gaqzi/py-gocd/blob/master/gocd/api/pipeline.py#L122 :warning: Replace this with whatever is the official way as soon as gocd#990 is fixed. \ https://github.com/gocd/gocd/issues/990 :param materials: material revisions to use. :param variables: environment variables to set. :param secure_variables: secure environment variables to set. :param backoff: time to wait before checking for new instance. :param max_tries: maximum tries to do. :return: possible triggered instance of pipeline. :rtype: yagocd.resources.pipeline.PipelineInstance """ return self._pipeline.schedule_with_instance( name=self.data.name, materials=materials, variables=variables, secure_variables=secure_variables, backoff=backoff, max_tries=max_tries )
[docs] def value_stream_map(self, counter): return self._pipeline.value_stream_map(name=self.data.name, counter=counter)
[docs]class PipelineInstance(BaseNode): """ Pipeline instance represents concrete execution of specific pipeline. """ def __init__(self, session, data): super(PipelineInstance, self).__init__(session, data) self._manager = PipelineManager(session=session) def __iter__(self): """ Method for iterating over stages of a current pipeline instance. :return: arrays of stages :rtype: list of yagocd.resources.stage.StageInstance """ return iter(self.stages()) def __getitem__(self, name): """ Method for accessing to specific stage in array-like manner by name. :param name: name of the stage to search. :return: found stage or None. :rtype: yagocd.resources.stage.StageInstance """ return self.stage(name=name) @property def url(self): """ Returns url for accessing pipeline instance. """ return "{server_url}/go/pipelines/value_stream_map/{pipeline_name}/{pipeline_counter}".format( server_url=self._session.server_url, pipeline_name=self.data.name, pipeline_counter=self.data.counter ) @property def pipeline_url(self): """ Returns url for accessing pipeline entity. """ return PipelineEntity.get_url(server_url=self._session.server_url, pipeline_name=self.data.name)
[docs] def stages(self): """ Method for getting stages from pipeline instance. :return: arrays of stages :rtype: list of yagocd.resources.stage.StageInstance """ stages = list() for data in self.data.stages: stages.append(StageInstance(session=self._session, data=data, pipeline=self)) return stages
[docs] def stage(self, name): """ Method for searching specific stage by it's name. :param name: name of the stage to search. :return: found stage or None. :rtype: yagocd.resources.stage.StageInstance """ for stage in self.stages(): if stage.data.name == name: return stage
[docs] def value_stream_map(self): return self._manager.value_stream_map(name=self.data.name, counter=self.data.counter)
@property def config(self): """ Property for accessing pipeline configuration. :rtype: yagocd.resources.pipeline_config.PipelineConfigManager """ return PipelineConfigManager(session=self._session, pipeline_name=self.data.name)