Source code for cli.single_command

"""Concrete SingleCommand's and their Factory.

Since there can be many SingleCommand's (cat,
echo, wc, ..., you name it), it is convenient
to have them in a separate module.

There is a Factory class that "knows"
how to choose an appropriate Command
given its string representation.
"""
import logging
import os.path
import os
import subprocess
import sys
import re
from docopt import docopt, DocoptExit
from schema import Schema, SchemaError, Optional, Use

from cli.commands import SingleCommand, RunnableCommandResult
from cli.exceptions import ExitException
from cli.streams import OutputStream


class CommandExternal(SingleCommand):
    """An external command (one that is not built into the shell).

    Command args:
        0 -- the command's name. The executable path is obtained by
            joining the environment's current working directory with
            this name (i.e. the executable must be reachable from the
            current directory).
        1..n -- the command's arguments; they are handed to the
            executable unchanged.

    Returns `COMMAND_NOT_FOUND` exit code if the executable was not found.
    """

    COMMAND_NOT_FOUND = 1

    def run(self, input_stream, env):
        """Run the executable and capture its standard output.

        Args:
            input_stream: object whose ``get_input()`` result is fed to
                the child's standard input.
            env: environment whose ``get_cwd()`` gives the directory the
                executable is looked up in.

        Returns:
            RunnableCommandResult: wraps the captured stdout and the
            child's return code, or a "not found" message and
            `COMMAND_NOT_FOUND` when the executable does not exist.
        """
        output = OutputStream()

        cmd_name = self._args_lst[0]
        cur_dir = env.get_cwd()
        cmd_name_full = os.path.join(cur_dir, cmd_name)

        modified_args = self._args_lst[:]
        modified_args[0] = cmd_name_full

        # sys.stdout may have been replaced by an object whose encoding is
        # missing or None; subprocess would then run in binary mode and
        # reject the text input, so fall back to a fixed text encoding.
        encoding = getattr(sys.stdout, 'encoding', None) or 'utf-8'

        # Start the child in the shell's tracked directory so that relative
        # paths in its arguments resolve the same way as for built-ins.
        # Only done for an absolute directory: with a relative one
        # subprocess would resolve the (then relative) executable path
        # against the new cwd and look in the wrong place.
        child_cwd = cur_dir if os.path.isabs(cur_dir) else None

        try:
            completed_process = subprocess.run(
                modified_args,
                input=input_stream.get_input(),
                stdout=subprocess.PIPE,
                encoding=encoding,
                cwd=child_cwd)
        except FileNotFoundError:
            output.write('Command {} not found.'.format(cmd_name_full))
            return RunnableCommandResult(
                output, env, CommandExternal.COMMAND_NOT_FOUND)

        output.write(completed_process.stdout)
        return RunnableCommandResult(output, env, completed_process.returncode)
class SingleCommandFactory:
    """Builder of Single Commands.

    The factory owns a registry mapping a command name to the class that
    implements it; a name missing from the registry is handled by
    `CommandExternal`.
    """

    # command name -> SingleCommand descendant; filled in at class
    # definition time by the registering decorator.
    registered_commands = {}

    @staticmethod
    def build_command(cmd_name_and_args_lexem_lst):
        """Build a single command from the lexemes describing it.

        Args:
            cmd_name_and_args_lexem_lst (list[:class:`lexer.Lexem`]):
                a list of lexemes. The first one must be a STRING holding
                the command name; the rest are STRING or QUOTED_STRING
                lexemes with the arguments.

        Returns:
            An instance of the class chosen for the command name,
            constructed from the string values of all the lexemes
            (the command name included).
        """
        name_lexem = cmd_name_and_args_lexem_lst[0]
        command_cls = SingleCommandFactory._get_command_class_by_name(
            name_lexem.get_value())

        all_values = [lexem.get_value()
                      for lexem in cmd_name_and_args_lexem_lst]
        return command_cls(all_values)

    @staticmethod
    def _get_command_class_by_name(cmd_name):
        """Return the class registered for `cmd_name`.

        Falls back to `CommandExternal` when no class is registered.
        """
        chosen_cls = SingleCommandFactory.registered_commands.get(
            cmd_name, CommandExternal)

        message = ('SingleCommandFactory: '
                   'Class {} is responsible for invoking command {}')
        logging.debug(message.format(chosen_cls, cmd_name))

        return chosen_cls
def _register_single_command(command_name):
    """Return a class decorator registering a shell command.

    Every shell command with a name (e.g. `cat`) must be registered
    with this decorator: the decorated class is stored in
    SingleCommandFactory's dictionary under `command_name` and is
    returned unchanged.
    """

    def _store_in_registry(command_cls):
        """Put `command_cls` into the factory registry and hand it back."""
        registry = SingleCommandFactory.registered_commands
        registry[command_name] = command_cls
        return command_cls

    return _store_in_registry


@_register_single_command('echo')
class CommandEcho(SingleCommand):
    """`echo` command: prints its arguments.

    Command args:
        0 -- `echo`
        1..n -- strings. They are joined with single spaces and written
            to the output as one line.
    """

    def run(self, input_stream, env):
        """Write the space-joined arguments as a line; always returns code 0.

        `input_stream` is not read.
        """
        words = self._args_lst[1:]
        joined = ' '.join(words)

        output = OutputStream()
        output.write_line(joined)
        return RunnableCommandResult(output, env, 0)
@_register_single_command('wc')
class CommandWc(SingleCommand):
    """`wc` command: count the number of lines, words and bytes.

    Command args:
        0 -- `wc`
        1 (optional) -- a filename, relative or absolute. If provided,
            the counts are taken over that file's contents; otherwise
            over the command's input.

    Returns `FILE_NOT_FOUND` exit code if a file was provided but was
    not found, and `BAD_NUMBER_OF_ARGS` if more than one argument was
    given.
    """

    FILE_NOT_FOUND = 1
    BAD_NUMBER_OF_ARGS = 2

    @staticmethod
    def _get_num_words(input_str):
        """Count the maximal runs of non-whitespace characters."""
        # Splitting without a separator cuts on runs of whitespace and
        # drops empty pieces, so the piece count is the word count.
        return len(input_str.split())

    @staticmethod
    def _wc_routine(input_str):
        """Return the tuple (lines, words, bytes) computed for `input_str`.

        Bytes are measured after encoding with `sys.stdin.encoding`.
        """
        counts = (
            len(input_str.splitlines()),
            CommandWc._get_num_words(input_str),
            len(input_str.encode(sys.stdin.encoding)),
        )

        logging.debug('wc: result for "{}" is {}.'.format(input_str, counts))

        return counts

    def run(self, input_stream, env):
        """Count over the named file or, with no argument, over the input.

        Returns a RunnableCommandResult holding either the three counts
        separated by spaces (code 0) or an error message with a
        non-zero code.
        """
        output = OutputStream()
        num_args = len(self._args_lst)

        if num_args not in (1, 2):
            output.write('wc got wrong number of arguments: expected 0 or 1, '
                         'got {}.'.format(num_args - 1))
            return RunnableCommandResult(
                output, env, CommandWc.BAD_NUMBER_OF_ARGS)

        if num_args == 1:
            text = input_stream.get_input()
        else:
            target_path = os.path.join(env.get_cwd(), self._args_lst[1])
            if not os.path.isfile(target_path):
                output.write('wc: file {} not found.'.format(target_path))
                return RunnableCommandResult(
                    output, env, CommandWc.FILE_NOT_FOUND)
            with open(target_path, 'r') as source_file:
                text = source_file.read()

        output.write('{} {} {}'.format(*CommandWc._wc_routine(text)))
        return RunnableCommandResult(output, env, 0)
