The disk.frame package aims to be the answer to the question: how do I manipulate structured tabular data that doesn’t fit into Random Access Memory (RAM)?
In a nutshell, disk.frame makes use of two simple ideas: 1) split up a larger-than-RAM dataset into chunks and store each chunk in a separate file inside a folder, and 2) provide a convenient API to manipulate these chunks.
disk.frame performs a similar role to distributed systems such as Apache Spark, Python’s Dask, and Julia’s JuliaDB.jl for medium data, that is, datasets that are too large for RAM but not quite large enough to qualify as big data.
In this tutorial, we introduce disk.frame, address some common questions, and replicate the sparklyr data manipulation tutorial using disk.frame constructs.
Simply run
install.packages("disk.frame")
or
devtools::install_github("xiaodaigh/disk.frame")
Set-up disk.frame
disk.frame works best if it can process multiple data chunks in parallel. The best way to set up disk.frame so that each CPU core runs a background worker is:
setup_disk.frame()
# this will allow unlimited amounts of data to be passed from worker to worker
options(future.globals.maxSize = Inf)
The setup_disk.frame() function sets up background workers equal to the number of CPU cores; please note that, by default, hyper-threaded cores are counted as one, not two. Alternatively, one may specify the number of workers using setup_disk.frame(workers = n).
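For example, a minimal sketch (the worker count of 2 is an arbitrary choice for illustration):

```r
library(disk.frame)

# set up exactly two background workers instead of one per CPU core
setup_disk.frame(workers = 2)

# allow unlimited amounts of data to be passed between workers
options(future.globals.maxSize = Inf)
```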
Creating a disk.frame
The disk.frame package provides convenient functions to convert data.frames and CSVs to disk.frames.
disk.frame from data.frame
We convert a data.frame to a disk.frame using the as.disk.frame function.
library(nycflights13)
library(dplyr)
library(disk.frame)
library(data.table)
# convert the flights data to a disk.frame and store the disk.frame in the folder
# "tmp_flights" and overwrite any content if needed
flights.df <- as.disk.frame(
flights,
outdir = file.path(tempdir(), "tmp_flights.df"),
overwrite = TRUE)
flights.df
#> path: "C:\Users\RTX2080\AppData\Local\Temp\Rtmpq8cAyl/tmp_flights.df"
#> nchunks: 6
#> nrow (at source): 336776
#> ncol (at source): 19
#> nrow (post operations): ???
#> ncol (post operations): ???
You should now see a folder called tmp_flights.df with some files in it, namely 1.fst, 2.fst, …, where each fst file is one chunk of the disk.frame.
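One way to verify this is to list the files in the folder (this sketch assumes the outdir used above):

```r
# each *.fst file in this folder is one chunk of the disk.frame
list.files(file.path(tempdir(), "tmp_flights.df"))
```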
disk.frame from CSV
library(nycflights13)
# write a csv
csv_path = file.path(tempdir(), "tmp_flights.csv")
data.table::fwrite(flights, csv_path)
# load the csv into a disk.frame
df_path = file.path(tempdir(), "tmp_flights.df")
flights.df <- csv_to_disk.frame(
csv_path,
outdir = df_path,
overwrite = T)
flights.df
#> path: "C:\Users\RTX2080\AppData\Local\Temp\Rtmpq8cAyl/tmp_flights.df"
#> nchunks: 6
#> nrow (at source): 336776
#> ncol (at source): 19
#> nrow (post operations): ???
#> ncol (post operations): ???
If the CSV is too large to read in at once, then we can use the in_chunk_size option to control how many rows to read in at a time. For example, to read in the data 100,000 rows at a time:
library(nycflights13)
library(disk.frame)
# write a csv
csv_path = file.path(tempdir(), "tmp_flights.csv")
data.table::fwrite(flights, csv_path)
df_path = file.path(tempdir(), "tmp_flights.df")
flights.df <- csv_to_disk.frame(
csv_path,
outdir = df_path,
in_chunk_size = 100000)
#> -----------------------------------------------------
#> Stage 1 of 2: splitting the file C:\Users\RTX2080\AppData\Local\Temp\Rtmpq8cAyl/tmp_flights.csv into smallers files:
#> Destination: C:\Users\RTX2080\AppData\Local\Temp\Rtmpq8cAyl\file28b02c2bed7
#> -----------------------------------------------------
#> Stage 1 of 2 took: 0.150s elapsed (0.110s cpu)
#> -----------------------------------------------------
#> Stage 2 of 2: Converting the smaller files into disk.frame
#> -----------------------------------------------------
#> csv_to_disk.frame: Reading multiple input files.
#> Please use `colClasses = ` to set column types to minimize the chance of a failed read
#> =================================================
#>
#> -----------------------------------------------------
#> -- Converting CSVs to disk.frame -- Stage 1 of 2:
#>
#> Converting 4 CSVs to 6 disk.frames each consisting of 6 chunks
#>
#> -- Converting CSVs to disk.frame -- Stage 1 or 2 took: 1.250s elapsed (0.050s cpu)
#> -----------------------------------------------------
#>
#> -----------------------------------------------------
#> -- Converting CSVs to disk.frame -- Stage 2 of 2:
#>
#> Row-binding the 6 disk.frames together to form one large disk.frame:
#> Creating the disk.frame at C:\Users\RTX2080\AppData\Local\Temp\Rtmpq8cAyl/tmp_flights.df
#>
#> Appending disk.frames:
#> Stage 2 of 2 took: 0.330s elapsed (0.110s cpu)
#> -----------------------------------------------------
#> Stage 1 & 2 in total took: 1.580s elapsed (0.160s cpu)
#> Stage 2 of 2 took: 1.800s elapsed (0.160s cpu)
#> -----------------------------------------------------
#> Stage 2 & 2 took: 1.950s elapsed (0.270s cpu)
#> -----------------------------------------------------
flights.df
#> path: "C:\Users\RTX2080\AppData\Local\Temp\Rtmpq8cAyl/tmp_flights.df"
#> nchunks: 6
#> nrow (at source): 336776
#> ncol (at source): 19
#> nrow (post operations): ???
#> ncol (post operations): ???
disk.frame also has a function zip_to_disk.frame that can convert every CSV in a zip file to disk.frames.
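A hedged sketch of its use ("flights.zip" is a hypothetical file name, not a file created earlier in this tutorial):

```r
library(disk.frame)

# convert every CSV found inside flights.zip to disk.frames,
# stored under the given output folder
zip.df <- zip_to_disk.frame(
  "flights.zip",
  outdir = file.path(tempdir(), "zip_flights.df"))
```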
dplyr verbs and lazy evaluation
flights.df1 <- select(flights.df, year:day, arr_delay, dep_delay)
flights.df1
#> path: "C:\Users\RTX2080\AppData\Local\Temp\Rtmpq8cAyl/tmp_flights.df"
#> nchunks: 6
#> nrow (at source): 336776
#> ncol (at source): 19
#> nrow (post operations): ???
#> ncol (post operations): ???
The class of flights.df1 is also disk.frame after the dplyr::select transformation. Also, disk.frame operations are by default (and where possible) lazy, meaning the operations are not performed right away; instead, disk.frame waits until you call collect. Exceptions to this rule are the *_join operations, which are evaluated eagerly under certain conditions; see Joins for disk.frame in-depth for details.
For lazily constructed disk.frames (e.g. flights.df1), the function collect can be used to bring the results from disk into R, e.g.
collect(flights.df1) %>% head
#> year month day arr_delay dep_delay
#> 1: 2013 1 1 11 2
#> 2: 2013 1 1 20 4
#> 3: 2013 1 1 33 2
#> 4: 2013 1 1 -18 -1
#> 5: 2013 1 1 -25 -6
#> 6: 2013 1 1 12 -4
Of course, for larger-than-RAM datasets, one wouldn’t call collect on the whole disk.frame (because why would you need disk.frame otherwise?). More likely, one would call collect on a filtered dataset or one summarized with group_by.
Some examples of other dplyr verbs applied:
filter(flights.df, dep_delay > 1000) %>% collect %>% head
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1 2013 1 9 641 900 1301 1242 1530
#> 2 2013 1 10 1121 1635 1126 1239 1810
#> 3 2013 6 15 1432 1935 1137 1607 2120
#> 4 2013 7 22 845 1600 1005 1044 1815
#> 5 2013 9 20 1139 1845 1014 1457 2210
#> arr_delay carrier flight tailnum origin dest air_time distance hour
#> 1 1272 HA 51 N384HA JFK HNL 640 4983 9
#> 2 1109 MQ 3695 N517MQ EWR ORD 111 719 16
#> 3 1127 MQ 3535 N504MQ JFK CMH 74 483 19
#> 4 989 MQ 3075 N665MQ JFK CVG 96 589 16
#> 5 1007 AA 177 N338AA JFK SFO 354 2586 18
#> minute time_hour
#> 1 0 2013-01-09T14:00:00Z
#> 2 35 2013-01-10T21:00:00Z
#> 3 35 2013-06-15T23:00:00Z
#> 4 0 2013-07-22T20:00:00Z
#> 5 45 2013-09-20T22:00:00Z
mutate(flights.df, speed = distance / air_time * 60) %>% collect %>% head
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1 2013 1 1 517 515 2 830 819
#> 2 2013 1 1 533 529 4 850 830
#> 3 2013 1 1 542 540 2 923 850
#> 4 2013 1 1 544 545 -1 1004 1022
#> 5 2013 1 1 554 600 -6 812 837
#> 6 2013 1 1 554 558 -4 740 728
#> arr_delay carrier flight tailnum origin dest air_time distance hour
#> 1 11 UA 1545 N14228 EWR IAH 227 1400 5
#> 2 20 UA 1714 N24211 LGA IAH 227 1416 5
#> 3 33 AA 1141 N619AA JFK MIA 160 1089 5
#> 4 -18 B6 725 N804JB JFK BQN 183 1576 5
#> 5 -25 DL 461 N668DN LGA ATL 116 762 6
#> 6 12 UA 1696 N39463 EWR ORD 150 719 5
#> minute time_hour speed
#> 1 15 2013-01-01T10:00:00Z 370.0441
#> 2 29 2013-01-01T10:00:00Z 374.2731
#> 3 40 2013-01-01T10:00:00Z 408.3750
#> 4 45 2013-01-01T10:00:00Z 516.7213
#> 5 0 2013-01-01T11:00:00Z 394.1379
#> 6 58 2013-01-01T10:00:00Z 287.6000
dplyr verbs
The chunk_arrange function arranges (sorts) each chunk but not the whole dataset, so use it with caution. Similarly, chunk_summarise creates summary variables within each chunk and hence also needs to be used with caution. In the Group By section, we demonstrate how to use summarise correctly in the disk.frame context with hard_group_by.
# this only sorts within each chunk
chunk_arrange(flights.df, dplyr::desc(dep_delay)) %>% collect %>% head
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1 2013 1 9 641 900 1301 1242 1530
#> 2 2013 1 10 1121 1635 1126 1239 1810
#> 3 2013 1 1 848 1835 853 1001 1950
#> 4 2013 5 19 713 1700 853 1007 1955
#> 5 2013 1 13 1809 810 599 2054 1042
#> 6 2013 5 16 2233 1340 533 59 1640
#> arr_delay carrier flight tailnum origin dest air_time distance hour
#> 1 1272 HA 51 N384HA JFK HNL 640 4983 9
#> 2 1109 MQ 3695 N517MQ EWR ORD 111 719 16
#> 3 851 MQ 3944 N942MQ JFK BWI 41 184 18
#> 4 852 AA 257 N3HEAA JFK LAS 323 2248 17
#> 5 612 DL 269 N322NB JFK ATL 116 760 8
#> 6 499 AA 753 N3ERAA LGA DFW 184 1389 13
#> minute time_hour
#> 1 0 2013-01-09T14:00:00Z
#> 2 35 2013-01-10T21:00:00Z
#> 3 35 2013-01-01T23:00:00Z
#> 4 0 2013-05-19T21:00:00Z
#> 5 10 2013-01-13T13:00:00Z
#> 6 40 2013-05-16T17:00:00Z
One can chain dplyr verbs together, as with a data.frame:
c4 <- flights.df %>%
  filter(month == 5, day == 17, carrier %in% c('UA', 'WN', 'AA', 'DL')) %>%
  select(carrier, dep_delay, air_time, distance) %>%
  mutate(air_time_hours = air_time / 60) %>%
  collect %>%
  arrange(carrier) # arrange should occur after `collect`
c4 %>% head
#> carrier dep_delay air_time distance air_time_hours
#> 1 AA -7 142 1089 2.366667
#> 2 AA -9 186 1389 3.100000
#> 3 AA -6 143 1096 2.383333
#> 4 AA -4 114 733 1.900000
#> 5 AA -2 146 1085 2.433333
#> 6 AA -7 119 733 1.983333
Group-by
disk.frame implements the chunk_group_by operation with a significant caveat: in the disk.frame framework, group-by happens WITHIN each chunk and not ACROSS chunks. To achieve a group-by across chunks we need to put all rows with the same group keys into the same file chunk; this can be achieved with hard_group_by. However, the hard_group_by operation can be VERY TIME CONSUMING computationally and should be avoided if possible.
The hard_group_by operation is best illustrated with an example. Suppose a disk.frame has three chunks:
# chunk1 = 1.fst
# id n
#1 a 1
#2 a 2
#3 b 3
#4 d 4
# chunk2 = 2.fst
# id n
#1 a 4
#2 a 5
#3 b 6
#4 d 7
# chunk3 = 3.fst
# id n
#1 a 4
#2 b 5
#3 c 6
and notice that the id column contains four distinct values: "a", "b", "c", and "d". Performing hard_group_by(df, by = id) MAY give you the following disk.frame, where all rows with the same id value end up in the same chunk.
# chunk1 = 1.fst
# id n
#1 b 3
#2 b 6
# chunk2 = 2.fst
# id n
#1 c 6
#2 d 4
#3 d 7
# chunk3 = 3.fst
# id n
#1 a 1
#2 a 2
#3 a 4
#4 a 5
#5 a 4
Also, notice that there is no guaranteed order for the distribution of the ids to the chunks. The order is random, but each chunk is likely to have a similar number of rows, provided that id does not follow a skewed distribution, i.e. one where a few distinct values make up the majority of the rows.
Typically, chunk_group_by is performed WITHIN each chunk. This is not an issue if the chunks have already been sharded on the by variables beforehand; however, if this is not the case, then one may need a second-stage aggregation to obtain the correct result; see Two-stage group by.
By forcing the user to choose between chunk_group_by (within each chunk) and hard_group_by (across all chunks), disk.frame ensures that the user is conscious of the choice they are making. In sparklyr, the equivalent of a hard_group_by is always performed, which we should avoid where possible, as it is time-consuming and expensive. Hence, disk.frame has chosen to explain the theory and allow the user to make a conscious choice when performing group_by.
flights.df %>%
hard_group_by(carrier) %>% # notice that hard_group_by needs to be set
chunk_summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm=T)) %>% # mean follows normal R rules
collect %>%
arrange(carrier)
#> Hashing...
#> Hashing...
#> Hashing...
#> Hashing...
#> Hashing...
#> Hashing...
#> Appending disk.frames:
#> # A tibble: 16 x 3
#> carrier count mean_dep_delay
#> <chr> <int> <dbl>
#> 1 9E 18460 16.7
#> 2 AA 32729 8.59
#> 3 AS 714 5.80
#> 4 B6 54635 13.0
#> 5 DL 48110 9.26
#> 6 EV 54173 20.0
#> 7 F9 685 20.2
#> 8 FL 3260 18.7
#> 9 HA 342 4.90
#> 10 MQ 26397 10.6
#> 11 OO 32 12.6
#> 12 UA 58665 12.1
#> 13 US 20536 3.78
#> 14 VX 5162 12.9
#> 15 WN 12275 17.7
#> 16 YV 601 19.0
Two-stage group by
For most group-by tasks, the user can achieve the desired result WITHOUT using hard_group_by by performing the group-by in two stages. For example, suppose you aim to count the number of rows grouped by carrier: you can use chunk_group_by to find the count within each chunk, and then use a second group-by to summarise each chunk’s results into the desired result. For example,
flights.df %>%
chunk_group_by(carrier) %>% # `chunk_group_by` aggregates within each chunk
chunk_summarize(count = n()) %>% # mean follows normal R rules
collect %>% # collect each individual chunk's results and row-bind into a data.table
group_by(carrier) %>%
summarize(count = sum(count)) %>%
arrange(carrier)
#> # A tibble: 16 x 2
#> carrier count
#> <chr> <int>
#> 1 9E 18460
#> 2 AA 32729
#> 3 AS 714
#> 4 B6 54635
#> 5 DL 48110
#> 6 EV 54173
#> 7 F9 685
#> 8 FL 3260
#> 9 HA 342
#> 10 MQ 26397
#> 11 OO 32
#> 12 UA 58665
#> 13 US 20536
#> 14 VX 5162
#> 15 WN 12275
#> 16 YV 601
Because this two-stage approach avoids the expensive hard_group_by operation, it is often significantly faster. However, it can be tedious to write; this is a con of the disk.frame chunking mechanism.
Note: this two-stage approach is similar to a map-reduce operation.
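To illustrate why the second stage matters, here is a sketch of a two-stage mean: simply averaging the per-chunk means would be wrong when chunks contribute unequal counts, so we carry sums and counts instead (assumes flights.df and the libraries loaded above):

```r
flights.df %>%
  chunk_group_by(carrier) %>%
  # stage 1: per-chunk sums and non-missing counts of dep_delay
  chunk_summarize(
    sum_delay = sum(dep_delay, na.rm = TRUE),
    n = sum(!is.na(dep_delay))) %>%
  collect %>%
  # stage 2: combine the chunk results into the true overall mean
  group_by(carrier) %>%
  summarize(mean_dep_delay = sum(sum_delay) / sum(n)) %>%
  arrange(carrier)
```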
Restricting input columns for faster processing
One can restrict which input columns to load into memory for each chunk; this can significantly increase the speed of data processing. To restrict the input columns, use the srckeep function, which only accepts column names as a string vector.
flights.df %>%
srckeep(c("carrier","dep_delay")) %>%
hard_group_by(carrier) %>%
chunk_summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm=T)) %>% # mean follows normal R rules
collect
#> Hashing...
#> Hashing...
#> Hashing...
#> Hashing...
#> Hashing...
#> Hashing...
#> Appending disk.frames:
#> # A tibble: 16 x 3
#> carrier count mean_dep_delay
#> <chr> <int> <dbl>
#> 1 B6 54635 13.0
#> 2 FL 3260 18.7
#> 3 MQ 26397 10.6
#> 4 US 20536 3.78
#> 5 HA 342 4.90
#> 6 9E 18460 16.7
#> 7 UA 58665 12.1
#> 8 AA 32729 8.59
#> 9 DL 48110 9.26
#> 10 EV 54173 20.0
#> 11 VX 5162 12.9
#> 12 WN 12275 17.7
#> 13 YV 601 19.0
#> 14 OO 32 12.6
#> 15 AS 714 5.80
#> 16 F9 685 20.2
Input column restriction is one of the most critical efficiencies provided by disk.frame. Because the underlying format allows random access to columns (i.e. it can retrieve only the columns used for processing), one can drastically reduce the amount of data loaded into RAM by keeping only those columns that are directly used to produce the results.
Joins
disk.frame supports many dplyr joins, including left_join, inner_join, semi_join, anti_join, and full_join.
In all cases, the left dataset (x) must be a disk.frame, and the right dataset (y) can be either a disk.frame or a data.frame. If the right dataset is a disk.frame and the shardkeys differ between the two disk.frames, then two expensive hard group_by operations are performed eagerly, one on each disk.frame, so that the join is performed correctly.
However, if the right dataset is a data.frame, then hard_group_bys are only performed in the case of full_join.
Note: disk.frame does not support right_join; the user should use left_join instead.
The joins below are performed lazily, because airlines.dt is a data.table, not a disk.frame:
# make airlines a data.table
airlines.dt <- data.table(airlines)
# flights %>% left_join(airlines, by = "carrier") #
flights.df %>%
left_join(airlines.dt, by ="carrier") %>%
collect %>%
head
#> year month day dep_time sched_dep_time dep_delay arr_time
#> 1: 2013 1 1 517 515 2 830
#> 2: 2013 1 1 533 529 4 850
#> 3: 2013 1 1 542 540 2 923
#> 4: 2013 1 1 544 545 -1 1004
#> 5: 2013 1 1 554 600 -6 812
#> 6: 2013 1 1 554 558 -4 740
#> sched_arr_time arr_delay carrier flight tailnum origin dest air_time
#> 1: 819 11 UA 1545 N14228 EWR IAH 227
#> 2: 830 20 UA 1714 N24211 LGA IAH 227
#> 3: 850 33 AA 1141 N619AA JFK MIA 160
#> 4: 1022 -18 B6 725 N804JB JFK BQN 183
#> 5: 837 -25 DL 461 N668DN LGA ATL 116
#> 6: 728 12 UA 1696 N39463 EWR ORD 150
#> distance hour minute time_hour name
#> 1: 1400 5 15 2013-01-01T10:00:00Z United Air Lines Inc.
#> 2: 1416 5 29 2013-01-01T10:00:00Z United Air Lines Inc.
#> 3: 1089 5 40 2013-01-01T10:00:00Z American Airlines Inc.
#> 4: 1576 5 45 2013-01-01T10:00:00Z JetBlue Airways
#> 5: 762 6 0 2013-01-01T11:00:00Z Delta Air Lines Inc.
#> 6: 719 5 58 2013-01-01T10:00:00Z United Air Lines Inc.
flights.df %>%
left_join(airlines.dt, by = c("carrier", "carrier")) %>%
collect %>%
tail
#> year month day dep_time sched_dep_time dep_delay arr_time
#> 1: 2013 9 30 NA 1842 NA NA
#> 2: 2013 9 30 NA 1455 NA NA
#> 3: 2013 9 30 NA 2200 NA NA
#> 4: 2013 9 30 NA 1210 NA NA
#> 5: 2013 9 30 NA 1159 NA NA
#> 6: 2013 9 30 NA 840 NA NA
#> sched_arr_time arr_delay carrier flight tailnum origin dest air_time
#> 1: 2019 NA EV 5274 N740EV LGA BNA NA
#> 2: 1634 NA 9E 3393 JFK DCA NA
#> 3: 2312 NA 9E 3525 LGA SYR NA
#> 4: 1330 NA MQ 3461 N535MQ LGA BNA NA
#> 5: 1344 NA MQ 3572 N511MQ LGA CLE NA
#> 6: 1020 NA MQ 3531 N839MQ LGA RDU NA
#> distance hour minute time_hour name
#> 1: 764 18 42 2013-09-30T22:00:00Z ExpressJet Airlines Inc.
#> 2: 213 14 55 2013-09-30T18:00:00Z Endeavor Air Inc.
#> 3: 198 22 0 2013-10-01T02:00:00Z Endeavor Air Inc.
#> 4: 764 12 10 2013-09-30T16:00:00Z Envoy Air
#> 5: 419 11 59 2013-09-30T15:00:00Z Envoy Air
#> 6: 431 8 40 2013-09-30T12:00:00Z Envoy Air
Window functions and arbitrary functions
disk.frame supports all data.frame operations, unlike Spark, which can only perform those operations that Spark has implemented. Hence, windowing functions like rank are supported out of the box.
# Find the most and least delayed flight each day
bestworst <- flights.df %>%
srckeep(c("year","month","day", "dep_delay")) %>%
chunk_group_by(year, month, day) %>%
select(dep_delay) %>%
filter(dep_delay == min(dep_delay, na.rm = T) | dep_delay == max(dep_delay, na.rm = T)) %>%
collect
#> Adding missing grouping variables: `year`, `month`, `day`
#> Warning in min(dep_delay, na.rm = T): no non-missing arguments to min;
#> returning Inf
#> Warning in max(dep_delay, na.rm = T): no non-missing arguments to max;
#> returning -Inf
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`
bestworst %>% head
#> # A tibble: 6 x 4
#> # Groups: year, month, day [1]
#> year month day dep_delay
#> <int> <int> <int> <int>
#> 1 2013 2 7 148
#> 2 2013 2 7 -2
#> 3 2013 2 7 1
#> 4 2013 2 7 6
#> 5 2013 2 7 -2
#> 6 2013 2 7 -2
# Rank each flight within each day
ranked <- flights.df %>%
srckeep(c("year","month","day", "dep_delay")) %>%
chunk_group_by(year, month, day) %>%
select(dep_delay) %>%
mutate(rank = rank(desc(dep_delay))) %>%
collect
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`
ranked %>% head
#> # A tibble: 6 x 5
#> # Groups: year, month, day [1]
#> year month day dep_delay rank
#> <int> <int> <int> <int> <dbl>
#> 1 2013 1 1 2 313
#> 2 2013 1 1 4 276
#> 3 2013 1 1 2 313
#> 4 2013 1 1 -1 440
#> 5 2013 1 1 -6 742
#> 6 2013 1 1 -4 633
One can apply arbitrary transformations to each chunk of a disk.frame by using the delayed function, which evaluates lazily, or the map.disk.frame(lazy = F) function, which evaluates eagerly. For example, to return the number of rows in each chunk:
flights.df1 <- delayed(flights.df, ~nrow(.x))
collect_list(flights.df1) %>% head # returns number of rows for each data.frame in a list
#> [[1]]
#> [1] 56131
#>
#> [[2]]
#> [1] 56131
#>
#> [[3]]
#> [1] 56131
#>
#> [[4]]
#> [1] 56131
#>
#> [[5]]
#> [1] 56131
#>
#> [[6]]
#> [1] 56121
and to do the same with map.disk.frame:
map(flights.df, ~nrow(.x), lazy = F) %>% head
#> [[1]]
#> [1] 56131
#>
#> [[2]]
#> [1] 56131
#>
#> [[3]]
#> [1] 56131
#>
#> [[4]]
#> [1] 56131
#>
#> [[5]]
#> [1] 56131
#>
#> [[6]]
#> [1] 56121
The map function can also output the results to another disk.frame folder, e.g.
# return the first 10 rows of each chunk
flights.df2 <- map(flights.df, ~.x[1:10,], lazy = F, outdir = file.path(tempdir(), "tmp2"), overwrite = T)
flights.df2 %>% head
#> year month day dep_time sched_dep_time dep_delay arr_time
#> 1: 2013 1 1 517 515 2 830
#> 2: 2013 1 1 533 529 4 850
#> 3: 2013 1 1 542 540 2 923
#> 4: 2013 1 1 544 545 -1 1004
#> 5: 2013 1 1 554 600 -6 812
#> 6: 2013 1 1 554 558 -4 740
#> sched_arr_time arr_delay carrier flight tailnum origin dest air_time
#> 1: 819 11 UA 1545 N14228 EWR IAH 227
#> 2: 830 20 UA 1714 N24211 LGA IAH 227
#> 3: 850 33 AA 1141 N619AA JFK MIA 160
#> 4: 1022 -18 B6 725 N804JB JFK BQN 183
#> 5: 837 -25 DL 461 N668DN LGA ATL 116
#> 6: 728 12 UA 1696 N39463 EWR ORD 150
#> distance hour minute time_hour
#> 1: 1400 5 15 2013-01-01T10:00:00Z
#> 2: 1416 5 29 2013-01-01T10:00:00Z
#> 3: 1089 5 40 2013-01-01T10:00:00Z
#> 4: 1576 5 45 2013-01-01T10:00:00Z
#> 5: 762 6 0 2013-01-01T11:00:00Z
#> 6: 719 5 58 2013-01-01T10:00:00Z
Notice that disk.frame supports the purrr syntax for defining a function using ~.
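For example, the ~nrow(.x) formula above is equivalent to passing an anonymous function explicitly:

```r
# same per-chunk row count, written with an explicit function
# instead of purrr's ~ formula shorthand
map(flights.df, function(.x) nrow(.x), lazy = F) %>% head
```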
Sampling
In the disk.frame framework, sampling a proportion of rows within each chunk can be performed using sample_frac.
flights.df %>% sample_frac(0.01) %>% collect %>% head
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1 2013 1 9 1633 1635 -2 2009 1945
#> 2 2013 12 27 1603 1600 3 1906 1948
#> 3 2013 2 1 1700 1703 -3 1819 1810
#> 4 2013 1 16 555 600 -5 917 825
#> 5 2013 1 3 1725 1725 0 2024 2050
#> 6 2013 1 1 1909 1910 -1 2212 2224
#> arr_delay carrier flight tailnum origin dest air_time distance hour
#> 1 24 WN 891 N466WN EWR HOU 254 1411 16
#> 2 -42 DL 409 N727TW JFK SFO 342 2586 16
#> 3 9 EV 4373 N12569 EWR DCA 44 199 17
#> 4 52 MQ 4650 N530MQ LGA ATL 139 762 6
#> 5 -26 UA 512 N518UA JFK SFO 332 2586 17
#> 6 -12 DL 1629 N6710E JFK LAS 323 2248 19
#> minute time_hour
#> 1 35 2013-01-09T21:00:00Z
#> 2 0 2013-12-27T21:00:00Z
#> 3 3 2013-02-01T22:00:00Z
#> 4 0 2013-01-16T11:00:00Z
#> 5 25 2013-01-03T22:00:00Z
#> 6 10 2013-01-02T00:00:00Z
One can output a disk.frame by using the write_disk.frame function, e.g. the following will output a disk.frame to the folder “out”:
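A minimal sketch, assuming flights.df from earlier (the folder name "out" comes from the text; the overwrite flag mirrors earlier examples):

```r
# write the disk.frame (and any lazy operations applied to it)
# to the folder "out"
write_disk.frame(flights.df, outdir = "out", overwrite = TRUE)
```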