While the MRJob class is the part of the framework that handles the execution of your code in a MapReduce context, the runner is the part that packages and submits your job to be run and reports the results back to you.
In most cases, you will interact with runners via the command line and configuration files. When you invoke mrjob via the command line, it reads your command line options (the --runner parameter) to determine which type of runner to create. Then it creates the runner, which reads your configuration files and command line args and starts your job running in whatever context you chose.
Most of the time, you won't have any reason to construct a runner directly. Instead you'll invoke your Python script on the command line and it will make a runner automatically, or you'll write some sort of wrapper that calls my_job.make_runner().
Internally, the general order of operations is:
1. Call make_runner() on your job to create the runner.
2. Call run() on your runner. This runs your job in the chosen context, invoking your script with the appropriate --mapper, --combiner, --reducer, and --step-num arguments for each step.

Each runner runs a single job once; if you want to run a job multiple times, make multiple runners.
Subclasses: DataprocJobRunner, EMRJobRunner, HadoopJobRunner, InlineMRJobRunner, LocalMRJobRunner
To test the job locally, just run:
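For example, assuming your job class lives in a script called your_mr_job_sub_class.py (the script and file names are placeholders):

    python your_mr_job_sub_class.py < input.txt > output.txt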
The script will automatically invoke itself to run the various steps, using InlineMRJobRunner (--runner=inline). If you want to simulate Hadoop more closely, you can use --runner=local, which doesn't add your working directory to the PYTHONPATH, sets a few Hadoop environment variables, and uses multiple subprocesses for tasks.
You can also run individual steps:
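For instance, something like the following (script and input names are illustrative; step numbers are 0-indexed):

    # test the first step's mapper:
    python your_mr_job_sub_class.py --mapper < input.txt

    # test the second step's reducer:
    python your_mr_job_sub_class.py --reducer --step-num=1 < sorted_mapper_output.txt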
By default, we read from stdin, but you can also specify one or more input files. It automatically decompresses .gz and .bz2 files:
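For example (the file names are placeholders):

    python your_mr_job_sub_class.py log_01.gz log_02.bz2 log_03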
See mrjob.examples for more examples.
Set up a Hadoop cluster (see http://hadoop.apache.org/docs/current/)
Run your job with -r hadoop:
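A typical invocation (script and paths are placeholders) might look like:

    python your_mr_job_sub_class.py -r hadoop < input.txt > output.txt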
Note
You don't need to install mrjob or any other libraries on the nodes of your Hadoop cluster, but they do at least need a version of Python that's compatible with your job.
Set up your Amazon account and credentials (see Configuring AWS credentials)
Run your job with -r emr:
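For example (placeholders as above):

    python your_mr_job_sub_class.py -r emr < input.txt > output.txt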
Set up your Google account and credentials (see Getting started with Google Cloud)
Run your job with -r dataproc:
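For example (placeholders as above):

    python your_mr_job_sub_class.py -r dataproc < input.txt > output.txt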
Note
Dataproc does not yet support Spark or libjars.
Runners are configured by several methods:

- from mrjob.conf (see Config file format and location)
- from the command line
- by re-defining job_runner_kwargs() etc. in your MRJob (see Job runner configuration)
- by instantiating the runner directly

In most cases, you should put all configuration in mrjob.conf and use the command line args or class variables to customize how individual jobs are run.
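As a rough illustration, a minimal mrjob.conf might look something like the sketch below. The option names shown (python_bin, region) are assumptions about your mrjob version; check the configuration reference for the exact keys your release supports.

    runners:
      hadoop:
        python_bin: python3
      emr:
        region: us-west-2
        python_bin: python3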
It is fairly common to write an organization-specific wrapper around mrjob. Use make_runner() to run an MRJob from another Python script. The context manager guarantees that all temporary files are cleaned up regardless of the success or failure of your job.
This pattern can also be used to write integration tests (see Testing jobs).
You instantiate the MRJob, use a context manager to create the runner, run the job, and cat its output, parsing that output with the job's output protocol.
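A sketch of that pattern, assuming a job class of your own (MRWordCounter here is a hypothetical name) and a recent mrjob release that provides runner.cat_output() and MRJob.parse_output():

    from mr_word_counter import MRWordCounter  # hypothetical job module

    mr_job = MRWordCounter(args=['-r', 'emr'])
    with mr_job.make_runner() as runner:
        runner.run()
        # decode the raw output using the job's output protocol
        for key, value in mr_job.parse_output(runner.cat_output()):
            ...  # do something with each key/value pair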
Further reference:
Note
You should pay attention to the next sentence.
You cannot use the programmatic runner functionality in the same file as your job class. As an example of what not to do, here is some code that does not work.
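A sketch of the anti-pattern (class name and runner choice are illustrative):

    from mrjob.job import MRJob

    class MyJob(MRJob):
        # mapper/reducer definitions would go here
        ...

    # BAD: creating and running the runner in the same file as the job class
    mr_job = MyJob(args=['-r', 'hadoop'])
    with mr_job.make_runner() as runner:
        runner.run()

    if __name__ == '__main__':
        MyJob.run()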
What you need to do instead is put your job in one file, and your run code in another. Here are two files that would correctly handle the above case.
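For instance, split along these lines (file and class names are placeholders):

my_job.py:

    from mrjob.job import MRJob

    class MyJob(MRJob):
        # mapper/reducer definitions would go here
        ...

    if __name__ == '__main__':
        MyJob.run()

driver.py:

    from my_job import MyJob

    mr_job = MyJob(args=['-r', 'hadoop'])
    with mr_job.make_runner() as runner:
        runner.run()
        ...  # read the job's output here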
The file with the job class is sent to Hadoop to be run. Therefore, the job file cannot attempt to start the Hadoop job, or you would be recursively creating Hadoop jobs! The code that runs the job should only run outside of the Hadoop context.
The if __name__ == '__main__' block is only run if you invoke the job file as a script. It is not run when imported. That's why you can import the job class to be run, but the job file can still be invoked as an executable.
Counters may be read through the counters() method on the runner. The example below demonstrates the use of counters in a test case.
mr_counting_job.py
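A sketch of what such a job might look like (the repeated steps are just to exercise counters across multiple steps; names are illustrative):

    from mrjob.job import MRJob
    from mrjob.step import MRStep


    class MRCountingJob(MRJob):

        def steps(self):
            # several identical steps, so we can check counters per step
            return [MRStep(mapper=self.mapper),
                    MRStep(mapper=self.mapper),
                    MRStep(mapper=self.mapper)]

        def mapper(self, _, value):
            self.increment_counter('group', 'counter_name', 1)
            yield _, value


    if __name__ == '__main__':
        MRCountingJob.run()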
test_counters.py
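And a matching test, again as a sketch (it assumes the job above is importable as mr_counting_job and that sandbox() accepts a bytes stdin, as in recent mrjob releases):

    from io import BytesIO
    from unittest import TestCase

    from mr_counting_job import MRCountingJob


    class CounterTestCase(TestCase):

        def test_counters(self):
            stdin = BytesIO(b'foo\nbar\n')

            mr_job = MRCountingJob(['--no-conf', '-'])
            mr_job.sandbox(stdin=stdin)

            with mr_job.make_runner() as runner:
                runner.run()

                # counters() returns one dict of counters per step
                self.assertEqual(runner.counters(),
                                 [{'group': {'counter_name': 2}},
                                  {'group': {'counter_name': 2}},
                                  {'group': {'counter_name': 2}}])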
Python 3.4 or 3.6 is installed on my Amazon EMR cluster instances, but Spark is running Python 2.7. How do I upgrade Spark to Python 3.4 or 3.6?
In most Amazon EMR release versions, cluster instances and system applications use different Python versions by default:
To upgrade the Python version that PySpark uses, point the PYSPARK_PYTHON environment variable for the spark-env classification to the directory where Python 3.4 or 3.6 is installed.
Amazon EMR release version 5.21.0 and later
Submit a reconfiguration request with a configuration object similar to the following:
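Along these lines (the Python path shown, /usr/bin/python3, is an assumption; use the path where Python 3.4 or 3.6 is installed on your instances):

    [
      {
        "Classification": "spark-env",
        "Configurations": [
          {
            "Classification": "export",
            "Properties": {
              "PYSPARK_PYTHON": "/usr/bin/python3"
            }
          }
        ]
      }
    ]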
Amazon EMR release version 4.6.0-5.20.x
1. Connect to the master node using SSH.
2. Run the following command to change the default Python environment:
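One way to do this (the spark-env.sh path and Python location are assumptions for a typical EMR master node) is to append an export of PYSPARK_PYTHON to Spark's environment script:

    sudo sh -c 'echo "export PYSPARK_PYTHON=/usr/bin/python3" >> /etc/spark/conf/spark-env.sh'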
3. Run the pyspark command to confirm that PySpark is using the correct Python version:
The output shows that PySpark is now using the same Python version that is installed on the cluster instances. Example:
Spark uses the new configuration for the next PySpark job.
Add a configuration object similar to the following when you launch a cluster using Amazon EMR release version 4.6.0 or later:
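The object has the same shape as the reconfiguration sketch above: a spark-env classification whose export properties point PYSPARK_PYTHON at your Python 3 installation (the path is again an assumption):

    [
      {
        "Classification": "spark-env",
        "Configurations": [
          {
            "Classification": "export",
            "Properties": {
              "PYSPARK_PYTHON": "/usr/bin/python3"
            }
          }
        ]
      }
    ]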