@_register_single_command('cat')
class CommandCat(SingleCommand):
    """`cat` command: print its input or a file's contents.

    Command args:
        0 -- `cat`
        1 (optional) -- a filename, relative or absolute. If provided,
            that file's contents are written to the output; otherwise
            the command's input is.

    Returns `FILE_NOT_FOUND` exit code if a file was provided but was
    not found, and `BAD_NUMBER_OF_ARGS` if more than one argument was
    given.
    """

    FILE_NOT_FOUND = 1
    BAD_NUMBER_OF_ARGS = 2

    def run(self, input_stream, env):
        """Copy the named file or, with no argument, the input to the output.

        Returns a RunnableCommandResult with code 0 on success or an
        error message with a non-zero code.
        """
        output = OutputStream()
        num_args = len(self._args_lst)

        if num_args == 1:
            # No filename: pass the input straight through.
            output.write(input_stream.get_input())
            return RunnableCommandResult(output, env, 0)

        if num_args != 2:
            output.write('cat got wrong number of arguments: expected 0 or 1, '
                         'got {}.'.format(num_args - 1))
            return RunnableCommandResult(
                output, env, CommandCat.BAD_NUMBER_OF_ARGS)

        target_path = os.path.join(env.get_cwd(), self._args_lst[1])
        if not os.path.isfile(target_path):
            output.write('cat: file {} not found.'.format(target_path))
            return RunnableCommandResult(
                output, env, CommandCat.FILE_NOT_FOUND)

        with open(target_path, 'r') as source_file:
            output.write(source_file.read())
        return RunnableCommandResult(output, env, 0)
@_register_single_command('pwd')
class CommandPwd(SingleCommand):
    """`pwd` command: print the current working directory.

    Takes no arguments; returns `BAD_NUMBER_OF_ARGS` if any are given.
    """

    BAD_NUMBER_OF_ARGS = 1

    def run(self, input_stream, env):
        """Write `env.get_cwd()` to the output.

        `input_stream` is not read.
        """
        output = OutputStream()
        num_extra_args = len(self._args_lst) - 1

        if num_extra_args != 0:
            output.write('pwd got wrong number of arguments: expected 0, '
                         'got {}.'.format(num_extra_args))
            return RunnableCommandResult(
                output, env, CommandPwd.BAD_NUMBER_OF_ARGS)

        output.write(env.get_cwd())
        return RunnableCommandResult(output, env, 0)
@_register_single_command('exit')
class CommandExit(SingleCommand):
    """`exit` command: exit shell.

    Exiting is performed by raising an exception, so no further
    commands are run. Takes no arguments; returns `BAD_NUMBER_OF_ARGS`
    (without exiting) if any are given.
    """

    BAD_NUMBER_OF_ARGS = 1

    def run(self, input_stream, env):
        """Raise ExitException, or report an error if arguments were given.

        Raises:
            ExitException: when the command was invoked without arguments.
        """
        num_extra_args = len(self._args_lst) - 1

        if num_extra_args == 0:
            raise ExitException()

        output = OutputStream()
        output.write('exit got wrong number of arguments: expected 0, '
                     'got {}.'.format(num_extra_args))
        return RunnableCommandResult(
            output, env, CommandExit.BAD_NUMBER_OF_ARGS)
@_register_single_command('cd')
class CommandCd(SingleCommand):
    """`cd` command: change directory.

    Command args:
        0 -- `cd`
        1 -- a new filepath. Can be relative or absolute.

    Returns NEW_DIR_INVALID if the directory does not exist.
    Returns BAD_NUMBER_OF_ARGS if wrong number of arguments is supplied.
    """

    NEW_DIR_INVALID = 1
    BAD_NUMBER_OF_ARGS = 2

    def run(self, input_stream, env):
        """Point the environment's working directory at the requested path.

        The target is the argument joined onto `env.get_cwd()`. On
        success the stored directory is normalised, so `.` and `..`
        segments do not pile up in it across repeated `cd` calls.

        Returns:
            RunnableCommandResult: code 0 on success (empty output);
            otherwise an error message and a non-zero code, with the
            environment left untouched.
        """
        output = OutputStream()

        num_extra_args = len(self._args_lst) - 1
        if num_extra_args != 1:
            output.write('cd got wrong number of arguments: expected 1, '
                         'got {}.'.format(num_extra_args))
            return RunnableCommandResult(
                output, env, CommandCd.BAD_NUMBER_OF_ARGS)

        cur_dir = env.get_cwd()
        new_dir = os.path.join(cur_dir, self._args_lst[1])

        logging.debug('cd: trying to change dir to {}.'.format(new_dir))

        # Validate the path exactly as typed, so e.g. `missing/..` is
        # still rejected.
        if not os.path.isdir(new_dir):
            output.write('{} is not a directory.'.format(new_dir))
            return RunnableCommandResult(
                output, env, CommandCd.NEW_DIR_INVALID)

        # Collapse `.`/`..` segments before storing. normpath works on
        # the text only, so with symlinks it can name a different place;
        # keep the typed path when the collapsed one is not a directory.
        normalized_dir = os.path.normpath(new_dir)
        if os.path.isdir(normalized_dir):
            new_dir = normalized_dir

        env.set_cwd(new_dir)
        return RunnableCommandResult(output, env, 0)
