How to apply a pandas geocode function to a PySpark column
The table looks like this:
id | ADDRESS |
---|---|
0 | 6101 SUMMITVIEW AVE STE 200 YAKIMA |
1 | 527 CEDAR WAY SUITE 105 OAKMONT |
2 | 1700 N ROSE AVE SUITE 460 OXNARD |
3 | 1275 YORK AVE NEW YORK |
4 | 2300 MANCHESTER EXPY A SUITE 101 A COLUMBUS |
5 | 401 N MICHIGAN AVE CHICAGO |
6 | 111 GROSSMAN DR INTERNAL MEDICINE BRAINTREE |
7 | 1850 N CENTRAL AVE STE 1600 PHOENIX |
8 | 47 NEW SCOTLAND AVENUE ALBANY MEDICAL CENTER A... |
9 | 201 N VINE ST EL DORADO |
10 | 4420 LAKE BOONE TRL RALEIGH |
11 | 2727 W HOLCOMBE BLVD HOUSTON |
12 | 850 PETER BRYCE BLVD TUSCALOOSA |
13 | 1803 WEHRLI RD NAPERVILLE |
14 | 4321 N MACDILL AVE STE 203 TAMPA |
15 | 111 CONTINENTAL DR SUITE 412 NEWARK |
16 | 1834 E INNOVATION PARK DR ORO VALLEY |
17 | 880 KEMPSVILLE RD SUITE 2200 NORFOLK |
18 | 701 PRINCETON AVE SW BIRMINGHAM |
19 | 4729 COUNTY ROAD 101 MINNETONKA |
import pandas as pd
import geopandas as gpd
import geopy
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
import matplotlib.pyplot as plt
import folium
from folium.plugins import FastMarkerCluster
locator = Nominatim(user_agent="myGeocoder")
geocode = RateLimiter(locator.geocode,min_delay_seconds=0.0, error_wait_seconds=1.0, swallow_exceptions=True, return_value_on_exception=None)
apprix_1_na['location'] = apprix_1_na['ADDRESS'].apply(geocode)
apprix_1_na['point'] = apprix_1_na['location'].apply(lambda loc: tuple(loc.point) if loc else None)
I want this code to work in PySpark and give me the longitude and latitude for each address.
Solution 1:[1]
I'll show a "complex" example using the GoogleV3 API. It is easy to adapt to your case.
from geopy.geocoders import GoogleV3
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType, ArrayType
df = spark.createDataFrame([("123 Fake St, Springfield, 12345, USA",),("1000 N West Street, Suite 1200 Wilmington, DE 19801, USA",)], ["address"])
df.display()
address |
---|
123 Fake St, Springfield, 12345, USA |
1000 N West Street, Suite 1200 Wilmington, DE 19801, USA |
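@udf(returnType=ArrayType(FloatType()))
def geoloc(address):
    api = 'your_api_key_here'
    geolocator = GoogleV3(api_key=api)
    # geocode() returns None when the address cannot be resolved
    location = geolocator.geocode(address)
    # get lat_long as [latitude, longitude], or null if there was no match
    return [location.latitude, location.longitude] if location else None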
# find the coordinates for each address
df = df.withColumn('geocode', geoloc(col('address')))
# split the [latitude, longitude] array into two columns
df = df.withColumn("latitude", col('geocode').getItem(0))\
       .withColumn("longitude", col('geocode').getItem(1))
df.display()
address | geocode | latitude | longitude |
---|---|---|---|
123 Fake St, Springfield, 12345, USA | [44.046238, -123.022026] | 44.046238 | -123.022026 |
1000 N West Street, Suite 1200 Wilmington, DE 19801, USA | [39.74717, -75.54999] | 39.74717 | -75.54999 |
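To adapt this to the Nominatim geocoder from the question, something like the sketch below might work. The helper names (`make_lat_lon_fn`, `build_nominatim_geocode`, `add_lat_lon`) are hypothetical and not part of geopy or PySpark. It assumes geopy and PySpark are installed on the workers, builds the geocoder lazily inside the UDF so the geocoder object never has to be shipped from the driver, and uses `DoubleType` instead of `FloatType` to avoid rounding the coordinates to single precision. The one-second delay reflects the public Nominatim server's limit of one request per second. The limiter only applies within a single Python worker, so the number of parallel tasks may still need to be reduced.

```python
from typing import Callable, List, Optional


def make_lat_lon_fn(
    build_geocode: Callable[[], Callable],
) -> Callable[[Optional[str]], Optional[List[float]]]:
    """Return a row-level function mapping an address to [lat, lon] or None.

    build_geocode is a zero-argument factory (hypothetical helper). It is
    called on first use, so the geocoder is created where the UDF runs
    rather than on the driver.
    """
    cache = {}

    def lat_lon(address):
        # Null or empty addresses produce a null result without a request.
        if not address:
            return None
        if "geocode" not in cache:
            cache["geocode"] = build_geocode()
        location = cache["geocode"](address)
        # geopy returns None when nothing matches (or an error was swallowed).
        if location is None:
            return None
        return [float(location.latitude), float(location.longitude)]

    return lat_lon


def build_nominatim_geocode():
    # Imported here so this file can be loaded where geopy is not installed.
    from geopy.geocoders import Nominatim
    from geopy.extra.rate_limiter import RateLimiter

    locator = Nominatim(user_agent="myGeocoder")
    # Assumption: the public Nominatim server, limited to 1 request/second.
    return RateLimiter(
        locator.geocode,
        min_delay_seconds=1.0,
        swallow_exceptions=True,
        return_value_on_exception=None,
    )


def add_lat_lon(df, address_col="ADDRESS"):
    """Add geocode, latitude and longitude columns to a Spark DataFrame."""
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import ArrayType, DoubleType

    geoloc = udf(make_lat_lon_fn(build_nominatim_geocode), ArrayType(DoubleType()))
    return (
        df.withColumn("geocode", geoloc(col(address_col)))
        .withColumn("latitude", col("geocode").getItem(0))
        .withColumn("longitude", col("geocode").getItem(1))
    )
```

With the question's table loaded as a Spark DataFrame `df`, calling `add_lat_lon(df, "ADDRESS")` should return the same three extra columns as above. Rows that cannot be geocoded get nulls instead of failing the job.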
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Gabriel Migliorini |