import asyncio
import logging
import os
import sys

import defence360agent.internals.logger
from defence360agent.contracts.config import Core as Config
from defence360agent.simple_rpc import SUCCESS, SocketError
from defence360agent.utils import is_root_user
from defence360agent.utils.cli import (
    EXIT_CODES,
    EXITCODE_GENERAL_ERROR,
    print_error,
    print_response,
    print_warnings,
)
from defence360agent.utils.parsers import EnvParser, create_cli_parser

logger = logging.getLogger(__name__)


def main(rpc_handlers_init, cli_args):
    """Parse *cli_args*, invoke the selected endpoint and print its result.

    Steps, in order:

    1. Apply ``Config.FILE_UMASK`` and reconfigure logging.
    2. Call ``rpc_handlers_init()`` and parse the command line.
    3. Optionally reload the logging config from the file given by
       ``args.log_config`` or the ``IMUNIFY360_LOGGING_CONFIG_FILE``
       environment variable, and apply ``args.console_log_level``.
    4. If the parsed arguments carry both ``endpoint`` and
       ``generate_endpoint_params``, call the endpoint with parameters
       collected from the command line and the environment and print the
       response; otherwise print the parser help.

    :param rpc_handlers_init: zero-argument callable invoked before the
        CLI parser is created.
    :param cli_args: list of command-line arguments (without the program
        name).
    :raises SystemExit: with ``EXIT_CODES[result]`` when the endpoint
        result is not ``SUCCESS``, or with ``EXITCODE_GENERAL_ERROR``
        when a ``SocketError`` is raised while calling the endpoint.
    """
    # get ready to start: set conservative umask
    os.umask(Config.FILE_UMASK)
    defence360agent.internals.logger.reconfigure()

    rpc_handlers_init()
    parser = create_cli_parser()
    args = parser.parse_args(args=cli_args)

    # The command-line option takes precedence over the environment
    # variable; resolve the path once instead of evaluating it twice.
    log_config = args.log_config or os.environ.get(
        "IMUNIFY360_LOGGING_CONFIG_FILE"
    )
    if log_config:
        defence360agent.internals.logger.update_logging_config_from_file(
            log_config
        )

    if args.console_log_level:
        defence360agent.internals.logger.setConsoleLogLevel(
            args.console_log_level
        )

    if hasattr(args, "endpoint") and hasattr(args, "generate_endpoint_params"):
        try:
            cli_kwargs = args.generate_endpoint_params(args)
            # Parameters already given on the command line are excluded
            # from the environment lookup, so the two dicts passed to the
            # endpoint below are not expected to share keys.
            envvar_kwargs = EnvParser.parse(
                os.environ,
                args.command,
                args.envvar_parameter_options,
                exclude=cli_kwargs,
            )
            result, data = args.endpoint(**envvar_kwargs, **cli_kwargs)
            print_warnings(data)
            if result == SUCCESS:
                print_response(args.command, data, args.json, args.verbose)
            else:
                print_error(result, data, args.json, args.verbose)
                sys.exit(EXIT_CODES[result])
        except SocketError as e:
            print_response(
                None, {"items": "ERROR: {}".format(e)}, args.json, args.verbose
            )
            sys.exit(EXITCODE_GENERAL_ERROR)
    else:
        print(parser.format_help())


def _close_event_loop():
    """Close the current asyncio event loop, if one can be obtained.

    Prevents the asyncio warning about an unclosed loop at interpreter
    shutdown (https://bugs.python.org/issue23548).  This is best-effort
    cleanup: a ``RuntimeError`` (e.g. no current event loop on newer
    Python versions) is logged at debug level and otherwise ignored so
    that it can never change the exit status of the CLI.
    """
    try:
        asyncio.get_event_loop().close()
    except RuntimeError as e:
        logger.debug("Event loop was not closed: %s", e)


def entrypoint(rpc_handlers_init):
    """Run the CLI for the current process, enforcing root-only access.

    Exits with ``EXITCODE_GENERAL_ERROR`` when the user is not root, when
    the user interrupts with Ctrl+C, or when an unexpected exception
    escapes :func:`main`.  ``SystemExit`` raised by :func:`main` itself
    propagates unchanged.

    :param rpc_handlers_init: zero-argument callable forwarded to
        :func:`main`.
    """
    if not is_root_user():
        logger.info("%s could be used by the root user only!", Config.NAME)
        print(
            "Imunify360 CLI is unavailable for non-root user", file=sys.stderr
        )
        sys.exit(EXITCODE_GENERAL_ERROR)

    try:
        main(rpc_handlers_init, sys.argv[1:])
    except KeyboardInterrupt:
        logger.warning("User pressed Ctrl+C, exiting...")
        sys.exit(EXITCODE_GENERAL_ERROR)
    except Exception:
        logger.exception(
            "Unknown error happened. See logs for more information"
        )
        sys.exit(EXITCODE_GENERAL_ERROR)
    finally:
        # Runs on every path, including SystemExit raised by main() on
        # error results, which would otherwise skip the cleanup.
        _close_event_loop()