ClusterShell

Transport ClusterShell: worker and event handlers.

class cumin.transports.clustershell.ClusterShellWorker(config: dict, target: Target)[source]

Bases: BaseWorker

It provides a Cumin worker for SSH using the ClusterShell library.

This transport uses the ClusterShell Python library to connect to the selected hosts and execute a list of commands. This transport accepts the following customizations:

  • sync execution mode: given a list of commands, the first one will be executed on all the hosts, then, if the success ratio is reached, the second one will be executed on all hosts where the first one was successful, and so on.

  • async execution mode: given a list of commands, on each host the commands will be executed sequentially, interrupting the execution on any single host at the first command that fails. The execution on each host is independent of the others.

  • custom execution mode: create a custom event handler class that extends the BaseEventHandler class defined in cumin/transports/clustershell.py, implement its abstract methods, and set that class as the handler of the transport.

Worker ClusterShell constructor.

Parameters:

according to parent cumin.transports.BaseWorker.__init__().

property results: ExecutionResults

Property to access the results instance after having executed the commands.

Concrete implementation of parent abstract property.

execute() int[source]

Execute the commands on all the targets using the handler.

Concrete implementation of parent abstract method.

Parameters:

according to parent cumin.transports.BaseWorker.execute().

run() ExecutionResults[source]

Execute the commands on all the targets using the handler.

Concrete implementation of parent abstract method.

Parameters:

according to parent cumin.transports.BaseWorker.run().

get_results()[source]

Get the results of the last task execution.

Concrete implementation of parent abstract method.

Parameters:

according to parent cumin.transports.BaseWorker.get_results().

property handler: Type[BaseEventHandler] | None

Concrete implementation of parent abstract getter and setter.

Accepted values for the setter:

  • an instance of a custom handler class derived from BaseEventHandler.

  • a str with one of the available default handlers listed in DEFAULT_HANDLERS.

The event handler is mandatory for this transport.

Parameters:

according to parent cumin.transports.BaseWorker.handler.
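
The two accepted setter values can be illustrated with a small, self-contained sketch. It does not import Cumin: the classes below are empty stand-ins that only reuse the documented names, and resolve_handler() is a hypothetical helper, not part of the library. The sketch also assumes that a custom handler is passed as a class, in line with the Type[BaseEventHandler] annotation of the property.

```python
class BaseEventHandler:
    """Empty stand-in for cumin.transports.clustershell.BaseEventHandler."""


class SyncEventHandler(BaseEventHandler):
    """Empty stand-in for the default 'sync' handler."""


class AsyncEventHandler(BaseEventHandler):
    """Empty stand-in for the default 'async' handler."""


# Same shape as the DEFAULT_HANDLERS mapping documented for this module.
DEFAULT_HANDLERS = {"async": AsyncEventHandler, "sync": SyncEventHandler}


def resolve_handler(value):
    """Hypothetical helper: turn a setter value into a handler class."""
    if isinstance(value, str):
        # A string must be one of the keys of the default handlers mapping.
        if value not in DEFAULT_HANDLERS:
            raise ValueError(f"Unknown default handler: {value!r}")
        return DEFAULT_HANDLERS[value]
    if isinstance(value, type) and issubclass(value, BaseEventHandler):
        # A custom handler class derived from the base class is used as is.
        return value
    raise TypeError(f"Unsupported handler value: {value!r}")
```

With this sketch, resolve_handler("sync") returns the SyncEventHandler stand-in, while any class derived from BaseEventHandler is returned unchanged.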

property reporter: Type[BaseReporter]

Getter for the reporter property.

It must be a subclass of cumin.transports.clustershell.BaseReporter.

property progress_bars: bool

Getter for the boolean progress_bars property.

class cumin.transports.clustershell.HostRun(*, name: str, commands: list[cumin.transports.Command], return_codes: list[int] = <factory>, last_executed_command_index: int = -1, state: cumin.transports.State = <factory>)[source]

Bases: object

Class to keep track of the execution run for a single host.

Parameters:
  • name (str) -- the hostname used to connect to it, usually the FQDN.

  • commands (list) -- the list of command instances to execute.

  • return_codes (list) -- the list of return codes for each executed command.

  • last_executed_command_index (int) -- the index of the last executed command from the commands list on this host.

  • state (cumin.transports.State) -- the current state of the host.

property has_completed: bool

Whether the host has executed all commands.

Returns:

True if the host has executed all commands, False otherwise.

Return type:

bool

class cumin.transports.clustershell.ExecutionRun(*, commands: list[Command], hosts: NodeSet)[source]

Bases: object

Class to keep track of the whole execution progress.

Initialize the instance.

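Parameters:
  • commands (list) -- the list of cumin.transports.Command instances to execute.

  • hosts (ClusterShell.NodeSet.NodeSet) -- the set of hosts to track.
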
next_host(state: HostState) HostRun | None[source]

Get the next available host in a given state.

Parameters:

state (cumin.transports.HostState) -- the state a host must be to be selected.

Returns:

the host instance if one is found, None if no host was found with the given state.

Return type:

cumin.transports.clustershell.HostRun or None

get_counter() Counter[source]

Helper method to get a Counter instance with all the hosts by state.

Returns:

the counter instance with keys that are cumin.transports.HostState instances.

Return type:

Counter
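
Counting hosts by state maps naturally onto collections.Counter. The sketch below assumes a hypothetical HostState enum with invented member names and a plain dict from hostname to state; the real state class and its members live in cumin.transports and may differ.

```python
from collections import Counter
from enum import Enum


class HostState(Enum):
    """Hypothetical states, for illustration only."""

    PENDING = "pending"
    SUCCESS = "success"
    FAILED = "failed"


def count_by_state(host_states: dict[str, HostState]) -> Counter:
    """Return a Counter keyed by state with the number of hosts in each."""
    return Counter(host_states.values())


counter = count_by_state({
    "host1": HostState.SUCCESS,
    "host2": HostState.FAILED,
    "host3": HostState.SUCCESS,
})
```

Here counter[HostState.SUCCESS] is 2, and a state with no hosts, such as HostState.PENDING, simply reads as 0.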

init_command_results(index: int) SimpleNamespace[source]

Initialize a simple object to keep track of the execution of a given command.

Parameters:

index (int) -- the index of the command from the commands list on this host to track.

Returns:

the simple object.

Return type:

types.SimpleNamespace

get_targets() list[TargetedHosts][source]

Helper method to get the TargetedHosts instances when returning the final results.

Returns:

a list of cumin.transports.TargetedHosts instances, one for each executed command.

Return type:

list

get_command_outputs(index: int) tuple[HostsOutputResult, ...][source]

Get the command outputs for a given command index.

Parameters:

index (int) -- the index of the command from the commands list on this host to track.

Returns:

the tuple of command outputs.

Return type:

tuple

get_results() ExecutionResults[source]

Get the final results in Cumin's API representation.

Returns:

the results instance.

Return type:

cumin.transports.ExecutionResults

class cumin.transports.clustershell.BaseReporter[source]

Bases: object

Abstract Reporter base class that does not report anything.

Initializes a Reporter.

abstractmethod global_timeout_nodes(nodes: dict[str, HostRun]) None[source]

Print the nodes that were caught by the global timeout in a colored and tqdm-friendly way.

Parameters:

nodes (dict) -- the mapping of the nodes processed.

abstractmethod failed_nodes(nodes: dict[str, HostRun], num_hosts: int, commands: list[Command], filter_command_index: int = -1) None[source]

Print the nodes that failed to execute commands in a colored and tqdm-friendly way.

Parameters:
  • nodes (dict) -- the mapping of the nodes on which commands were executed.

  • num_hosts (int) -- the total number of nodes.

  • commands (list) -- the list of Commands that were executed.

  • filter_command_index (int, optional) -- print only the nodes that failed to execute the command specified by this command index.

abstractmethod success_nodes(command: Command | None, num_successfull_nodes: int, success_ratio: float, num_hosts: int, success_threshold: float, nodes: dict[str, HostRun], command_index: int) None[source]

Print how many nodes successfully executed all commands in a colored and tqdm-friendly way.

Parameters:
  • command (cumin.transports.Command) -- the command that was executed.

  • num_successfull_nodes (int) -- the number of nodes on which the execution was successful.

  • success_ratio (float) -- the ratio of successful nodes.

  • num_hosts (int) -- the total number of nodes.

  • success_threshold (float) -- the threshold of successful nodes above which the command execution is deemed successful.

  • nodes (dict) -- the mapping of the nodes on which the command was executed.

  • command_index (int) -- the index of the command in the list of commands to execute.

abstractmethod command_completed() None[source]

To be called on completion of processing, when no command specific output is required.

abstractmethod command_output(command_outputs: tuple[HostsOutputResult, ...]) None[source]

Print the command output in a colored and tqdm-friendly way.

Parameters:

command_outputs (tuple) -- the tuple of command output instances.

abstractmethod command_header(command: Command, command_index: int) None[source]

Reports a single command execution.

Parameters:
  • command (cumin.transports.Command) -- the command the header belongs to.

  • command_index (int) -- the index of the command in the list of commands to execute.

abstractmethod message_element(message: MsgTreeElem) None[source]

Report a single message as received from the execution of a command on a node.

Parameters:

message (ClusterShell.MsgTree.MsgTreeElem) -- the message to report.

class cumin.transports.clustershell.NullReporter(*args, **kwargs)[source]

Bases: BaseReporter

Reporter class that does not report anything.

Initializes a Reporter.

_callable(*args, **kwargs)[source]

Just a callable that does nothing.
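
The idea of a reporter whose methods all resolve to a do-nothing callable can be sketched with the standard abc module. This is only an illustration of the null-object pattern: ReporterSketch and NullReporterSketch are hypothetical names, only two of the abstract methods are reproduced, and the real NullReporter may wire its _callable differently.

```python
from abc import ABC, abstractmethod


class ReporterSketch(ABC):
    """Hypothetical abstract reporter with two of the documented methods."""

    @abstractmethod
    def command_completed(self) -> None:
        """Called on completion of processing."""

    @abstractmethod
    def command_header(self, command: str, command_index: int) -> None:
        """Report a single command execution."""


def _noop(*args, **kwargs) -> None:
    """Callable that accepts any arguments and does nothing."""


class NullReporterSketch(ReporterSketch):
    """Concrete reporter: every abstract method is replaced by the no-op."""

    command_completed = _noop
    command_header = _noop


reporter = NullReporterSketch()
reporter.command_header("uptime", 0)  # silently ignored
reporter.command_completed()          # silently ignored
```

Because the overriding attributes are plain functions, the subclass no longer has abstract methods and can be instantiated, while ReporterSketch itself still cannot.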

class cumin.transports.clustershell.TqdmQuietReporter(*args, **kwargs)[source]

Bases: NullReporter

Reports the progress of command execution without the command output.

Initializes a Reporter.

short_command_length = 35

the length to which a command should be shortened in various outputs.

Type:

int
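
One way a length limit like this could be applied is sketched below. The shorten() helper and its trailing-ellipsis strategy are assumptions made for illustration; the reporter may shorten commands in a different way.

```python
SHORT_COMMAND_LENGTH = 35  # same value as the documented class attribute


def shorten(command: str, length: int = SHORT_COMMAND_LENGTH) -> str:
    """Hypothetical helper: cap a command string at the given length."""
    if len(command) <= length:
        return command
    # Keep the beginning of the command and mark the cut with an ellipsis,
    # so that the result is exactly `length` characters long.
    return command[: length - 3] + "..."


short = shorten("uptime")
long_command = shorten("x" * 50)
```

Short commands are returned unchanged, while longer ones are cut so that the result, ellipsis included, never exceeds the limit.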

_report_line(message: str, color_func: Callable[[str], str] = <function ColoredType.__getattr__.<locals>.<lambda>>, nodes_string: str = '') None[source]

Print a tqdm-friendly colored status line with success/failure ratio and optional list of nodes.

Parameters:
  • message (str) -- the message to print.

  • color_func (function, optional) -- the coloring function, one of the cumin.color.Colored methods.

  • nodes_string (str, optional) -- the string representation of the affected nodes.

_get_log_message(num: int, num_hosts: int, message: str, nodes: list[str] | None = None) tuple[str, str][source]

Get a pre-formatted message suitable for logging or printing.

Parameters:
  • num (int) -- the number of affected nodes.

  • num_hosts (int) -- the total number of nodes.

  • message (str) -- the message to print.

  • nodes (list, optional) -- the list of nodes affected.

Returns:

a tuple of (logging message, NodeSet of the affected nodes).

Return type:

tuple

global_timeout_nodes(nodes: dict[str, HostRun]) None[source]

Print the nodes that were caught by the global timeout in a colored and tqdm-friendly way.

Parameters:

according to parent BaseReporter.global_timeout_nodes().

failed_nodes(nodes: dict[str, HostRun], num_hosts: int, commands: list[Command], filter_command_index: int = -1) None[source]

Print the nodes that failed to execute commands in a colored and tqdm-friendly way.

Parameters:

according to parent BaseReporter.failed_nodes().

success_nodes(command: Command | None, num_successfull_nodes: int, success_ratio: float, num_hosts: int, success_threshold: float, nodes: dict[str, HostRun], command_index: int) None[source]

Print how many nodes successfully executed all commands in a colored and tqdm-friendly way.

Parameters:

according to parent BaseReporter.success_nodes().

class cumin.transports.clustershell.TqdmReporter(*args, **kwargs)[source]

Bases: TqdmQuietReporter

Reports the progress of command execution with full command output.

Initializes a Reporter.

command_completed() None[source]

To be called on completion of processing, when no command specific output is required.

Parameters:

according to parent BaseReporter.command_completed().

command_output(command_outputs: tuple[HostsOutputResult, ...]) None[source]

Print the command output in a colored and tqdm-friendly way.

Parameters:

according to parent BaseReporter.command_output().

command_header(command: Command, command_index: int) None[source]

Reports a single command execution.

Parameters:

according to parent BaseReporter.command_header().

message_element(message: MsgTreeElem) None[source]

Report a single message as received from the execution of a command on a node.

Parameters:

according to parent BaseReporter.message_element().

class cumin.transports.clustershell.BaseEventHandler(target: Target, commands: list[Command], reporter: BaseReporter, progress_bars: BaseExecutionProgress, success_threshold: float = 1.0, **kwargs: Any)[source]

Bases: EventHandler

ClusterShell event handler base class.

Inherits from the ClusterShell.Event.EventHandler class and defines a base EventHandler class to be used in Cumin. It can be subclassed to create custom EventHandler classes that reuse the common functionality.

Event handler ClusterShell extension constructor.

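Parameters:
  • target (cumin.transports.Target) -- the target instance.

  • commands (list) -- the list of cumin.transports.Command instances to execute on the nodes.

  • reporter (cumin.transports.clustershell.BaseReporter) -- the reporter used to output the progress.

  • progress_bars (BaseExecutionProgress) -- the progress bars instance.

  • success_threshold (float, optional) -- the success threshold to consider the execution successful.

  • **kwargs (optional) -- additional keyword arguments that might be used by derived classes.
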
close(task)[source]

Additional method called at the end of the whole execution, useful for reporting and final actions.

Parameters:

task (ClusterShell.Task.Task) -- a ClusterShell Task instance.

on_timeout(task)[source]

Update the state of the nodes and the timeout counter.

Callback called by the ClusterShellWorker when a ClusterShell.Task.TimeoutError is raised. It means that the whole execution timed out.

Parameters:

task (ClusterShell.Task.Task) -- a ClusterShell Task instance.

ev_hup(worker, node, rc)[source]

Command execution completed on a node.

This callback is triggered by ClusterShell for each node when it completes the execution of a command.

Parameters:

according to parent ClusterShell.Event.EventHandler.ev_hup().

ev_pickup(worker, node)[source]

Command execution started on a node, remove the command from the node's queue.

This callback is triggered by the ClusterShell library for each node when it starts executing a command.

Parameters:

according to parent ClusterShell.Event.EventHandler.ev_pickup().

ev_read(worker, node, sname, msg)[source]

Worker has data to read from a specific node. Print it if running on a single host.

This callback is triggered by ClusterShell for each node when output is available.

Parameters:

according to parent ClusterShell.Event.EventHandler.ev_read().

ev_close(worker, timedout)[source]

Worker has finished or timed out.

This callback is triggered by ClusterShell when the execution has completed or timed out.

Parameters:

according to parent ClusterShell.Event.EventHandler.ev_close().

_success_nodes_report(current_command_index: int = -1) None[source]

Print how many nodes successfully executed all commands in a colored and tqdm-friendly way.

Parameters:

current_command_index (int, optional) -- the command index the success is referring to, if any.

property is_above_threshold: bool

Whether the success threshold is still met.

Returns:

True if the success ratio is still greater than or equal to the success threshold.

Return type:

bool
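
The check described above boils down to comparing a ratio with the threshold. The following sketch shows the arithmetic under one assumption: only hosts that have already failed count against the ratio, so hosts still pending are treated as potential successes. The function names and arguments are hypothetical.

```python
def success_ratio(failed: int, total: int) -> float:
    """Ratio of hosts that have not failed so far (total must be positive)."""
    return (total - failed) / total


def is_above_threshold(failed: int, total: int, threshold: float = 1.0) -> bool:
    """True while the ratio is greater than or equal to the threshold."""
    return success_ratio(failed, total) >= threshold


one_failure = is_above_threshold(failed=1, total=10, threshold=0.9)
two_failures = is_above_threshold(failed=2, total=10, threshold=0.9)
```

With ten hosts and a threshold of 0.9, one failure still satisfies the check (one_failure is True) while a second failure does not (two_failures is False). With the default threshold of 1.0, a single failure is enough to fall below it.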

class cumin.transports.clustershell.SyncEventHandler(target: Target, commands: list[Command], reporter: BaseReporter, progress_bars: BaseExecutionProgress, success_threshold: float = 1.0, **kwargs: Any)[source]

Bases: BaseEventHandler

Custom ClusterShell event handler class that executes commands synchronously.

The implemented logic is:

  • execute command #N on all nodes where command #N-1 was successful, according to batch_size.

  • the success ratio is checked each time a command completes on a node, and the execution is aborted if it is not met. However, nodes already scheduled for execution with ClusterShell will execute the command anyway. The batch_size setting allows controlling this aspect.

  • if the execution of command #N is completed and the success ratio is greater than the success threshold, restart from the top with N=N+1.

The typical use case is to orchestrate some operation across a fleet, ensuring that each command is completed by enough nodes before proceeding with the next one.
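
The ordering implied by this logic can be simulated in a few lines of plain Python. The sketch below is not the handler's implementation: it runs everything sequentially, ignores batch_size and timeouts, checks the ratio only after each command has completed on all eligible hosts, and treats a ratio equal to the threshold as sufficient. run_sync() and fake_run() are hypothetical names.

```python
from typing import Callable


def run_sync(
    hosts: list[str],
    commands: list[str],
    run: Callable[[str, str], int],
    success_threshold: float = 1.0,
) -> dict[str, list[int]]:
    """Run each command on all still-successful hosts before the next one."""
    return_codes: dict[str, list[int]] = {host: [] for host in hosts}
    eligible = list(hosts)
    for command in commands:
        succeeded = []
        for host in eligible:
            rc = run(host, command)
            return_codes[host].append(rc)
            if rc == 0:
                succeeded.append(host)
        # Stop before the next command if too few hosts succeeded.
        if not hosts or len(succeeded) / len(hosts) < success_threshold:
            break
        eligible = succeeded
    return return_codes


def fake_run(host: str, command: str) -> int:
    """Pretend executor: only 'cmd1' on 'host2' fails."""
    return 1 if (host, command) == ("host2", "cmd1") else 0


results = run_sync(["host1", "host2", "host3"], ["cmd1", "cmd2"], fake_run, 0.5)
```

In this run host2 fails cmd1 and is excluded from cmd2, while host1 and host3 execute both commands because two hosts out of three satisfy the 0.5 threshold.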

Define a custom ClusterShell event handler to execute commands synchronously.

Parameters:

according to parent BaseEventHandler.__init__().

start_command(schedule: bool = False) None[source]

Initialize progress bars and variables for this command execution.

Executed at the start of each command.

Parameters:

schedule (bool, optional) -- whether the next command should be sent to ClusterShell for execution or not.

end_command() bool[source]

Command terminated, print the result and schedule the next command if criteria are met.

Executed at the end of each command inside a lock.

Returns:

True if the next command should be scheduled, False otherwise.

Return type:

bool

on_timeout(task: ClusterShell.Task.Task) None[source]

Override parent class on_timeout method to run end_command.

Parameters:

according to parent BaseEventHandler.on_timeout().

ev_hup(worker, node, rc)[source]

Command execution completed on a node.

This callback is triggered by ClusterShell for each node when it completes the execution of a command. Update the progress bars and keep track of nodes based on the success/failure of the command's execution. Schedule a timer for further decisions.

Parameters:

according to parent ClusterShell.Event.EventHandler.ev_hup().

ev_timer(timer: EngineTimer) None[source]

Schedule the current command on the next node or the next command on the first batch of nodes.

This callback is triggered by ClusterShell when a scheduled Task.timer() goes off.

Parameters:

according to parent ClusterShell.Event.EventHandler.ev_timer().

close(task: ClusterShell.Task.Task) None[source]

Concrete implementation of parent abstract method to print the success nodes report.

Parameters:

according to parent BaseEventHandler.close().

class cumin.transports.clustershell.AsyncEventHandler(target: Target, commands: list[Command], reporter: BaseReporter, progress_bars: BaseExecutionProgress, success_threshold: float = 1.0, **kwargs: Any)[source]

Bases: BaseEventHandler

Custom ClusterShell event handler class that executes commands asynchronously.

The implemented logic is:

  • execute every command in sequence on all nodes independently, aborting the execution on a node if any command fails on it.

  • the success ratio is checked each time a node completes (either because it completed all commands or because it aborted earlier). However, nodes already scheduled for execution with ClusterShell will execute the commands anyway. The batch_size setting allows controlling this aspect.

  • if the success ratio is met, schedule the execution of all commands on the next node.

The typical use case is to execute read-only commands to gather the status of a fleet without any special need of orchestration between the nodes.
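
For comparison with the synchronous mode, the per-host logic above can be simulated as follows. Again this is only a sequential sketch with hypothetical names (run_async(), fake_run()): it processes one host at a time, ignores batch_size and timeouts, and treats a ratio equal to the threshold as sufficient.

```python
from typing import Callable


def run_async(
    hosts: list[str],
    commands: list[str],
    run: Callable[[str, str], int],
    success_threshold: float = 1.0,
) -> dict[str, list[int]]:
    """Run all commands on each host, stopping a host at its first failure."""
    return_codes: dict[str, list[int]] = {}
    failed = 0
    for host in hosts:
        codes = []
        for command in commands:
            rc = run(host, command)
            codes.append(rc)
            if rc != 0:
                failed += 1
                break  # abort the remaining commands on this host only
        return_codes[host] = codes
        # Do not schedule further hosts once the ratio drops too low.
        if (len(hosts) - failed) / len(hosts) < success_threshold:
            break
    return return_codes


def fake_run(host: str, command: str) -> int:
    """Pretend executor: only 'cmd2' on 'host2' fails."""
    return 1 if (host, command) == ("host2", "cmd2") else 0


results = run_async(
    ["host1", "host2", "host3"], ["cmd1", "cmd2", "cmd3"], fake_run, 0.5
)
```

Here host2 stops after its failing cmd2 and never runs cmd3, while host1 and host3 run all three commands. With the default threshold of 1.0 the same failure would prevent host3 from being scheduled at all.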

Define a custom ClusterShell event handler to execute commands asynchronously between nodes.

Parameters:

according to parent BaseEventHandler.__init__().

ev_hup(worker, node, rc)[source]

Command execution completed on a node.

This callback is triggered by ClusterShell for each node when it completes the execution of a command. Enqueue the next command if the success criteria are met, track the failure otherwise. Update the progress bars accordingly.

Parameters:

according to parent ClusterShell.Event.EventHandler.ev_hup().

ev_timer(timer: EngineTimer) None[source]

Schedule the current command on the next node or the next command on the first batch of nodes.

This callback is triggered by ClusterShell when a scheduled Task.timer() goes off.

Parameters:

according to parent ClusterShell.Event.EventHandler.ev_timer().

close(task: ClusterShell.Task.Task) None[source]

Concrete implementation of parent abstract method to print the nodes reports and close progress bars.

Parameters:

according to parent BaseEventHandler.close().

cumin.transports.clustershell.worker_class

Required by the transport auto-loader in cumin.transport.Transport.new().

cumin.transports.clustershell.DEFAULT_HANDLERS: dict[str, Type[EventHandler]] = {'async': <class 'cumin.transports.clustershell.AsyncEventHandler'>, 'sync': <class 'cumin.transports.clustershell.SyncEventHandler'>}

Mapping of the available default event handlers for ClusterShellWorker.

Type:

dict