Analytic Patterns: Joining Tables

In this chapter we’ll cover JOIN operations in Pig. A join is used to 'join' multiple datasets or relations into a single relation based on the presence of a common key or keys. Pig supports several types of JOIN operations: inner joins and left, right, and full outer joins. We’ll be learning how to perform different kinds of joins in Pig, and we’ll also walk through how a join works at a low level, in Python/MrJob. By the end of the chapter, you’ll understand how to join like a pro.

To understand this chapter, it helps if you’re familiar with joining data from a SQL or related background. If you’re new to joins, a more thorough introduction will help. Check out the post 'A Visual Explanation of SQL Joins' by Jeff Atwood.

In database terminology, a join combines the rows of two or more tables based on some matching information, known as a key. For example, you could join a table of names and a table of mailing addresses, so long as both tables had a common field for the user ID. You could also join a table of prices to a table of items, given an item ID column in both tables. Joins are useful because they permit people to normalize data — that is to say, eliminate redundant content between multiple tables — yet still bring several tables' content to a single view on-the-fly.

Joins are pedestrian fare in relational databases. Far less so for Hadoop, since MapReduce wasn’t really created with joins in mind, and you have to go through acrobatics to make it work. [1] Pig’s JOIN operator provides the syntactical ease of a SQL query. While Pig will shield you from hand-writing joins in MapReduce, it’s still all MapReduce behind the scenes, so your joins are still subject to certain performance considerations. This section will dig into the basics of Pig joins, then explain how to avoid certain mishaps.

Matching Records Between Tables (Inner Join)

Inner joins are used to find the set of records having matching join keys in both tables. If a record’s join key in table A doesn’t have a match in any record’s join key in table B, it will be filtered from the output. If there is a match, the records with matched keys will be crossed to produce as many records as the number of records in table A times the number of records in table B, for that join key.

In other words, an inner join drops records that don’t have matching keys in both tables. This means that the result of an inner join may have fewer rows than either of the original tables, or it may have more records than either of the original tables - up to an upper limit of A * B.
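For example, a join key that appears in two records of table A and three records of table B contributes 2 × 3 = 6 records to the result, while a key present in only one of the tables contributes nothing.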

Joining Records in a Table with Directly Matching Records from Another Table (Direct Inner Join)

There is a stereotypical picture in baseball of a "slugger": a big fat man who comes to plate challenging your notion of what an athlete looks like, and challenging the pitcher to prevent him from knocking the ball for multiple bases (or at least far enough away to lumber up to first base). To examine the correspondence from body type to ability to hit for power (i.e. high SLG), we will need to join the people table (listing height and weight) with their hitting stats.

Preparing the Relations to Join (ch_07/fat_hits.pig)
fatness = FOREACH people GENERATE
    player_id, name_first, name_last,
    height_in, weight_lb;

slugging_stats = FOREACH (FILTER bat_careers BY (PA > 1000))
    GENERATE
        player_id,
        SLG;

The syntax of the join statement itself shouldn’t be much of a surprise:

An Inner Join (ch_07/fat_hits.pig)
slugging_fatness_join = JOIN
    fatness        BY player_id,
    slugging_stats BY player_id;

just_20 = LIMIT slugging_fatness_join 20; DUMP @;

DESCRIBE just_20;

/*
{
    fatness::player_id: chararray,
    fatness::name_first: chararray,
    fatness::name_last: chararray,
    fatness::height_in: int,
    fatness::weight_lb: int,
    slugging_stats::player_id: chararray,
    slugging_stats::SLG: double
}
*/
Disambiguating Field Names With ::

As a consequence of flattening records from the fatness table next to records from the slugging_stats table, the two tables each contribute a field named player_id. Although we privately know that both fields have the same value, Pig is right to insist on an unambiguous reference. The schema helpfully prefixes each field name with the name of the relation it came from, separated by ::, to make the reference unambiguous.

You’ll need to run a FOREACH across the joined data, specifying the qualified names of the fields you want to keep. One thing to keep in mind is that it is easy to get confused as to whether you should reference a field via x::y or x.y. Try to remember: x::y is used to disambiguate joined records, and x.y is used to reference values in bags when calling aggregate functions.

Body Type vs Slugging Average

So having done the join, we finish by preparing the output:

bmis = FOREACH (JOIN fatness BY player_id, slugging_stats BY player_id) {

    BMI = 703.0*weight_lb/(double)(height_in*height_in);

    GENERATE
        fatness::player_id,
        name_first,
        name_last,
        SLG,
        height_in,
        weight_lb,
        BMI;
};

We added a field for BMI (Body Mass Index), a simple measure of body type. It is found by dividing a person’s weight by the square of their height and, since we’re stuck with English units, multiplying by 703 to match the metric-based definition.

Though BMI can’t distinguish between 180 pounds of muscle and 180 pounds of flab, it reasonably controls for weight-due-to-tallness vs weight-due-to-bulkiness: beanpole Randy Johnson (6'10"/2.1m, 225lb/102kg) and pocket rocket Tim Raines (5'8"/1.7m, 160lb/73kg) both have a low BMI of 23; Babe Ruth (who in his later days was 6'2"/1.88m, 260lb/118kg) and Cecil Fielder (of whom Bill James wrote "...his reported weight of 261 leaves unanswered the question of what he might weigh if he put his other foot on the scale") both have high BMIs well above 30. [2]
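As a quick sanity check of the formula: Randy Johnson works out to 703 × 225 / 82² ≈ 23.5, while the later-career Ruth comes to 703 × 260 / 74² ≈ 33.4.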

How a Join Works

So that you can effectively reason about the behavior of a JOIN, it’s important that you have the following two ways to think about its operation: (a) as the equivalent of a COGROUP-and-FLATTEN; and (b) as the underlying map-reduce job it produces. Understanding how a join works in map/reduce goes a long way to understanding map/reduce itself.

A Join is a COGROUP+FLATTEN

A JOIN in Pig is just shorthand for the equivalent COGROUP operation. Applying the COGROUP operation with a FLATTEN in place of the JOIN gives us the equivalent command:

-- Original JOIN
slugging_fatness_join = JOIN fatness BY player_id,
                             slugging_stats BY player_id;

-- Equivalent COGROUP/FLATTEN
slugging_fatness_join = FOREACH
    (COGROUP fatness BY player_id, slugging_stats BY player_id)
    GENERATE
        FLATTEN(fatness),
        FLATTEN(slugging_stats);

DESCRIBE slugging_fatness_join;

slugging_fatness_join: {
    fatness::player_id: chararray,
    fatness::name_first: chararray,
    fatness::name_last: chararray,
    fatness::height_in: int,
    fatness::weight_lb: int,
    slugging_stats::player_id: chararray,
    slugging_stats::SLG: double
}

In this sense, a JOIN is just a convenience - shorthand for a COGROUP/FLATTEN. We haven’t introduced COGROUP before, but it is a fundamental operation in Pig. A COGROUP is direct shorthand for a reduce operation on more than one table. Used on one table, it is equivalent to GROUP BY. Used on two tables, it causes a joint reduce on the join keys between the tables.
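If COGROUP is new to you, it helps to look at the cogrouped relation before the FLATTEN collapses it. Here is a sketch (the exact DESCRIBE output differs in formatting, but the shape is the point): a group key plus one bag per input relation.

fatness_slugging_cg = COGROUP fatness BY player_id, slugging_stats BY player_id;
DESCRIBE fatness_slugging_cg;

-- roughly:
-- fatness_slugging_cg: {
--     group: chararray,
--     fatness: {(player_id, name_first, name_last, height_in, weight_lb)},
--     slugging_stats: {(player_id, SLG)}
-- }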

A Join is a Map/Reduce Job with a Secondary Sort on the Table Name

The way to perform a join in map-reduce is similarly a particular application of the COGROUP we stepped through above. In the next example, we’ll walk through it on its own in Python/MrJob. We’ll be joining an example customers table (created by Joe Stein):

Customers (ch_07/customers.dat)
Alice Bob|not bad|US
Sam Sneed|valued|CA
Jon Sneed|valued|CA
Arnold Wesise|not so good|UK
Henry Bob|not bad|US
Yo Yo Ma|not so good|CA
Jon York|valued|CA
Alex Ball|valued|UK
Jim Davis|not so bad|JA

to an example countries table:

Countries (ch_07/countries.dat)
United States|US
Canada|CA
United Kingdom|UK
Italy|IT

The mapper receives its set of input splits either from the customers table or from the countries table and makes the appropriate transformations: splitting the line into fields and emitting a key/value pair. The key is the join key - in this case, the country code field of both sets of records. The mapper knows which file and type of record it is receiving based on the number of fields (in Pig, the JOIN code would have the schema). The records it emits follow the COGROUP pattern: the join field is the key, which acts as the partitioning key. We use the SORT_VALUES option, which ensures the values are sorted as well. Then we employ a trick to ensure that for each join key, country records are always seen before customer records: we prepend an arbitrary tag to the front of the value, 'A' for countries and 'B' for customers. This makes countries sort before customers for each and every join/partition key. After that trick, the join is simply a matter of storing the countries (the 'A' records) and crossing that array with each customer record.

# Adapted for MrJob from Joe Stein's example at:
# http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/

import sys, os, re
from mrjob.job import MRJob

class MRJoin(MRJob):

  # Performs secondary sort
  SORT_VALUES = True

  def mapper(self, _, line):
    splits = line.rstrip("\n").split("|")

    if len(splits) == 2: # country data
      symbol = 'A' # make country sort before person data
      country2digit = splits[1]
      yield country2digit, [symbol, splits]
    else: # person data
      symbol = 'B'
      country2digit = splits[2]
      yield country2digit, [symbol, splits]

  def reducer(self, key, values):
    countries = [] # countries come first, as they are sorted on artificial key 'A'
    for value in values:
      if value[0] == 'A':
        countries.append(value)
      if value[0] == 'B':
        for country in countries:
          yield key, country[1:] + value[1:]

if __name__ == '__main__':
  MRJoin.run()

To run our join locally using MrJob:

cd examples/ch_07
python ./join.py countries.dat customers.dat

Our output is as expected for an inner join. The key is the join key, and the value is the pair of records joined:

"CA"	[["Canada", "CA"], ["Jon Sneed", "valued", "CA"]]
"CA"	[["Canada", "CA"], ["Jon York", "valued", "CA"]]
"CA"	[["Canada", "CA"], ["Sam Sneed", "valued", "CA"]]
"CA"	[["Canada", "CA"], ["Yo Yo Ma", "not so good", "CA"]]
"UK"	[["United Kingdom", "UK"], ["Alex Ball", "valued", "UK"]]
"UK"	[["United Kingdom", "UK"], ["Arnold Wesise", "not so good", "UK"]]
"US"	[["United States", "US"], ["Alice Bob", "not bad", "US"]]
"US"	[["United States", "US"], ["Henry Bob", "not bad", "US"]]

The output of this join has one record for each discrete combination of the keys in A (countries) and B (customers). As you will notice in our Python/MrJob version of the join, the secondary sort ensures that for each key the reducer receives all the records for table A strictly followed by all records for table B. We gather all the A records into an array, then on each B record emit the A records stapled to the B records. All the A records have to be held in memory at the same time, while all the B records simply flutter by; this means that if you have two datasets of wildly different sizes or distribution, it is worth ensuring the reducer receives the smaller group first. In map/reduce, the table with the largest number of records per key should be assigned the last-occurring field group label; in Pig, that table should be named last in the JOIN statement.

Note that there is no requirement we store relation A in memory in an array. We could, if there were too many records for one key in both sides of a join, write it to disk and then stream it through for every record in relation B. Storing it in RAM is much more convenient whenever possible.

For more on MapReduce algorithms, Data-Intensive Text Processing with MapReduce by Jimmy Lin and Chris Dyer is an excellent read; it helped a great deal in crafting this example.

Pattern in Use
  • Exercise — Explore the correspondence of weight, height and BMI to SLG using a medium-data tool such as R, Pandas or Excel. Spoiler alert: the stereotype of the big fat slugger is quite true.

Handling Nulls and Non-matches in Joins and Groups

It’s important to understand how Null keys are handled in Join and Group operations. Briefly:

  • In map-reduce, Nulls are respected as keys.

  • In a single-table Pig GROUP, Nulls are also respected as keys.

  • In a multi-table COGROUP, Nulls are respected as keys, but not grouped together

  • In a JOIN operation, rows with Null keys do not take part in the join at all, but are processed anyway

  • If you have a lot of Null keys, watch out: it is somewhere between costly and foolish.

When we say 'null key', we mean that if the group or join key is a scalar expression, that it has a null result; and if the key is a tuple, that all elements of the tuple are null. So

  • these are null keys: Null, (Null,Null,Null)

  • these are not: "" (empty string), 0 (zero), ("hi",Null,"howareyou") (even one non-null field makes the key non-null); an empty bag {} and a bag with a tuple holding null {()} are both not-null, but a bag cannot be used as a join or group key.

In the base Hadoop infrastructure, there’s not much to understand: a key is a key, and Hadoop doesn’t treat nulls specially in any way. Anything different is up to your program, and Pig does in fact supply something different.

A single-table GROUP statement does treat Nulls as keys. It’s pretty easy to come up with a table having many Null values for the key you’re grouping on; and if you do, all of them will be sent to the same reducer. If you actually need those keys, well, whaddayagonnado: sounds like one of the reducers will have to endure a bad day at work. But if you don’t need the groups having Null keys, get rid of them as early as possible.
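A minimal sketch of doing exactly that, with a hypothetical relation events and group key user_id (substitute your own alias and key):

-- Drop null-keyed records before the GROUP so they never reach a reducer
events_with_keys = FILTER events BY user_id IS NOT NULL;
grouped = GROUP events_with_keys BY user_id;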

A COGROUP statement with multiple tables also treats Nulls as keys (so get rid of them if unwanted). But take note! Multi-table groups treat each table’s Nulls as distinct. That is, if table A had 4 records with null keys, and table B had 2 records with null keys, COGROUP A by key, B by key would produce

  • a row whose three fields are the null key; a bag holding the four associated records from A, and an empty bag; and

  • a row whose three fields are the null key; an empty bag; and a bag holding the two associated records from B.

What do you do if you want null keys treated like any other tuple? Add an indicator field saying whether the value is null, and coalesce the actual key to a non-null value. So instead of JOIN aa BY has_nulls, bb BY has_nulls, write

Join on NULL Fields
JOIN
  aa BY ( (has_nulls IS NULL ? 'x' : 'Y'), (has_nulls IS NULL ? -999 : has_nulls) ),
  bb BY ( (has_nulls IS NULL ? 'x' : 'Y'), (has_nulls IS NULL ? -999 : has_nulls) );

Even if there are records whose value is -999, they will have 'Y' for the indicator, while the null-keyed records will have 'x', and so they will not meet up. (For your sanity, if it’s possible to choose a replacement value that can’t occur in the data set, do so.) The file j-important_notes_about_joins.pig in the sample code repo has a bunch more demonstrations of edge cases in groups and joins.

Pattern in Use: Inner Join
  • Where You’ll Use It  — Any time you need to match records among tables. Re-attaching metadata about a record to the record. Combining incidences of defective products with the manufacturing devices that made them.

  • Standard Snippet  — JOIN aa BY key, bb BY key;

  • Hello, SQL Users  — The only join that Hadoop admits is the "equi-join" — equality of values. Much more on this to follow.

  • Important to Know

    • List the tables in the statement from smallest to largest (largest table last)

    • You can do a multi-way join; see the documentation

    • Unlike GROUP/COGROUP, there is no separate group field in the output; the join key appears only as each table’s own column

    • :: is for disambiguation, . is for projecting tuples in a bag. JOIN doesn’t create new bags, so :: is probably what you want.

  • Output Count  — For each key that matches, the number of pairings of its records: the count of matching records in table A times the count in table B. This can be anywhere from much smaller to explosively bigger.

  • Records  — Schema of the result is the schema from each table stapled end-to-end. Values are unchanged from their input.

  • Data Flow  — Pipelinable: it’s composed onto the end of the preceding map or reduce, and if it stands alone becomes a map-only job.

  • See Also

    • DataFu’s bag left outer join;

    • Left outer join on three tables: http://datafu.incubator.apache.org/docs/datafu/guide/more-tips-and-tricks.html

    • Time-series chapter: Range query using cross

    • Time-series chapter: Range query using prefix and UDFs (the ip-to-geo example)

    • Time-series chapter: Self-join for successive row differences

    • Advanced Pig: Sparse joins for filtering, with a HashMap (replicated)

    • The internet, for information on Bitmap index or Bloom filter joins

Enumerating a Many-to-Many Relationship

In the previous examples there has been a direct pairing of each line in the main table with the unique line from the other table that decorates it. Therefore, the output had exactly the same number of rows as the larger input table. When there are multiple records per key, however, the output will have one row for each pairing of records from each table. A key with two records from the left table and 3 records from the right table yields six output records.

Using the GROUP ALL trick we learned last chapter, we can count the total records before and after a many-to-many JOIN:

Many-to-Many Join (ch_07/many_to_many.pig)
-- Count the number of bat_seasons records
total_bat_seasons = FOREACH (GROUP bat_seasons ALL) GENERATE
    'bat_seasons' AS label,
    COUNT_STAR(bat_seasons) AS total;

-- Count the number of park_team_years
total_park_team_years = FOREACH (GROUP park_team_years ALL) GENERATE
    'park_team_years' AS label,
    COUNT_STAR(park_team_years) AS total;

-- Always trim the fields we don't need
player_team_years = FOREACH bat_seasons GENERATE year_id, team_id, player_id;
park_team_years   = FOREACH park_team_years GENERATE year_id, team_id, park_id;

player_stadia = FOREACH (JOIN
    player_team_years BY (year_id, team_id),
    park_team_years   BY (year_id, team_id)
    ) GENERATE
        player_team_years::year_id AS year_id,
        player_team_years::team_id AS team_id,
        player_id,
        park_id;
total_player_stadia = FOREACH (GROUP player_stadia ALL) GENERATE
    'player_stadium' AS label,
    COUNT_STAR(player_stadia) AS total;

-- Finally, UNION our label/totals and dump them together
answer = UNION total_bat_seasons, total_park_team_years, total_player_stadia; DUMP @;

Which results in:

(park_team_years,2911)
(bat_seasons,77939)
(player_stadium,80565)

You’ll see that the 77939 batting_seasons became 80565 home stadium-player pairings. The cross-product behavior didn’t cause a big explosion in counts — as opposed to our next example, which will generate much more data.

Joining a Table with Itself (self-join)

Joining a table with itself is very common when you are analyzing relationships of elements within the table (when analyzing graphs or working with datasets represented as attribute-value lists it becomes predominant). Our example here will be to identify all teammate pairs: players listed as having played for the same team in the same year. The only annoying part about doing a self-join in Pig is that you can’t, at least not directly. Pig won’t let you list the same table in multiple slots of a JOIN statement, and also won’t let you just write something like "mytable_dup = mytable;" to assign a new alias [3]. Instead you have to use a FOREACH to create a duplicate of the relation. If you don’t have any other excuse, use a project-star expression: p2 = FOREACH p1 GENERATE *;. In this case, we already need to do a projection; we feel the most readable choice is to repeat the statement twice.

-- Pig disallows self-joins so this won't work:
wont_work = JOIN bat_seasons BY (team_id, year_id), bat_seasons BY (team_id, year_id);

"ERROR ... Pig does not accept same alias as input for JOIN operation : bat_seasons"

That’s OK, we didn’t want all those stupid fields anyway; we’ll just make two copies and then join the table copies to find all teammate pairs. We’re going to say a player isn’t their their own teammate, and so we also reject the self-pairs.

p1 = FOREACH bat_seasons GENERATE player_id, team_id, year_id;
p2 = FOREACH bat_seasons GENERATE player_id, team_id, year_id;

teammate_pairs = FOREACH (JOIN
    p1 BY (team_id, year_id),
    p2 by (team_id, year_id)
  ) GENERATE
    p1::player_id AS pl1,
    p2::player_id AS pl2;

teammate_pairs = FILTER teammate_pairs BY (pl1 != pl2);

Let’s get to know our data a little better before we proceed. How big is a baseball team, anyway?

-- Get the total players per team per year
players_per_team = FOREACH (
    GROUP bat_seasons BY (team_id, year_id))
    GENERATE
        FLATTEN(group) AS (team_id, year_id),
        COUNT_STAR(bat_seasons) AS total_players;

-- Then get the average of that total
avg_players = FOREACH (GROUP players_per_team ALL) GENERATE
    ROUND(AVG(players_per_team.total_players)) AS avg_players;

DUMP @;

(29)

As opposed to the slight many-to-many expansion of the previous section, there are on average about 29 players per roster to be paired.

-- Finally: how big is our join?
total_teammate_pairs = FOREACH (GROUP teammate_pairs ALL) GENERATE
    COUNT_STAR(teammate_pairs) AS total;
DUMP @;

(2292658)

The result set here is explosively larger: 2,292,658 pairings from the original 77,939 player seasons, an expansion of almost 30x. You might have reasonably expected the expansion factor to be very close to the average number of players per team, thinking "29 average players per team, so 29 times as many pairings as players." But a join creates as many rows as the product of the counts of records in each table’s bag — the square of the roster size in this case — and the sum of those squares necessarily exceeds what the average alone would predict whenever roster sizes vary.
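For instance, two teams carrying 40 and 20 players still average 30 players apiece, but they generate 40² + 20² = 2,000 pairings, versus the 30² + 30² = 1,800 you would get from two average-sized rosters.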

Our bat_seasons table ignores mid-season trades and only lists the single team the player played the most games for, so in infrequent cases this will identify some teammate pairs that didn’t actually overlap. There’s no simple option that lets you join on players' intervals of service on a team: joins must be based on testing key equality, and we would need an "overlaps" test. In the time-series chapter you’ll meet tools for handling such cases, but it’s a big jump in complexity for a small number of renegades. You’d be better off handling it by first listing every stint on a team for each player in a season, with separate fields for the year and for the start/end dates. Doing the self-join on the season (just as we have here) would then give you every possible teammate pair, with some fraction of false pairings. Lastly, use a FILTER to reject the cases where the stints don’t overlap. Any time you’re looking at a situation where 5% of records are causing 150% of complexity, look to see whether this approach of "handle the regular case, then fix up the edge cases" can apply.

It’s worth noting that the equivalent SQL would be:

SELECT DISTINCT b1.player_id, b2.player_id
  FROM bat_season b1, bat_season b2
  WHERE b1.team_id = b2.team_id          -- same team
    AND b1.year_id = b2.year_id          -- same season
    AND b1.player_id != b2.player_id     -- reject self-teammates
;

Joining Records Without Discarding Non-Matches (Outer Join)

The Baseball Hall of Fame is meant to honor the very best in the game, and each year a very small number of players are added to its rolls. It’s a significantly subjective indicator, which is its cardinal virtue and its cardinal flaw — it represents the consensus judgement of experts, but colored to some small extent by emotion, nostalgia, and imperfect quantitative measures. But as you’ll see over and over again, the best basis for decisions is the judgement of human experts backed by data-driven analysis. What we’re assembling as we go along this tour of analytic patterns isn’t a mathematical answer to who the highest performers are, it’s a basis for centering discussion around the right mixture of objective measures based on evidence and human judgement where the data is imperfect.

So we’d like to augment the career stats table we assembled earlier with columns showing, for hall-of-famers, the year they were admitted, and a Null value for the rest. (This allows that column to also serve as a boolean indicator of whether the players were inducted). If you tried to use the JOIN operator in the form we have been, you’ll find that it doesn’t work. A plain JOIN operation keeps only rows that have a match in all tables, and so all of the non-hall-of-famers will be excluded from the result. (This differs from COGROUP, which retains rows even when some of its inputs lack a match for a key). The answer is to use an 'outer join'

career_stats = FOREACH (JOIN
    bat_careers BY player_id LEFT OUTER,
    hof_bat BY player_id) GENERATE
        bat_careers::player_id,
        bat_careers::n_seasons,
        hof_bat::year_inducted AS hof_year;

DUMP @;

Since the hof_bat table has exactly one row per player, the output has exactly as many rows as the career stats table, and exactly as many non-null rows as the hall of fame table. [4]

...
(foxja01,1,)
(foxja02,4,)
(foxjo01,4,)
(foxne01,19,1997)
...

Let’s look at another example: let’s JOIN ball park/team locations and generic geographic data from GeoNames.

LEFT OUTER JOIN (ch_07/park_locations.pig)
geonames = FILTER geonames BY feature_code == 'STDM';

parks_geonames = JOIN parks BY (park_name, state, country) LEFT OUTER, geonames BY (name, admin1_code, country_code);

DUMP @;

Which gets us some records with matched place names, and some without:

(STP01,Tropicana Field,1998-03-31,2013-09-23,1,1286,-82.65,27.77,St. Petersburg,FL,US,4175752,Tropicana Field,Tropicana Field,Tropikana-fild,teulopikana pildeu,Тропикана-филд,トロピカーナ・フィールド,트로피카나 필드,27.76781,-82.6526,S,STDM,US,,FL,103,,,0,8,27,America/New_York,2013-01-09)
(CHI02,23rd Street Park,1872-05-29,1877-10-06,0,129,-87.63,41.85,Chicago,IL,US,,,,,,,,,,,,,,,,,,,)
(KAN02,Association Park,1886-04-30,1888-09-29,0,114,-94.56,39.11,Kansas City,MO,US,,,,,,,,,,,,,,,,,,,)
(CLE04,Brotherhood Park,1890-04-30,1890-10-04,0,62,-81.65,41.48,Cleveland,OH,US,,,,,,,,,,,,,,,,,,,)
(STL09,Busch Stadium II,1966-05-12,2005-10-02,0,3174,-90.19,38.62,St. Louis,MO,US,,,,,,,,,,,,,,,,,,,)
(SFO02,Candlestick Park,1960-04-12,1999-09-30,0,3173,-122.39,37.71,San Francisco,CA,US,7521373,Candlestick Park,Candlestick Park,kaendeulseutig pakeu,kaindalastika parka,ملعب كانديلستيك بارك,कैन्डलस्टिक पार्क,キャンドルスティック・パーク,캔들스틱 파크,37.7135,-122.38443,S,STDM,US,,CA,075,,,0,,4,America/Los_Angeles,2010-08-16)

In this example, there will be some parks that have no direct match to location names and, of course, there will be many, many places that do not match a park. The JOINs we did earlier were "inner" JOINs — the output contains only rows that found a match. In this case, we want to keep all the parks, even if no places matched, but we do not want to keep any places that lack a park. Since all rows from the left (the first-listed) dataset will be retained, this is called a "left outer" JOIN. If, instead, we were trying to annotate all places with such parks as could be matched — producing exactly one output row per place — we would use a "right outer" JOIN instead. If we wanted to do the latter but (somewhat inefficiently) flag parks that failed to find a match, we would use a "full outer" JOIN. (Full JOINs are pretty rare.)
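For comparison, here is a sketch of those other variants using the same aliases as above (these statements aren’t part of the example script; they only show the syntax):

-- Keep every place, matched or not, but drop parks without a match
places_with_parks = JOIN parks BY (park_name, state, country) RIGHT OUTER,
    geonames BY (name, admin1_code, country_code);

-- Keep everything from both sides
parks_places_full = JOIN parks BY (park_name, state, country) FULL OUTER,
    geonames BY (name, admin1_code, country_code);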

In a Pig JOIN it is important to order the tables by size — putting the smallest table first and the largest table last. (You’ll learn why in the "Map/Reduce Patterns" (REF) chapter.) So while a right join is not terribly common in traditional SQL, it’s quite valuable in Pig. If you look back at the previous examples, you will see we took care to always put the smaller table first. For small tables or tables of similar size, it is not a big deal — but in some cases, it can have a huge impact, so get in the habit of always following this best practice.

Note
A Pig join is outwardly similar to the join portion of a SQL SELECT statement, but notice that although you can place simple expressions in the join expression, you can make no further manipulations to the data whatsoever in that statement. Pig’s design philosophy is that each statement corresponds to a specific data transformation, making it very easy to reason about how the script will run; this makes the typical Pig script more long-winded than corresponding SQL statements but clearer for both human and robot to understand.
Pattern in Use
  • Where You’ll Use It  — Any time only some records have matches but you want to preserve the whole. All products from the manufacturing line paired with each incident report about a product (keeping products with no incident report). All customers that took a test drive matched with the past cars they bought from you (but not discarding the new customer records)

  • Standard Snippet  — FOREACH (JOIN aa BY key LEFT OUTER, bb BY key) GENERATE aa::key..aa::last_field, bb::second_field...;

  • Hello, SQL Users  — Right joins are much more common in Pig, because you want the table size to determine the order they’re listed in

  • Important to Know  — Records with NULL keys are dropped even in an outer join

  • Output Count  — At least as many records as the OUTER table has, expanded by the number of ways to pair records from each table for a key. Like any join, output size can be explosively higher

  • Data Flow  — Pipelinable: it’s composed onto the end of the preceding map or reduce, and if it stands alone becomes a map-only job.

Joining Tables that do not have a Foreign-Key Relationship

With the exception of the last one, all of the joins we’ve done so far have been on nice clean values designed in advance to match records among tables. In SQL parlance, the career_stats and batting_hof tables both had player_id as a primary key (a column of unique, non-null values tied to each record’s identity). The team_id field in the bat_seasons and park_team_years tables points into the teams table as a foreign key: an indexable column whose only values are primary keys in another table, and which may have nulls or duplicates. But sometimes you must match records among tables that do not have a polished mapping of values. In that case, it can be useful to use an outer join as the first pass to unify what records you can before you bring out the brass knuckles or big guns for what remains.

Suppose we wanted to plot where each major-league player grew up — perhaps as an answer in itself as a browsable map, or to allocate territories for talent scouts, or to see whether the quiet wide spaces of country living or the fast competition of growing up in the city better fosters the future career of a high performer. While the people table lists the city, state and country of birth for most players, we must geolocate those place names — determine their longitude and latitude — in order to plot or analyze them.

There are geolocation services on the web, but they are imperfect, rate-limited and costly for commercial use [5]. Meanwhile the freely-available geonames database gives geo-coordinates and other information on more than seven million points of interest across the globe, so for informal work it can make a lot of sense to opportunistically decorate whatever records match and then decide what to do with the rest.

Geolocation JOIN without Foreign-Key Relationship (ch_07/people_locations.pig)
-- Filter to only populated places in the US, see http://www.geonames.org/export/codes.html
geonames = FILTER geonames BY feature_code matches 'PPL.*' AND country_code == 'US';
geonames = FOREACH geonames GENERATE geonameid, latitude, longitude, name, admin1_code;

-- Trim extra fields from players, and limit to those born in the USA
players = FILTER players BY birth_country == 'USA';
players = FOREACH players GENERATE player_id, name_first, name_last, birth_city, birth_state, birth_country;

-- Now make our 'approximate' JOIN
geolocated_somewhat = JOIN
    players BY (birth_city, birth_state) LEFT OUTER,
    geonames BY (name, admin1_code)
;

DESCRIBE geolocated_somewhat;

/*
geolocated_somewhat: {
    players::player_id: chararray,
    players::name_first: chararray,
    players::name_last: chararray,
    players::birth_city: chararray,
    players::birth_state: chararray,
    players::birth_country: chararray,
    geonames::geonameid: chararray,
    geonames::latitude: float,
    geonames::longitude: float,
    geonames::name: chararray,
    geonames::admin1_code: chararray}
*/

geolocated_trimmed = FOREACH geolocated_somewhat GENERATE player_id, name_first, name_last, latitude, longitude;

DUMP @;

Let’s take a look at some metrics behind the JOIN:

total = FOREACH (GROUP geolocated_trimmed ALL) GENERATE 'total' AS label, COUNT_STAR(geolocated_trimmed) AS total;

with_lat = FILTER geolocated_trimmed BY latitude IS NOT NULL;
with_lat_total = FOREACH (GROUP with_lat ALL) GENERATE 'with_lat' AS label, COUNT_STAR(with_lat) AS total;

without_lat = FILTER geolocated_trimmed BY latitude IS NULL;
without_lat_total = FOREACH (GROUP without_lat ALL) GENERATE 'without_lat' AS label, COUNT_STAR(without_lat) AS total;

report = UNION total, with_lat_total, without_lat_total;

DUMP @;

In the important sense, this JOIN worked quite well: 76.7% of records found a match:

(without_lat,3893)
(with_lat,12868)
(total,16761)

Experienced database hands might now suggest doing a join using some sort of fuzzy-match or some sort of other fuzzy equality. However, in map-reduce the only kind of join you can do is an "equi-join" — one that uses key equality to match records. Unless an operation is 'transitive' — that is, unless "a joins to b" and "b joins to c" guarantees "a joins to c" — a plain join won’t work, which rules out approximate string matches; joins on range criteria (where keys are related through inequalities (x < y)); graph distance; geographic nearness; and edit distance. You also can’t use a plain join on an 'OR' condition: "match stadiums and places if the placename and state are equal or the city and state are equal", "match records if the postal code from table A matches any of the component zip codes of place B". Much of the latter part of this book centers on what to do when there is a clear way to group related records in context, but which is more complicated than key equality.

Pattern in Use
  • Where You’ll Use It  — Any time you’re geolocating records, sure, but the lessons here hold any time you’re combining messy data with canonical records

  • Hello, SQL Users  — No fuzzy matches, no string distance, no inequalities. There’s no built-in SOUNDEX UDF, but that would be legal as it produces a scalar value to test with equality

  • Important to Know  — Watch out for an embarrassment of riches — there are many towns named "Springfield".
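One way to spot that problem is to check the join’s fan-out: count how many geonames rows attached to each player, and flag anyone with more than one. A sketch against the aliases above (the new alias names are purely illustrative):

-- More than one geonames match per player means an ambiguous birthplace
match_counts = FOREACH (GROUP geolocated_somewhat BY players::player_id) GENERATE
    group AS player_id,
    COUNT_STAR(geolocated_somewhat) AS n_matches;
ambiguous_players = FILTER match_counts BY n_matches > 1;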

Joining on an Integer Table to Fill Holes in a List

In some cases you want to ensure that there is an output row for each potential value of a key. For example, a histogram of career hits will show that Pete Rose (4256 hits) and Ty Cobb (4189 hits) have so many more hits than the third-most player (Hank Aaron, 3771 hits) that there are gaps in the output bins.

To fill the gaps, generate a list of all the potential keys, then generate your (possibly hole-y) result table, and do a join of the keys list (LEFT OUTER) with results. In some cases, this requires one job to enumerate the keys and a separate job to calculate the results. For our purposes here, we can simply use the integer table. (We told you it was surprisingly useful!)

If we prepare a histogram of career hits, similar to the one above for seasons, those two outliers leave exactly such gaps in the output bins. To make it so that every bin has an entry, do an outer join against the integer table.

-- SQL Equivalent:
SET @H_binsize = 10;
SELECT bin, H, IFNULL(n_H,0)
  FROM      (SELECT @H_binsize * idx AS bin FROM numbers WHERE idx <= 430) nums
  LEFT JOIN (SELECT @H_binsize*CEIL(H/@H_binsize) AS H, COUNT(*) AS n_H
    FROM bat_career bat GROUP BY H) hist
  ON hist.H = nums.bin
  ORDER BY bin DESC
;

Regular old histogram of career hits, bin size 100:

Hits Histogram without Gaps Filled In (ch_07/filled_histogram.pig)
player_hits = FOREACH (GROUP bat_seasons BY player_id) GENERATE
    100 * ROUND(SUM(bat_seasons.H)/100.0) AS bin;
histogram = FOREACH (GROUP player_hits BY bin) GENERATE
    group AS bin,
    COUNT_STAR(player_hits) AS total;

Generate a list of all the bins we want to keep, then perform a LEFT JOIN of bins with histogram counts. Missing rows will have a null total value, which we can convert to zero.

Hits Histogram With Gaps Filled In (ch_07/filled_histogram.pig)
-- Numbers, from 0 to 9999
numbers = LOAD '/data/gold/numbers10k.txt' AS (number:int);

-- Get a count of hits per player, across all player seasons
player_hits = FOREACH (GROUP bat_seasons BY player_id) GENERATE
    100 * ROUND(SUM(bat_seasons.H)/100.0) AS bin;

-- Get the maximum player hits bin to filter the numbers relation
max_hits = FOREACH (GROUP player_hits ALL) GENERATE MAX(player_hits.bin) AS max_bin;

-- Count the number of occurrences for each bin
histogram = FOREACH (GROUP player_hits BY bin) GENERATE
    group AS bin,
    COUNT_STAR(player_hits) AS total;

-- Calculate the complete set of histogram bins up to our limit
histogram_bins = FOREACH (FILTER numbers BY 100 * number <= max_hits.max_bin) GENERATE
    100 * number AS bin;

-- Finally, join the histogram bins with the histogram data to get our gap-less histogram
filled_histogram = FOREACH (JOIN histogram_bins BY bin LEFT OUTER, histogram BY bin) GENERATE
    histogram_bins::bin,
    (total IS NULL ? 0L : total) AS total
;

DUMP @;

You can see Pete Rose, Ty Cobb and Hank Aaron:

...
(3800,1)
(3900,0)
(4000,0)
(4100,0)
(4200,1)
(4300,1)
Pattern in Use
  • Where You’ll Use It  — Whenever you know the values you want (whether they’re integers, model numbers, dates, etc) and always want a corresponding row in the output table

Selecting Only Records That Lack a Match in Another Table (anti-join)

In an anti-join, we keep only the records from one table that have no match in the other table; in other words, we remove the records that do have a match. We can achieve this with an OUTER JOIN followed by a FILTER on those records lacking the 'required' fields from the join.

-- Always trim fields we don't need
all_stars_p  = FOREACH all_stars GENERATE player_id, year_id;

-- An outer join of the two will leave both matches and non-matches.
scrub_seasons_join = JOIN
    bat_seasons BY (player_id, year_id) LEFT OUTER,
    all_stars_p BY (player_id, year_id);

-- ...and the non-matches will have Nulls in all the all-stars slots
anti_join = FILTER scrub_seasons_join
    BY all_stars_p::player_id IS NULL;

Once the matches have been eliminated, pick off the first table’s fields. The double-colon in 'all_stars_p::' makes clear which table’s field we mean.
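A sketch of that final projection, using the range-projection form we use elsewhere in this chapter (this assumes bat_seasons' fields run from player_id through HR, as in the semi-join example below):

-- Keep only the batting-season fields; the all_stars_p columns are all null here anyway
anti_join_bats = FOREACH anti_join GENERATE bat_seasons::player_id .. bat_seasons::HR;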

Selecting Only Records That Possess a Match in Another Table (semi-join)

A semi-join is the counterpart to an anti-join: you want to find records that do have a match in another table, but not keep the fields from that table around.

Let’s use the same example — player seasons where they made the all-star team — but only look for seasons that were all-stars. You might think you could do this with a join:

-- Don't do this... produces duplicates!
bats_g  = JOIN all_stars BY (player_id, year_id), bat_seasons BY (player_id, year_id);
badness = FOREACH bats_g GENERATE bat_seasons::player_id .. bat_seasons::HR;

The result is wrong, and even a diligent spot-check will probably fail to notice. You see, from 1959-1962 there were multiple All-Star games (!), and so players who appeared in both have two rows in the All-Star table. In turn, each singular row in the bat_season table became two rows in the result for players in those years. We’ve broken the contract of leaving the original table unchanged.

This is the biggest thing people coming from a SQL background need to change about their thinking. In SQL, the JOIN rules all. In Pig, GROUP and COGROUP rule the land, and nearly every other structural operation is some piece of syntactic sugar on top of those. So when going gets rough with a JOIN, remember that it’s just a convenience and ask yourself whether a COGROUP would work better. In this case it does:

-- Players with no entry in the all_stars_p table have an empty all_stars_p bag
allstar_seasons_cg = COGROUP
    bat_seasons BY (player_id, year_id),
    all_stars_p BY (player_id, year_id);

Now select all cogrouped rows where there was an all-star record, and project just the records from the original table.

-- One row in the batting table => One row in the result
all_star_seasons = FOREACH
    (FILTER allstar_seasons_cg BY (COUNT_STAR(all_stars_p) > 0L))
    GENERATE FLATTEN(bat_seasons);

The JOIN version was equivalent to flattening both bags (GENERATE FLATTEN(bat_seasons), FLATTEN(all_stars_p)) and then removing the fields we had just flattened. In the COGROUP version, neither the incorrect duplicate rows nor the unnecessary columns are created.

An Alternative to Anti-Join: use a COGROUP

As a lesson on the virtues of JOINs and COGROUPs, let’s examine an alternate version of the anti-join introduced above (REF).

-- Players with no entry in the all_stars_p table have an empty all_stars_p bag
bats_ast_cg = COGROUP
    bat_seasons BY (player_id, year_id),
    all_stars_p BY (player_id, year_id);

Select all cogrouped rows where there were no all-star records, and project the batting table fields.

anti_join = FOREACH
    (FILTER bats_ast_cg BY (COUNT_STAR(all_stars_p) == 0L))
    GENERATE FLATTEN(bat_seasons);

There are three opportunities for optimization here. Though these tables are far too small to warrant optimization, it’s a good teachable moment for when to (not) optimize.

  • You’ll notice that we projected off the extraneous fields from the allstars table before the map. Pig is sometimes smart enough to eliminate fields we don’t need early. There’s two ways to see if it did so. The surest way is to consult the tree that EXPLAIN produces. If you make the program use all_stars and not all_stars_p, you’ll see that the extra fields are present. The other way is to look at how much data comes to the reducer with and without the projection. If there is less data using all_stars_p than all_stars, the explicit projection is required.

  • The EXPLAIN output also shows that co-group version has a simpler map-reduce plan, raising the question of whether it’s more performant.

  • Usually we put the smaller table (all_stars) first in a join or cogroup, and the table with the most records per key last. However, although the all_stars table is smaller, it has the larger per-key cardinality (barely): (player_id, year_id) is a primary key for the bat_seasons table, while all_stars can have two records per key in the years that held two All-Star games. So the order is likely to be irrelevant.
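For reference, inspecting the plan is a single statement; run it against whichever version you are comparing:

-- Show the logical, physical, and map-reduce plans Pig will use
EXPLAIN anti_join;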

But "more performant" or "possibly more performant" doesn’t mean "use it instead".

Eliminating extra fields is almost always worth it, but the explicit projection means extra lines of code and it means an extra alias for the reader to understand. On the other hand, the explicit projection reassures the experienced reader that the projection is for-sure-no-doubt-about-it taking place. That’s actually why we chose to be explicit here: we find that the more-complicated script gives the reader less to think about.

In contrast, any SQL user will immediately recognize the join formulation of this as an anti-join. Introducing a RIGHT OUTER join or choosing the cogroup version disrupts that familiarity. Choose the version you find most readable, and then find out if you care whether it’s more performant; the simpler explain graph or the smaller left-hand join table do not necessarily imply a faster dataflow. For this particular shape of data, even at much larger scale we’d be surprised to learn that either of the latter two optimizations mattered.

Wrapping Up

In this chapter we’ve learned how to JOIN and COGROUP. These operations allow us to bring additional data-sources into our analysis. The INNER JOIN lets us combine records from multiple datasets with common join keys. The OUTER JOIN or semi-join decorates one relation with matches from another relation. An anti-join filters records with a match in another table, leaving only those that don’t match. The self-join showed us how to explore groupings within a given relation. Taken together, these techniques constitute a powerful toolset for working with one or more sources of data.

Joins are a fundamental data operation, and by adding them to our toolkit we’ve broadened our capabilities dramatically. Now we can look at hitting records in combination with player records, or ball park records in combination with city data. We’ve seen how to join relational data that is meant to be joined - and also how to join disparate data sources to bring in related data from the web. We’ve even learned how to join data to itself, to analyze pairs within groups.

In the next chapter we’ll learn about sorting, or ordering data. We’ll be able to manipulate records individually with map-only patterns, group and join data using map/reduce, and sort data within and without groups. Our data vocabulary is nearly complete!


1. Hence why you may see Hadoop joins on data scientist tech interviews.
2. The dataset we’re using unfortunately only records players' weights at the start of their career, so you will see different values listed for Mr. Fielder and Mr. Ruth.
3. If it didn’t cause such a surprisingly hairy set of internal complications, it would have long ago been fixed
4. Please note that the hof_bat table excludes players admitted to the Hall of Fame based on their pitching record. With the exception of Babe Ruth — who would likely have made the Hall of Fame as a pitcher if he hadn’t been the most dominant hitter of all time — most pitchers have very poor offensive skills and so are relegated back with the rest of the crowd
5. Put another way, "Accurate, cheap, fast: choose any two