@_register_single_command('grep')
class CommandGrep(SingleCommand):
    """`grep` command: find strings in a file.

    Usage:
        grep [-A <n>] [-i] [-w] PATTERN FILE

    Arguments:
        PATTERN     regular expression pattern to search for
        FILE        path to file where the search is performed

    Options:
        -i          ignore case when searching
        -w          search for the whole word
        -A <n>      print n lines after match [default: 0]
    """
    # NOTE: the docstring above is runtime data: docopt parses it to
    # read the arguments, and it is printed as the usage message. Keep
    # it in docopt format and put implementation notes in comments.

    # Exit code for unusable arguments (bad options, invalid pattern,
    # unreadable file).
    BAD_ARGS = 2

    def _usage_error(self, env, reason):
        """Build the result for unusable arguments.

        The usage text is written to the output, `reason` is logged, and
        the result carries the `BAD_ARGS` code.
        """
        output = OutputStream()
        output.write(CommandGrep.__doc__)
        logging.info('Grep arguments parse error: {}'.format(reason))
        return RunnableCommandResult(output, env, CommandGrep.BAD_ARGS)

    def run(self, input_stream, env):
        """Print the lines of FILE matching PATTERN, plus trailing context.

        Each matching line is written once, followed by up to `-A`
        subsequent lines; lines shared by overlapping context windows
        are not repeated. `input_stream` is not read.

        Returns:
            RunnableCommandResult: code 0 with the selected lines, or
            the usage text with `BAD_ARGS` when the arguments cannot be
            used.
        """
        schema = Schema({
            Optional('--help'): bool,
            '-A': Use(int, error='n should be integer'),
            Optional('-w'): bool,
            Optional('-i'): bool,
            # Only resolve the path here; the file is opened later so
            # that a failure on another key cannot leak an open handle.
            'FILE': Use(lambda f: os.path.join(env.get_cwd(), f)),
            'PATTERN': Use(re.compile,
                           error='PATTERN must be compileable regexp')
        })

        try:
            arguments = docopt(CommandGrep.__doc__,
                               argv=self._args_lst[1:], help=False)
            parsed_args = schema.validate(arguments)
        except (SchemaError, DocoptExit) as e:
            return self._usage_error(env, str(e))

        num_lines_after = parsed_args['-A']
        if num_lines_after < 0:
            return self._usage_error(env, 'n should be non-negative')

        pattern_text = parsed_args['PATTERN'].pattern
        if parsed_args['-w']:
            # Group the pattern so the word boundaries apply to all of
            # it, not just the outer branches of an alternation (`a|b`).
            pattern_text = r'\b(?:{})\b'.format(pattern_text)
        re_flags = re.IGNORECASE if parsed_args['-i'] else 0

        try:
            compiled_pattern = re.compile(pattern_text, re_flags)
        except re.error as e:
            # The wrapped pattern can be invalid even though the bare one
            # compiled (e.g. inline global flags no longer at the start).
            return self._usage_error(env, str(e))

        try:
            opened_file = open(parsed_args['FILE'], 'r')
        except (OSError, ValueError):
            return self._usage_error(env, 'FILE should be readable')
        # The context manager closes the file even if reading fails.
        with opened_file:
            lines = opened_file.read().splitlines()

        output = OutputStream()
        context_left = 0  # trailing-context lines still owed after a match
        for line in lines:
            if compiled_pattern.search(line):
                output.write_line(line)
                context_left = num_lines_after
            elif context_left > 0:
                output.write_line(line)
                context_left -= 1

        return RunnableCommandResult(output, env, 0)