Skip to content
Snippets Groups Projects
Commit 626b2219 authored by Ashley Whetter's avatar Ashley Whetter
Browse files

Moved parallel code outside of PyLinter

parent 9ec7c30c
No related branches found
No related tags found
No related merge requests found
Loading
Loading
@@ -193,7 +193,7 @@ MSGS = {
 
 
if multiprocessing is not None:
class ChildLinter(multiprocessing.Process):
class ChildRunner(multiprocessing.Process):
def run(self):
# pylint: disable=no-member, unbalanced-tuple-unpacking
tasks_queue, results_queue, self._config = self._args
Loading
Loading
@@ -717,115 +717,23 @@ class PyLinter(config.OptionsManagerMixIn,
return path.endswith('.py')
# pylint: enable=unused-argument
 
def _init_msg_states(self):
for msg in self.msgs_store.messages:
if not msg.may_be_emitted():
self._msgs_state[msg.msgid] = False
def check(self, files_or_modules):
"""main checking entry: check a list of files or modules from their
name.
"""
# initialize msgs_state now that all messages have been registered into
# the store
for msg in self.msgs_store.messages:
if not msg.may_be_emitted():
self._msgs_state[msg.msgid] = False
self._init_msg_states()
 
if not isinstance(files_or_modules, (list, tuple)):
files_or_modules = (files_or_modules,)
 
if self.config.jobs == 1:
self._do_check(files_or_modules)
else:
with _patch_sysmodules():
self._parallel_check(files_or_modules)
def _get_jobs_config(self):
child_config = collections.OrderedDict()
filter_options = {'long-help'}
filter_options.update((opt_name for opt_name, _ in self._external_opts))
for opt_providers in six.itervalues(self._all_options):
for optname, optdict, val in opt_providers.options_and_values():
if optdict.get('deprecated'):
continue
if optname not in filter_options:
child_config[optname] = utils._format_option_value(
optdict, val)
child_config['python3_porting_mode'] = self._python3_porting_mode
child_config['plugins'] = self._dynamic_plugins
return child_config
def _parallel_task(self, files_or_modules):
# Prepare configuration for child linters.
child_config = self._get_jobs_config()
children = []
manager = multiprocessing.Manager()
tasks_queue = manager.Queue()
results_queue = manager.Queue()
for _ in range(self.config.jobs):
child_linter = ChildLinter(args=(tasks_queue, results_queue,
child_config))
child_linter.start()
children.append(child_linter)
# Send files to child linters.
expanded_files = utils.expand_files(files_or_modules, self, self.config.black_list, self.config.black_list_re)
for module_desc in expanded_files:
tasks_queue.put([module_desc.path])
# collect results from child linters
failed = False
for _ in expanded_files:
try:
result = results_queue.get()
except Exception as ex:
print("internal error while receiving results from child linter",
file=sys.stderr)
print(ex, file=sys.stderr)
failed = True
break
yield result
# Stop child linters and wait for their completion.
for _ in range(self.config.jobs):
tasks_queue.put('STOP')
for child in children:
child.join()
if failed:
print("Error occurred, stopping the linter.", file=sys.stderr)
sys.exit(32)
def _parallel_check(self, files_or_modules):
# Reset stats.
self.open()
all_stats = []
module = None
for result in self._parallel_task(files_or_modules):
(
_,
self.file_state.base_name,
module,
messages,
stats,
msg_status
) = result
for msg in messages:
msg = utils.Message(*msg)
self.set_current_module(module)
self.reporter.handle_message(msg)
all_stats.append(stats)
self.msg_status |= msg_status
self.stats = _merge_stats(all_stats)
self.current_name = module
# Insert stats data to local checkers.
for checker in self.get_checkers():
if checker is not self:
checker.stats = self.stats
self._do_check(files_or_modules)
 
def _do_check(self, files_or_modules):
walker = utils.PyLintASTWalker(self)
Loading
Loading
@@ -1290,18 +1198,123 @@ group are mutually exclusive.'),
print("Multiprocessing library is missing, "
"fallback to single process", file=sys.stderr)
linter.set_option("jobs", 1)
else:
if linter.config.jobs == 0:
linter.config.jobs = multiprocessing.cpu_count()
elif linter.config.jobs == 0:
linter.config.jobs = multiprocessing.cpu_count()
 
# insert current working directory to the python path to have a correct
# behaviour
with fix_import_path(args):
linter.check(args)
if linter.config.jobs == 1:
linter.check(args)
else:
self._parallel_run(args)
linter.generate_reports()
if exit:
sys.exit(self.linter.msg_status)
 
def _parallel_run(self, files_or_modules):
with _patch_sysmodules():
self.linter._init_msg_states()
self._parallel_check(files_or_modules)
def _parallel_task(self, files_or_modules):
# Prepare configuration for child linters.
child_config = self._get_jobs_config()
children = []
manager = multiprocessing.Manager()
tasks_queue = manager.Queue()
results_queue = manager.Queue()
for _ in range(self.linter.config.jobs):
child_linter = ChildRunner(args=(tasks_queue, results_queue,
child_config))
child_linter.start()
children.append(child_linter)
# Send files to child linters.
expanded_files = utils.expand_files(
files_or_modules,
self.linter,
self.linter.config.black_list,
self.linter.config.black_list_re
)
for module_desc in expanded_files:
tasks_queue.put([module_desc.path])
# collect results from child linters
failed = False
for _ in expanded_files:
try:
result = results_queue.get()
except Exception as ex:
print("internal error while receiving results from child linter",
file=sys.stderr)
print(ex, file=sys.stderr)
failed = True
break
yield result
# Stop child linters and wait for their completion.
for _ in range(self.linter.config.jobs):
tasks_queue.put('STOP')
for child in children:
child.join()
if failed:
print("Error occured, stopping the linter.", file=sys.stderr)
sys.exit(32)
def _parallel_check(self, files_or_modules):
# Reset stats.
self.linter.open()
all_stats = []
module = None
for result in self._parallel_task(files_or_modules):
(
_,
self.linter.file_state.base_name,
module,
messages,
stats,
msg_status
) = result
for msg in messages:
msg = utils.Message(*msg)
self.linter.set_current_module(module)
self.linter.reporter.handle_message(msg)
all_stats.append(stats)
self.linter.msg_status |= msg_status
self.linter.stats = _merge_stats(all_stats)
self.linter.current_name = module
# Insert stats data to local checkers.
for checker in self.linter.get_checkers():
if checker is not self.linter:
checker.stats = self.linter.stats
def _get_jobs_config(self):
child_config = collections.OrderedDict()
filter_options = {'long-help'}
filter_options.update((opt_name for opt_name, _ in self.linter._external_opts))
for opt_providers in six.itervalues(self.linter._all_options):
for optname, optdict, val in opt_providers.options_and_values():
if optdict.get('deprecated'):
continue
if optname not in filter_options:
child_config[optname] = utils._format_option_value(
optdict, val)
child_config['python3_porting_mode'] = self.linter._python3_porting_mode
child_config['plugins'] = self.linter._dynamic_plugins
return child_config
def cb_set_rcfile(self, name, value):
"""callback for option preprocessing (i.e. before option parsing)"""
self._rcfile = value
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment