Skip to content
Snippets Groups Projects

Resolve "Use the code module for the interactive session"

Merged Ivan Kondov requested to merge 241-use-the-code-module-for-the-interactive-session into master
Files
2
"""manage sessions for dynamic model processing / incremental development"""
import os
import sys
import readline # noqa # pylint: disable=unused-import
from code import InteractiveConsole
from functools import cached_property
from textx import metamodel_from_file, textx_isinstance
from virtmat.language.metamodel.processors import table_processor, null_processor, number_processor
@@ -94,10 +97,11 @@ def expand_query_prefix(query):
return {k: _recursive_q(v, p_map[k]) for k, v in query.items()}
class SessionManager():
class SessionManager(InteractiveConsole):
"""session manager for basic interactive work using a text terminal"""
def __init__(self, lpad, **kwargs):
super().__init__()
self.lpad = lpad
self.kwargs = dict(kwargs)
del self.kwargs['uuid']
@@ -115,29 +119,38 @@ class SessionManager():
'Table': table_processor}
self.metamodel.register_obj_processors(obj_processors)
def main_loop(self):
"""this is the main loop of the interactive session"""
if not sys.stdin.isatty():
msg = 'Session running in a shell not connected to a terminal'
get_logger(__name__).warning(msg)
warnings.warn(msg, TextSUserWarning)
ps1_save = getattr(sys, 'ps1', None)
ps2_save = getattr(sys, 'ps2', None)
try:
sys.ps1 = 'Input > '
sys.ps2 = ' > '
self.interact(banner='', exitmsg='')
finally:
print('Exiting')
self.session.stop_runner()
sys.ps1 = ps1_save
sys.ps2 = ps2_save
def runsource(self, source, filename=None, symbol=None):
"""this is used by the superclass to realize a REPL"""
if source.strip():
try:
self.process_input(source)
finally:
self.check_session()
return False
@error_handler
def get_model_value(self, *args, **kwargs):
"""wrapped and evaluated version of get_model() of the Session class"""
return getattr(self.session.get_model(*args, uuid=self.uuid, **kwargs), 'value', '')
def main_loop(self):
"""this is the main loop of the interactive session"""
while True:
try:
input_str = input('Input > ')
except EOFError:
print('\nExiting')
self.session.stop_runner()
break
except KeyboardInterrupt:
print('\nType %exit or %close or Ctrl+D to close session')
continue
if not input_str.strip():
continue
if self.process_input(input_str):
break
self.check_session()
def check_session(self):
"""check session consistency"""
if (len(self.session.models) != len(self.session.uuids) or
@@ -154,9 +167,7 @@ class SessionManager():
get_logger(__name__).debug('process_input: session model: %s', model)
if textx_isinstance(model, self.metamodel['Magic']):
if model.com in ('exit', 'bye', 'close', 'quit'):
print('Exiting')
self.session.stop_runner()
return True
raise SystemExit()
self.process_magic(model)
return False
if textx_isinstance(model, self.metamodel['Expression']):
Loading