eorst/rasterdataset/
processing.rs

1//! Processing methods for RasterDataset.
2//!
3//! This module contains methods for applying worker functions to raster blocks,
4//! including parallel processing, reduction operations, and mosaicking.
5
6use crate::core_types::RasterType;
7use crate::blocks::RasterBlock;
8use crate::metadata::RasterDataBlock;
9use crate::types::Dimension as UtilsDimension;
10use crate::types::{Offset, ReadWindow, Size};
11use crate::gdal_utils::{create_rayon_pool, create_temp_file, file_stem_str, mosaic, mosaic_translate_cleanup, mosaic_translate_cleanup_time_steps};
12use crate::parallel_writer::{self, ParallelGeoTiffWriter};
13use crate::rasterdataset::RasterDataset;
14use anyhow::Context;
15
16use kdam::par_tqdm;
17use ndarray::{Array2, Array3, Array4};
18use num_traits::NumCast;
19use rayon::{
20    iter::IndexedParallelIterator,
21    prelude::{IntoParallelIterator, ParallelIterator},
22};
23use std::fs;
24use std::path::{Path, PathBuf};
25
26impl<R> RasterDataset<R>
27where
28    R: RasterType,
29{
30    // ─── Apply with RasterDataBlock ───
31
32    /// Applies a worker function to each block, receiving a `RasterDataBlock` with metadata.
33    ///
34    /// The worker receives `&RasterDataBlock<R>` which includes the block data array,
35    /// `layer_indices`, `date_indices`, and other metadata. This enables semantic layer
36    /// and time selection via the [`Select`](crate::selection::Select) trait.
37    ///
38    /// Blocks are processed in parallel using a Rayon thread pool. Each block's result
39    /// is written directly to the output GeoTIFF via a [`ParallelGeoTiffWriter`](crate::parallel_writer::ParallelGeoTiffWriter),
40    /// eliminating intermediate files and the subprocess-based mosaic/translate phase.
41    ///
42    /// Returns `Err` if any worker invocation fails.
43    ///
44    /// # Example
45    ///
46    /// ```rust,ignore
47    /// // See apply_reduction below for a fully-runnable doctest.
48    /// // apply() follows the same pattern but returns Array4 and writes directly to GeoTIFF:
49    /// //
50    /// // fn worker(block: &RasterDataBlock<i16>) -> anyhow::Result<Array4<i16>> {
51    /// //     Ok(block.data.mapv(|v| v as i16))  // pass-through example
52    /// // }
53    /// //
54    /// // rds.apply::<i16>(worker, 1, &out_path)?;
55    /// ```
56    pub fn apply<U>(
57        &self,
58        worker: fn(&RasterDataBlock<R>) -> anyhow::Result<Array4<U>>,
59        n_cpus: usize,
60        out_file: &Path,
61    ) -> anyhow::Result<()>
62    where
63        U: RasterType,
64    {
65        let pool = create_rayon_pool(n_cpus);
66
67        let n_times = self.metadata.shape.times;
68        let n_layers = self.metadata.shape.layers;
69        let total_rows = self.metadata.shape.rows;
70        let total_cols = self.metadata.shape.cols;
71        let epsg_code = self.metadata.epsg_code;
72        let geo_transform = self.metadata.geo_transform;
73        let n_bands = n_times * n_layers;
74        let na_value: U = NumCast::from(self.metadata.na_value).unwrap_or(U::zero());
75
76        pool.install(|| -> anyhow::Result<()> {
77            // Phase 1: Pre-create the output GeoTIFF
78            parallel_writer::create_output_geotiff::<U>(
79                out_file,
80                &geo_transform,
81                epsg_code,
82                total_cols,
83                total_rows,
84                n_bands,
85                na_value,
86            )?;
87
88            let writer = ParallelGeoTiffWriter::new(
89                out_file.to_path_buf(),
90                geo_transform,
91                epsg_code,
92                total_cols,
93                total_rows,
94                n_bands,
95            );
96
97            // Phase 2: Process blocks in parallel, writing directly to output
98            par_tqdm!(self.blocks
99                .to_owned()
100                .into_par_iter())
101                .map(|raster_block| -> anyhow::Result<()> {
102                    let bid = raster_block.block_index;
103                    let block_data = self.read_block::<R>(bid);
104                    let raster_data_block = RasterDataBlock {
105                        data: block_data,
106                        metadata: self.metadata.clone(),
107                        no_data: NumCast::from(0).unwrap(),
108                    };
109                    let result = worker(&raster_data_block)?;
110
111                    // Flatten 4D (times, layers, rows, cols) → 3D (n_bands, rows, cols)
112                    let rows = result.shape()[2];
113                    let cols = result.shape()[3];
114                    let flat = result
115                        .into_shape_with_order((n_bands, rows, cols))
116                        .context("Failed to reshape 4D result to 3D for writing")?;
117
118                    // Trim overlap and adjust window
119                    let overlap = raster_block.overlap;
120                    let trimmed = crate::array_ops::trimm_array3_asymmetric(&flat, &overlap);
121                    let trimmed_rows = trimmed.shape()[1];
122                    let trimmed_cols = trimmed.shape()[2];
123
124                    let write_window = ReadWindow {
125                        offset: Offset {
126                            rows: raster_block.read_window.offset.rows + overlap.top as isize,
127                            cols: raster_block.read_window.offset.cols + overlap.left as isize,
128                        },
129                        size: Size {
130                            rows: trimmed_rows as isize,
131                            cols: trimmed_cols as isize,
132                        },
133                    };
134
135                    parallel_writer::write_block(&writer, trimmed, write_window)?;
136
137                    Ok(())
138                })
139                .collect::<anyhow::Result<Vec<_>>>()?;
140
141            Ok(())
142        })
143    }
144
145    /// Applies a worker function to each block and outputs a Cloud Optimized GeoTIFF.
146    ///
147    /// This is the COG variant of [`apply`](Self::apply). It follows the same block-processing
148    /// pattern but produces a proper COG with overviews and IFD reordering.
149    ///
150    /// The output process:
151    /// 1. Write blocks to an intermediate GeoTIFF (same as `apply`)
152    /// 2. Build overviews in-place via GDAL's `build_overviews`
153    /// 3. Translate to final COG via `gdal_translate -of COG` for proper IFD ordering
154    /// 4. Clean up intermediate file
155    ///
156    /// # Arguments
157    /// * `worker` - Worker function receiving `&RasterDataBlock<R>`, returning `Array4<U>`
158    /// * `n_cpus` - Number of CPU threads for parallel processing
159    /// * `out_file` - Output COG file path
160    /// * `config` - Output configuration controlling format, compression, overviews
161    pub fn apply_cog<U>(
162        &self,
163        worker: fn(&RasterDataBlock<R>) -> anyhow::Result<Array4<U>>,
164        n_cpus: usize,
165        out_file: &Path,
166        config: &crate::types::OutputConfig,
167    ) -> anyhow::Result<()>
168    where
169        U: RasterType,
170    {
171        use crate::types::OutputFormat;
172
173        match config.format {
174            OutputFormat::GeoTiff => {
175                // Simple path: no overviews, no COG translation
176                self.apply::<U>(worker, n_cpus, out_file)
177            }
178            OutputFormat::GeoTiffOverviews => {
179                // Write GeoTIFF, then build overviews in-place
180                self.apply::<U>(worker, n_cpus, out_file)?;
181
182                let writer = ParallelGeoTiffWriter::new(
183                    out_file.to_path_buf(),
184                    self.metadata.geo_transform,
185                    self.metadata.epsg_code,
186                    self.metadata.shape.cols,
187                    self.metadata.shape.rows,
188                    self.metadata.shape.times * self.metadata.shape.layers,
189                );
190                writer.build_overviews(&config.overview_resampling, &config.overview_levels)?;
191                Ok(())
192            }
193            OutputFormat::COG => {
194                // Write to intermediate, then COG driver creates overviews + reorders IFDs
195                let intermediate = PathBuf::from(create_temp_file("tif"));
196
197                self.apply::<U>(worker, n_cpus, &intermediate)?;
198
199                crate::gdal_utils::translate_to_cog(
200                    &intermediate, out_file,
201                    &config.compression,
202                    &config.overview_resampling,
203                )?;
204                fs::remove_file(&intermediate).ok();
205
206                Ok(())
207            }
208        }
209    }
210
211    /// Applies a worker function to pairs of blocks from two datasets.
212    ///
213    /// The worker receives `&RasterDataBlock<R>` and `&RasterDataBlock<U>` which include
214    /// `layer_indices`, `date_indices`, and other metadata. This enables semantic selection
215    /// via the [`Select`](crate::selection::Select) trait on both datasets.
216    ///
217    /// Useful for masking, zonal statistics, or combining two rasters (e.g. applying
218    /// a cloud mask, or computing NDVI with a land cover classification).
219    ///
220    /// Blocks are processed in parallel using a Rayon thread pool. Each block pair's result
221    /// is written directly to the output GeoTIFF.
222    ///
223    /// Returns `Err` if any worker invocation fails.
224    ///
225    /// ```rust,ignore
226    /// // See apply() for the primary usage pattern. apply_with_mask() takes
227    /// // a second dataset whose blocks are paired with the primary dataset's
228    /// // blocks, and a worker that receives both blocks simultaneously.
229    /// // Example worker signature:
230    /// //
231    /// // fn masked_ndvi(
232    /// //     raster: &RasterDataBlock<i16>,
233    /// //     mask: &RasterDataBlock<i16>,
234    /// // ) -> anyhow::Result<Array4<i16>> {
235    /// //     // mask.data is used to nullify invalid pixels
236    /// //     ...
237    /// // }
238    /// //
239    /// // rds.apply_with_mask::<i16, i16>(&mask_rds, masked_ndvi, 8, &out_path)?;
240    /// ```
241    pub fn apply_with_mask<U, V>(
242        &self,
243        other: &RasterDataset<U>,
244        worker: fn(&RasterDataBlock<R>, &RasterDataBlock<U>) -> anyhow::Result<Array4<V>>,
245        n_cpus: usize,
246        out_file: &Path,
247    ) -> anyhow::Result<()>
248    where
249        U: RasterType,
250        V: RasterType,
251    {
252        let pool = create_rayon_pool(n_cpus);
253
254        let n_times = self.metadata.shape.times;
255        let n_layers = self.metadata.shape.layers;
256        let total_rows = self.metadata.shape.rows;
257        let total_cols = self.metadata.shape.cols;
258        let epsg_code = self.metadata.epsg_code;
259        let geo_transform = self.metadata.geo_transform;
260        let n_bands = n_times * n_layers;
261        let na_value: V = NumCast::from(self.metadata.na_value).unwrap_or(V::zero());
262
263        pool.install(|| -> anyhow::Result<()> {
264            // Phase 1: Pre-create the output GeoTIFF
265            parallel_writer::create_output_geotiff::<V>(
266                out_file,
267                &geo_transform,
268                epsg_code,
269                total_cols,
270                total_rows,
271                n_bands,
272                na_value,
273            )?;
274
275            let writer = ParallelGeoTiffWriter::new(
276                out_file.to_path_buf(),
277                geo_transform,
278                epsg_code,
279                total_cols,
280                total_rows,
281                n_bands,
282            );
283
284            // Phase 2: Process block pairs in parallel, writing directly to output
285            par_tqdm!(self.blocks
286                .to_owned()
287                .into_par_iter()
288                .zip(other.blocks.to_owned()))
289                .map(|(raster_block_data, _raster_block_mask)| -> anyhow::Result<()> {
290                    let bid = raster_block_data.block_index;
291                    let block_data = self.read_block::<R>(bid);
292                    let mask_data = other.read_block::<U>(bid);
293
294                    let rdb_data = RasterDataBlock {
295                        data: block_data,
296                        metadata: self.metadata.clone(),
297                        no_data: NumCast::from(0).unwrap(),
298                    };
299                    let rdb_mask = RasterDataBlock {
300                        data: mask_data,
301                        metadata: other.metadata.clone(),
302                        no_data: NumCast::from(0).unwrap(),
303                    };
304
305                    let result = worker(&rdb_data, &rdb_mask)?;
306
307                    // Flatten 4D (times, layers, rows, cols) → 3D (n_bands, rows, cols)
308                    let rows = result.shape()[2];
309                    let cols = result.shape()[3];
310                    let flat = result
311                        .into_shape_with_order((n_bands, rows, cols))
312                        .context("Failed to reshape 4D result to 3D for writing")?;
313
314                    // Trim overlap and adjust window
315                    let overlap = raster_block_data.overlap;
316                    let trimmed = crate::array_ops::trimm_array3_asymmetric(&flat, &overlap);
317                    let trimmed_rows = trimmed.shape()[1];
318                    let trimmed_cols = trimmed.shape()[2];
319
320                    let write_window = ReadWindow {
321                        offset: Offset {
322                            rows: raster_block_data.read_window.offset.rows + overlap.top as isize,
323                            cols: raster_block_data.read_window.offset.cols + overlap.left as isize,
324                        },
325                        size: Size {
326                            rows: trimmed_rows as isize,
327                            cols: trimmed_cols as isize,
328                        },
329                    };
330
331                    parallel_writer::write_block(&writer, trimmed, write_window)?;
332
333                    Ok(())
334                })
335                .collect::<anyhow::Result<Vec<_>>>()?;
336
337            Ok(())
338        })
339    }
340
341    /// Applies a worker function to pairs of blocks and outputs a Cloud Optimized GeoTIFF.
342    ///
343    /// COG variant of [`apply_with_mask`](Self::apply_with_mask).
344    pub fn apply_with_mask_cog<U, V>(
345        &self,
346        other: &RasterDataset<U>,
347        worker: fn(&RasterDataBlock<R>, &RasterDataBlock<U>) -> anyhow::Result<Array4<V>>,
348        n_cpus: usize,
349        out_file: &Path,
350        config: &crate::types::OutputConfig,
351    ) -> anyhow::Result<()>
352    where
353        U: RasterType,
354        V: RasterType,
355    {
356        use crate::types::OutputFormat;
357
358        match config.format {
359            OutputFormat::GeoTiff => {
360                self.apply_with_mask::<U, V>(other, worker, n_cpus, out_file)
361            }
362            OutputFormat::GeoTiffOverviews => {
363                self.apply_with_mask::<U, V>(other, worker, n_cpus, out_file)?;
364
365                let writer = ParallelGeoTiffWriter::new(
366                    out_file.to_path_buf(),
367                    self.metadata.geo_transform,
368                    self.metadata.epsg_code,
369                    self.metadata.shape.cols,
370                    self.metadata.shape.rows,
371                    self.metadata.shape.times * self.metadata.shape.layers,
372                );
373                writer.build_overviews(&config.overview_resampling, &config.overview_levels)?;
374                Ok(())
375            }
376            OutputFormat::COG => {
377                let intermediate = PathBuf::from(create_temp_file("tif"));
378
379                self.apply_with_mask::<U, V>(other, worker, n_cpus, &intermediate)?;
380
381                crate::gdal_utils::translate_to_cog(
382                    &intermediate, out_file,
383                    &config.compression,
384                    &config.overview_resampling,
385                )?;
386                fs::remove_file(&intermediate).ok();
387
388                Ok(())
389            }
390        }
391    }
392
393    // ─── Mosaic ───
394
395    /// Creates a mosaic from the raster blocks.
396    pub fn apply_mosaic<T>(&self, n_cpus: usize)
397    where
398        T: RasterType,
399    {
400        unsafe { std::env::set_var("GDAL_NUM_THREADS", "16") };
401        let pool = create_rayon_pool(n_cpus);
402        let tmp_file = PathBuf::from(create_temp_file("vrt"));
403        let handle = pool.install(|| {
404            par_tqdm!(self.blocks.to_owned().into_par_iter()).map(
405                |raster_block: RasterBlock<R>| -> Vec<PathBuf> {
406                    let bid = raster_block.block_index;
407                    let file_stem = file_stem_str(&tmp_file);
408
409                    let block_data = self.read_block::<T>(bid);
410
411                    raster_block.write_time_step_blocks(
412                        &block_data,
413                        &tmp_file,
414                        file_stem,
415                        bid,
416                    )
417                },
418            )
419        });
420        let collected: Vec<Vec<PathBuf>> = handle.collect();
421
422        let out_file = PathBuf::from("mosaic.tif");
423        let n_times = self.metadata.shape.times;
424        log::info!("Saving mosaic to {:?}", out_file);
425        mosaic_translate_cleanup_time_steps(
426            &collected,
427            &out_file,
428            self.metadata.epsg_code,
429            n_times,
430        );
431    }
432
433    /// Deprecated: use [`apply_mosaic`](Self::apply_mosaic) instead.
434    #[deprecated(since = "0.3.2", note = "Use apply_mosaic() instead. mosaic() will be removed in a future release.")]
435    #[allow(deprecated)]
436    pub fn mosaic<T>(&self, n_cpus: usize)
437    where
438        T: RasterType,
439    {
440        self.apply_mosaic::<T>(n_cpus);
441    }
442
443    // ─── Reduce ───
444
445    /// Applies a worker function that reduces the data dimensionality over each block.
446    ///
447    /// The worker receives `&RasterDataBlock<R>` which includes `layer_indices`,
448    /// `date_indices`, and other metadata. This enables semantic selection via the
449    /// [`Select`](crate::selection::Select) trait.
450    ///
451    /// Common uses: mean over time (timesteps → 1), mean over layers (layers → 1), or
452    /// any other reduction operation. The result is written directly to a GeoTIFF
453    /// via a parallel writer — no intermediate files.
454    ///
455    /// # Example
456    ///
457    /// ```rust
458    /// extern crate eorst;
459    /// use std::path::PathBuf;
460    /// use eorst::{
461    ///     types::{BlockSize, Dimension}, DataSourceBuilder, RasterDatasetBuilder, RasterDataBlock,
462    ///     RasterDataset, Select,
463    /// };
464    /// use ndarray::{Array3, Axis};
465    ///
466    /// fn mean_over_time(block: &RasterDataBlock<i16>, dim: Dimension) -> Array3<i16> {
467    ///     // Reduce along a dimension, returning Array3 (e.g. mean over time)
468    ///     let as_f32: ndarray::Array4<f32> = block.data.mapv(|v| v as f32);
469    ///     let mean = as_f32.mean_axis(Axis(0)).unwrap();
470    ///     mean.mapv(|v| v as i16)
471    /// }
472    ///
473    /// let manifest_dir = env!("CARGO_MANIFEST_DIR");
474    /// let data_source = DataSourceBuilder::from_file(
475    ///     &PathBuf::from(manifest_dir).join("data/cemsre_t55jfm_20200614_sub_abam5.tif"),
476    /// )
477    /// .bands(vec![3, 4])
478    /// .set_names(vec!["red", "nir"])
479    /// .build();
480    ///
481    /// let rds: RasterDataset<i16> = RasterDatasetBuilder::from_source(&data_source)
482    ///     .block_size(BlockSize { cols: 256, rows: 256 })
483    ///     .build();
484    ///
485    /// let out_path = std::env::temp_dir().join("apply_reduction_mean.tif");
486    /// rds.apply_reduction::<i16>(mean_over_time, Dimension::Layer, 1, &out_path, i16::MIN);
487    /// ```
488    pub fn apply_reduction<U>(
489        &self,
490        worker: fn(&RasterDataBlock<R>, UtilsDimension) -> Array3<U>,
491        dimension: UtilsDimension,
492        n_cpus: usize,
493        out_file: &Path,
494        na_value: U,
495    ) where
496        U: RasterType,
497    {
498        let ext = out_file.extension().and_then(std::ffi::OsStr::to_str).unwrap();
499
500        let pool = create_rayon_pool(n_cpus);
501
502        let tmp_file = if ext != "vrt" {
503            PathBuf::from(create_temp_file("vrt"))
504        } else {
505            out_file.to_path_buf()
506        };
507
508        let handle = pool.install(|| {
509            par_tqdm!(self.blocks
510                .to_owned()
511                .into_par_iter())
512                .map(|raster_block: RasterBlock<R>| -> PathBuf {
513                    let id = raster_block.block_index;
514                    let file_stem = file_stem_str(&tmp_file);
515                    let block_fn = tmp_file.with_file_name(format!("{}_{}.tif", file_stem, id));
516                    let block_data = self.read_block::<R>(id);
517                    let rdb = RasterDataBlock {
518                        data: block_data,
519                        metadata: self.metadata.clone(),
520                        no_data: NumCast::from(0).unwrap(),
521                    };
522                    let result = worker(&rdb, dimension);
523                    self.write_window3(id, result, &block_fn, na_value);
524                    block_fn
525                })
526        });
527
528        let collected: Vec<PathBuf> = handle.collect();
529
530        mosaic(&collected, &tmp_file, self.metadata.epsg_code, None, None).expect("Could not mosaic to vrt");
531        if ext != "vrt" {
532            mosaic_translate_cleanup(&collected, &tmp_file, out_file, self.metadata.epsg_code);
533        } else {
534            fs::rename(tmp_file, out_file).unwrap();
535        }
536    }
537
538    /// Deprecated: use [`apply_reduction`](Self::apply_reduction) instead.
539    #[deprecated(since = "0.3.2", note = "Use apply_reduction() instead. reduce() will be removed in a future release.")]
540    #[allow(deprecated)]
541    pub fn reduce<U>(
542        &self,
543        worker: fn(&Array4<R>, UtilsDimension) -> Array3<U>,
544        dimension: UtilsDimension,
545        n_cpus: usize,
546        out_file: &Path,
547        na_value: U,
548    ) where
549        U: RasterType,
550    {
551        let ext = out_file.extension().and_then(std::ffi::OsStr::to_str).unwrap();
552
553        let pool = create_rayon_pool(n_cpus);
554
555        let tmp_file = if ext != "vrt" {
556            PathBuf::from(create_temp_file("vrt"))
557        } else {
558            out_file.to_path_buf()
559        };
560
561        let handle = pool.install(|| {
562            self.blocks
563                .to_owned()
564                .into_par_iter()
565                .map(|raster_block: RasterBlock<R>| -> PathBuf {
566                    let id = raster_block.block_index;
567                    let file_stem = file_stem_str(&tmp_file);
568                    let block_fn = tmp_file.with_file_name(format!("{}_{}.tif", file_stem, id));
569                    let block_data = self.read_block::<R>(id);
570                    let result = worker(&block_data, dimension);
571                    self.write_window3(id, result, &block_fn, na_value);
572                    block_fn
573                })
574        });
575
576        let collected: Vec<PathBuf> = handle.collect();
577
578        mosaic(&collected, &tmp_file, self.metadata.epsg_code, None, None).expect("Could not mosaic to vrt");
579        if ext != "vrt" {
580            mosaic_translate_cleanup(&collected, &tmp_file, out_file, self.metadata.epsg_code);
581        } else {
582            fs::rename(tmp_file, out_file).unwrap();
583        }
584    }
585
586    /// Reduces the raster dataset with another dataset as a mask.
587    ///
588    /// The worker receives `&RasterDataBlock<R>` and `&RasterDataBlock<U>` which include
589    /// `layer_indices`, `date_indices`, and other metadata.
590    ///
591    /// This version uses a parallel writer that writes blocks directly to the output GeoTIFF,
592    /// eliminating intermediate files and the subprocess-based mosaic/translate phase.
593    pub fn apply_reduction_with_mask<U, V>(
594        &self,
595        other: &RasterDataset<U>,
596        worker: fn(&RasterDataBlock<R>, &RasterDataBlock<U>, UtilsDimension) -> Array3<V>,
597        dimension: UtilsDimension,
598        n_cpus: usize,
599        out_file: &Path,
600        na_value: V,
601    ) where
602        V: RasterType,
603        U: RasterType,
604    {
605        log::info!("Starting apply_reduction_with_mask. Using {:?} cpus.", n_cpus);
606        let pool = create_rayon_pool(n_cpus);
607
608        let total_rows = self.metadata.shape.rows;
609        let total_cols = self.metadata.shape.cols;
610        let epsg_code = self.metadata.epsg_code;
611        let geo_transform = self.metadata.geo_transform;
612        let n_bands: usize = 1; // reduction always produces 1 output band
613
614        pool.install(|| -> anyhow::Result<()> {
615            // Phase 1: Pre-create the output GeoTIFF
616            parallel_writer::create_output_geotiff::<V>(
617                out_file,
618                &geo_transform,
619                epsg_code,
620                total_cols,
621                total_rows,
622                n_bands,
623                na_value,
624            )?;
625
626            let writer = ParallelGeoTiffWriter::new(
627                out_file.to_path_buf(),
628                geo_transform,
629                epsg_code,
630                total_cols,
631                total_rows,
632                n_bands,
633            );
634
635            // Phase 2: Process block pairs in parallel, writing directly to output
636            self.blocks
637                .to_owned()
638                .into_par_iter()
639                .zip(other.blocks.to_owned().into_par_iter())
640                .map(|(raster_block_data, _raster_block_mask)| -> anyhow::Result<()> {
641                    let id = raster_block_data.block_index;
642                    let block_data = self.read_block::<R>(id);
643                    let mask = other.read_block::<U>(id);
644                    let rdb_data = RasterDataBlock {
645                        data: block_data,
646                        metadata: self.metadata.clone(),
647                        no_data: NumCast::from(0).unwrap(),
648                    };
649                    let rdb_mask = RasterDataBlock {
650                        data: mask,
651                        metadata: other.metadata.clone(),
652                        no_data: NumCast::from(0).unwrap(),
653                    };
654                    let result = worker(&rdb_data, &rdb_mask, dimension);
655
656                    // Trim overlap and write at the block's output window
657                    let overlap_sz = raster_block_data.overlap_size;
658                    let trimmed = crate::array_ops::trimm_array3(&result, overlap_sz);
659                    let trimmed_rows = trimmed.shape()[1];
660                    let trimmed_cols = trimmed.shape()[2];
661
662                    let write_window = ReadWindow {
663                        offset: Offset {
664                            rows: raster_block_data.read_window.offset.rows + overlap_sz as isize,
665                            cols: raster_block_data.read_window.offset.cols + overlap_sz as isize,
666                        },
667                        size: Size {
668                            rows: trimmed_rows as isize,
669                            cols: trimmed_cols as isize,
670                        },
671                    };
672
673                    parallel_writer::write_block(&writer, trimmed, write_window)?;
674
675                    Ok(())
676                })
677                .collect::<anyhow::Result<Vec<_>>>()?;
678
679            Ok(())
680        })
681        .expect("Parallel reduction failed");
682    }
683
684    /// Reduces the raster dataset with another dataset as a mask, outputting a Cloud Optimized GeoTIFF.
685    ///
686    /// COG variant of [`apply_reduction_with_mask`](Self::apply_reduction_with_mask).
687    pub fn apply_reduction_with_mask_cog<U, V>(
688        &self,
689        other: &RasterDataset<U>,
690        worker: fn(&RasterDataBlock<R>, &RasterDataBlock<U>, UtilsDimension) -> Array3<V>,
691        dimension: UtilsDimension,
692        n_cpus: usize,
693        out_file: &Path,
694        na_value: V,
695        config: &crate::types::OutputConfig,
696    ) where
697        V: RasterType,
698        U: RasterType,
699    {
700        use crate::types::OutputFormat;
701
702        match config.format {
703            OutputFormat::GeoTiff => {
704                self.apply_reduction_with_mask::<U, V>(other, worker, dimension, n_cpus, out_file, na_value)
705            }
706            OutputFormat::GeoTiffOverviews => {
707                self.apply_reduction_with_mask::<U, V>(other, worker, dimension, n_cpus, out_file, na_value);
708
709                let writer = ParallelGeoTiffWriter::new(
710                    out_file.to_path_buf(),
711                    self.metadata.geo_transform,
712                    self.metadata.epsg_code,
713                    self.metadata.shape.cols,
714                    self.metadata.shape.rows,
715                    1,
716                );
717                writer.build_overviews(&config.overview_resampling, &config.overview_levels).ok();
718            }
719            OutputFormat::COG => {
720                let intermediate = PathBuf::from(create_temp_file("tif"));
721
722                self.apply_reduction_with_mask::<U, V>(other, worker, dimension, n_cpus, &intermediate, na_value);
723
724                crate::gdal_utils::translate_to_cog(
725                    &intermediate, out_file,
726                    &config.compression,
727                    &config.overview_resampling,
728                ).ok();
729                fs::remove_file(&intermediate).ok();
730            }
731        }
732    }
733
734    /// Deprecated: use [`apply_reduction_with_mask`](Self::apply_reduction_with_mask) instead.
735    #[deprecated(since = "0.3.2", note = "Use apply_reduction_with_mask() instead. reduce_with_mask() will be removed in a future release.")]
736    #[allow(deprecated)]
737    pub fn reduce_with_mask<U, V>(
738        &self,
739        other: &RasterDataset<U>,
740        worker: fn(&Array4<R>, &Array4<U>, UtilsDimension) -> Array3<V>,
741        dimension: UtilsDimension,
742        n_cpus: usize,
743        out_file: &Path,
744        na_value: V,
745    ) where
746        V: RasterType,
747        U: RasterType,
748    {
749        log::info!("Starting reduce with mask. Using {:?} cpus.", n_cpus);
750        let pool = create_rayon_pool(n_cpus);
751
752        let tmp_file = PathBuf::from(create_temp_file("vrt"));
753
754        let handle = pool.install(|| {
755            self.blocks
756                .to_owned()
757                .into_par_iter()
758                .zip(other.blocks.to_owned().into_par_iter())
759                .map(|(raster_block_data, _raster_block_mask)| -> PathBuf {
760                    let id = raster_block_data.block_index;
761                    let file_stem = file_stem_str(&tmp_file);
762                    let block_fn = tmp_file.with_file_name(format!("{}_{}.tif", file_stem, id));
763                    let block_data = self.read_block::<R>(id);
764                    let mask = other.read_block::<U>(id);
765                    let result = worker(&block_data, &mask, dimension);
766                    self.write_window3(id, result, &block_fn, na_value);
767                    block_fn
768                })
769        });
770
771        let collected: Vec<PathBuf> = handle.collect();
772        mosaic_translate_cleanup(&collected, &tmp_file, out_file, self.metadata.epsg_code);
773    }
774
775    /// Applies a worker function that reduces rows to pixel values.
776    ///
777    /// Note: this method reads data as type `T` which may differ from the dataset's
778    /// stored type `R`, so the worker receives `&Array4<T>` rather than `&RasterDataBlock<T>`.
779    pub fn apply_reduction_row_pixel<T>(
780        &self,
781        worker: fn(&Array4<T>, UtilsDimension) -> Array2<T>,
782        na_value: T,
783        dimension: UtilsDimension,
784        n_cpus: usize,
785        out_file: &Path,
786    ) where
787        T: RasterType,
788    {
789        let pool = create_rayon_pool(n_cpus);
790        let tmp_file = PathBuf::from(create_temp_file("vrt"));
791
792        let handle = pool.install(|| {
793            self.blocks
794                .to_owned()
795                .into_par_iter()
796                .map(|raster_block: RasterBlock<R>| -> PathBuf {
797                    let id = raster_block.block_index;
798                    let file_stem = file_stem_str(&tmp_file);
799                    let block_fn = tmp_file.with_file_name(format!("{}_{}.tif", file_stem, id));
800                    let block_data = self.read_block::<T>(id);
801                    let result = worker(&block_data, dimension);
802                    raster_block.write_samples_feature(&result, &block_fn, na_value);
803                    block_fn
804                })
805        });
806
807        let collected: Vec<PathBuf> = handle.collect();
808        mosaic_translate_cleanup(&collected, &tmp_file, out_file, self.metadata.epsg_code);
809    }
810
811    /// Deprecated: use [`apply_reduction_row_pixel`](Self::apply_reduction_row_pixel) instead.
812    #[deprecated(since = "0.3.2", note = "Use apply_reduction_row_pixel() instead. reduce_row_pixel() will be removed in a future release.")]
813    #[allow(deprecated)]
814    pub fn reduce_row_pixel<T>(
815        &self,
816        worker: fn(&Array4<T>, UtilsDimension) -> Array2<T>,
817        na_value: T,
818        dimension: UtilsDimension,
819        n_cpus: usize,
820        out_file: &Path,
821    ) where
822        T: RasterType,
823    {
824        self.apply_reduction_row_pixel(worker, na_value, dimension, n_cpus, out_file);
825    }
826
827    /// Reduces rows with a mask dataset.
828    ///
829    /// Note: this method reads data as types `R` and `V` matching the datasets,
830    /// so the worker receives raw `&Array4` references.
831    pub fn apply_reduction_row_pixel_with_mask<V, U>(
832        &self,
833        other: &RasterDataset<V>,
834        worker: fn(&Array4<R>, &Array4<V>, UtilsDimension) -> Array2<U>,
835        na_value: U,
836        dimension: UtilsDimension,
837        n_cpus: usize,
838        out_file: &Path,
839    ) where
840        U: RasterType,
841        V: RasterType,
842    {
843        let pool = create_rayon_pool(n_cpus);
844
845        let tmp_file = PathBuf::from(create_temp_file("vrt"));
846
847        let handle = pool.install(|| {
848            self.blocks
849                .to_owned()
850                .into_par_iter()
851                .zip(other.blocks.to_owned().into_par_iter())
852                .map(|(raster_block_data, _raster_block_mask)| -> PathBuf {
853                    let id = raster_block_data.block_index;
854                    let file_stem = file_stem_str(&tmp_file);
855                    let block_fn = tmp_file.with_file_name(format!(
856                        "{}_{}.tif",
857                        file_stem, raster_block_data.block_index
858                    ));
859                    let block_data = self.read_block::<R>(id);
860                    let mask = other.read_block::<V>(id);
861                    let result = worker(&block_data, &mask, dimension);
862                    raster_block_data.write_samples_feature(&result, &block_fn, na_value);
863                    block_fn
864                })
865        });
866
867        let collected: Vec<PathBuf> = handle.collect();
868        mosaic_translate_cleanup(&collected, &tmp_file, out_file, self.metadata.epsg_code);
869    }
870
871    /// Deprecated: use [`apply_reduction_row_pixel_with_mask`](Self::apply_reduction_row_pixel_with_mask) instead.
872    #[deprecated(since = "0.3.2", note = "Use apply_reduction_row_pixel_with_mask() instead. reduce_row_pixel_with_mask() will be removed in a future release.")]
873    #[allow(deprecated)]
874    pub fn reduce_row_pixel_with_mask<V, U>(
875        &self,
876        other: &RasterDataset<V>,
877        worker: fn(&Array4<R>, &Array4<V>, UtilsDimension) -> Array2<U>,
878        na_value: U,
879        dimension: UtilsDimension,
880        n_cpus: usize,
881        out_file: &Path,
882    ) where
883        U: RasterType,
884        V: RasterType,
885    {
886        self.apply_reduction_row_pixel_with_mask(other, worker, na_value, dimension, n_cpus, out_file);
887    }
888}