Your suggestions for processing a large number of big text files ...?

Hi, hope you all are having a good time …!!!

I have a project in which I have to process a number of text files that are sensor data outputs.

  • The total number of files is around 5,000, and each file is about 100MB.
  • The current sample is 50 files at around 5GB.

The processing pipeline is:

  • Read in the content of each file.
  • Apply some regex operations and other transformations.
  • Collect the transformed data into tidy format, essentially into a string[][] with sample element [| "ID"; "SensorType"; "SensorNumber"; "SensorTag"; "Measurement" |].
  • Turn the combined data into a master CSV file.

The first issue is that this is pretty resource-consuming, essentially gobbling up all CPU and RAM on a decent system. A single file takes around 20 seconds, 2 files around 45 seconds, and 50 files around 45 minutes.

I have already utilised async workflows and Array.Parallel to handle file I/O and various large-array computations where they felt appropriate.

Without resorting to cloud computing and the like, are there ways and tools to improve local performance any further?

The second issue is the final CSV format and its huge size. My thinking is:

  • If I create a database with separate tables so as to remove redundancy and reduce size, I add database-connection and data-retrieval overhead.
  • I cannot think of a format with a better balance of compactness and fast access for ready-for-analytics data than a CSV; it is just excellent for turning into dataframes.

I would welcome any performance-improving suggestions. What am I missing here?!

Thank You

It is a bit complicated to advise anything without seeing the current implementation.

Can you share your code, or an MCVE (minimal, complete, and verifiable example) that follows exactly the same approach as your real code, and give a link to a single file of data? (Again, it could be meaningless random data.) Then everyone will be able to test it and make suggestions.

Is the data independent between files?

Hi, thank you. I cannot share any data, but I got permission to share some code.

open System
open System.Collections.Generic
open System.Globalization
open System.IO
open System.Text.RegularExpressions

open FSharp.Collections
open FSharp.Control

// Find index of the n-th element in a sequence, satisfying the predicate pred.
let tryFindIndexNth n pred (sqn : #seq<'T>) =
    let ary            = sqn |> Seq.toArray
    let lookasideTable = new Dictionary<_,_>(HashIdentity.Structural)

    let rec loop acc n pred ary =
        if lookasideTable.ContainsKey n then lookasideTable.[n]
        elif ary |> Array.isEmpty then
            let res = ValueNone
            lookasideTable.Add(n, res)
            res
        elif n = 0 then
            let res = ValueSome (acc - 1)
            lookasideTable.Add(n, res)
            res
        else
            let indOpt = ary |> Array.tryFindIndex pred
            let ind, ary =
                match indOpt with
                | None     -> 0  , Array.empty
                | Some ind -> ind, ary |> Array.skip (ind + 1)
            let res = ary |> loop (ind + 1 + acc) (n - 1) pred
            lookasideTable.Add(n, res)
            res

    ary |> loop 0 n pred

// define delimiter pattern and split the text accordingly
let delimPat = @"(?:[\s;,]+)"
let skipChar = @":"

// number of pressure and impedance sensors
let nPres = 36
let nImpd = 18

// trim the string and replace the skip-characters with ""
let clean (str : string) = Regex.Replace(str.Trim(), skipChar, "")

// split the string based on given delimiter pattern
let split str delimPat = Regex.Split(str |> clean, delimPat)

// split the string based on the delimPat delimiter pattern
let splitPat str = delimPat |> split str

// turn string to title case
let toTitleCase str =
    let culInfo   = new CultureInfo("en-US", false)
    let txtInfo   = culInfo.TextInfo
    let titleCase = str |> txtInfo.ToTitleCase
    let delimPat  = @"(?:[\s]+)"

    delimPat
    |> split titleCase
    |> String.concat ""

// process each line of data
// split each line of data based on a delimiter pattern
// turn each word into a title case
let processLine line =
    line
    |> splitPat
    |> Array.map (fun elem -> elem |> toTitleCase)

// asynchronously read-in contents of data file
let readFileAsync file = async {
    use  stream  = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None)
    use  reader  = new StreamReader(stream)
    let! string  = reader.ReadToEndAsync()
    let  content = @"\n" |> split string
    return content
}

// write data into a file
let writeFile file (data : string[]) =
    use stream = new FileStream(file, FileMode.Create, FileAccess.Write, FileShare.None)
    use writer = new StreamWriter(stream)

    data
    |> Array.iter ( fun line -> writer.WriteLine(line) )

// generate a times series record ID based on data file name
let recordId file =
    let info = new FileInfo(file)
    let name = info.Name
    let extn = info.Extension

    Regex.Replace(name, extn, "")

// asynchronously read-in contents of a data file and apply a transformation in-place
let readFileMapAsync file (f : string -> 'T) = async {
    let! content = file |> readFileAsync
    let  result  = content |> Array.Parallel.map (fun line -> line |> f)
    return result
}

// generate an array of data file names
let rec allFiles folder =
    [| for file      in folder |> Directory.GetFiles       do yield  file
       for subFolder in folder |> Directory.GetDirectories do yield! subFolder |> allFiles |]

// read-in the content of a data file
let readFile file =
    processLine
    |> readFileMapAsync file
    |> Async.RunSynchronously

// separate data and metadata
let separateData file =
    let flag = "Annotations"
    let rawd = file |> readFile

    // separate raw data to data and metadata
    // Separation occurs at the line with "Annotations" flag.
    // Lines above the flag line are data.
    // Lines below the flag line are metadata.
    let arrayPartition flg (arr : string[][]) =
        let arrLen  = arr.Length
        let flagInd = arr |> Array.tryFindIndexBack (fun arr -> arr.[0] = flg)
        let empty   = [| Array.Empty<string>(); Array.Empty<string>() |]

        let result  =
            match flagInd with
            | None     -> arr, empty
            | Some ind ->
                let count1 = ind
                let count2 = arrLen - count1 - 1
                let part1 = count1 |> Array.sub arr 0
                let part2 =
                    count2
                    |> Array.sub arr (ind + 1)
                    |> Array.map ( fun arr ->
                        let part1 = arr.[0]
                        let part2 =
                            arr
                            |> Array.tail
                            |> Array.reduce (fun elem1 elem2 -> elem1 + " " + elem2)
                        [|part1; part2|] )
                part1, part2

        result

    rawd |> arrayPartition flag

// partition data to time data, pressure data, and impedance data
let partition data =
    let ncolsPressure = nPres
    let ncolsImpedenc = nImpd

    // auxiliary helper function for partitioning data to time, pressure, and impedance data
    let dataPartition ncols1 ncols2 (data : 'T[][]) =
        let len = ncols1 + ncols2 + 1
        let data =
            data.[1..]
            |> Array.Parallel.map ( fun arr ->
                let arrLen = arr.Length
                if arrLen = len then
                    arr
                else
                    // pad short rows with nulls up to the expected width
                    Array.append arr (Array.init (len - arrLen) (fun _ -> null)) )

        let time  = data |> Array.map ( fun arr -> arr.[0] )
        let press = data |> Array.map ( fun arr -> Array.sub arr 1 ncols1)
        let imped = data |> Array.map ( fun arr -> Array.sub arr (ncols1 + 1) ncols2 )

        time, press, imped

    data |> dataPartition ncolsPressure ncolsImpedenc

// reshape metadata from array[][] to (array[], array[])
let metadataReshape (metadata : string[][]) =
    let emptyArr = Array.Empty<string>()
    let empty    = [| emptyArr; emptyArr |]

    if metadata = empty then
        emptyArr, emptyArr
    else
        let vals = metadata |> Array.map ( fun arr -> arr.[0] )
        let info = metadata |> Array.map ( fun arr -> arr.[1] )

        info, vals

// transform data for each file and collect in array
// add column headers
let generateDataFrame path =
    // boundaries for each group in pressure and impedance measurements
    let presLims = [| 1; 4; 7; 27; 32; 37 |]
    let impdLims = [| 1; 3; 5; 14; 16; 19 |]

    // helper function to check for tagging the time series
    let predNum num lb ub = (num >= lb) && (num < ub)

    // helper function to tag the pressure sensor time series
    let presTagger (num: string) =
        let num = int num

        if   (predNum num presLims.[0] presLims.[1]) then "Pharynx"
        elif (predNum num presLims.[1] presLims.[2]) then "UES"
        elif (predNum num presLims.[2] presLims.[3]) then "Esophagus"
        elif (predNum num presLims.[3] presLims.[4]) then "LES"
        elif (predNum num presLims.[4] presLims.[5]) then "Stomach"
        else                                              "Faulty"

    // helper function to tag the impedance sensor time series
    let impdTagger (num: string) =
        let num = int num
        if   (predNum num impdLims.[0] impdLims.[1]) then "Pharynx"
        elif (predNum num impdLims.[1] impdLims.[2]) then "UES"
        elif (predNum num impdLims.[2] impdLims.[3]) then "Esophagus"
        elif (predNum num impdLims.[3] impdLims.[4]) then "LES"
        elif (predNum num impdLims.[4] impdLims.[5]) then "Stomach"
        else                                              "Faulty"

    // transform raw data to tidy format ready for csv consumption
    let transform file =
        let data, meta       = file |> separateData
        let time, pres, impd = data |> partition
        let info, vals       = meta |> metadataReshape

        let cutoff =
            let cutoffConst = 3
            let res = info |> tryFindIndexNth 2 (fun info -> info = "Wet Swallow")

            match res with
            | ValueSome ind -> vals.[ind]
            | ValueNone     -> time.[time.Length / cutoffConst - 1]

        let ind =
            let res = time |> Array.tryFindIndex (fun t -> float t >= float cutoff)

            match res with
            | Some ind -> ind - 1
            | None     -> time.Length - 1

        let cutoffPres = pres.[0..ind]
        let cutoffImpd = impd.[0..ind]

        let recId      = file |> recordId

        let presRecs =
            cutoffPres
            |> Array.Parallel.mapi ( fun i row ->
                row |> Array.mapi ( fun j x ->
                    let typ = "Pressure"
                    let num = string (j + 1)
                    let tag = presTagger num

                    [| recId; typ; num; time.[i]; tag; x |] ) )

            |> Array.concat

        let impdRecs =
            cutoffImpd
            |> Array.Parallel.mapi ( fun i row ->
                row |> Array.mapi ( fun j x ->
                    let typ = "Impedance"
                    let num = string (j + 1)
                    let tag = impdTagger num

                    [| recId; typ; num; time.[i]; tag; x |] ) )
            |> Array.concat

        impdRecs |> Array.append presRecs

    path
    |> allFiles
    |> Array.Parallel.collect (fun file -> file |> transform)
    |> Array.append [| [| "ID"; "SensorType"; "SensorNumber"; "Time"; "Tag"; "Measurement" |] |]

I haven't read the rest of the code, but if the following lines are the ones reading the 100MB files:

let! string  = reader.ReadToEndAsync()
let  content = @"\n" |> split string

I would give it a shot with calls to ReadLine on the StreamReader, instead of reading everything into one big string and then splitting that string as is done now.
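
Something along these lines, as a rough sketch (reusing processLine and the opens from the code above):

// Rough sketch, not a drop-in replacement: pull one line at a time with
// ReadLine, so neither the 100MB string nor the big regex split over it ever
// materialises.
let readFileByLines (file : string) =
    use reader = new StreamReader(file)
    let rows = ResizeArray<string[]>()
    let mutable line = reader.ReadLine()
    while not (isNull line) do
        rows.Add(processLine line)
        line <- reader.ReadLine()
    rows.ToArray()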

I totally agree with the comment above.

Besides, some other thoughts after a quick look at the code.

  1. I'm not sure how big part2 is, but using Array.reduce to concatenate strings like this:

Array.reduce (fun elem1 elem2 -> elem1 + " " + elem2)

is a bad idea: every + allocates a fresh intermediate string, so the cost grows quadratically with the number of elements. Use the fast String.concat " " instead; it computes the total length up front and copies once.

  2. If the data is big, then it is better to reduce the number of iterations over it to create something new. For example, here:
let time  = data |> Array.map ( fun arr -> arr.[0] )
let press = data |> Array.map ( fun arr -> Array.sub arr 1 ncols1 )
let imped = data |> Array.map ( fun arr -> Array.sub arr (ncols1 + 1) ncols2 )

That is three passes over the data! A simple for loop will be a better option instead:

let dataPartition ncols1 ncols2 (data : 'T[][]) =
    let lng = ncols1 + ncols2 + 1

    let len = data.Length  - 1

    let time = Array.zeroCreate len
    let press = Array.zeroCreate len
    let imped = Array.zeroCreate len

    for i = 1 to data.Length-1 do
        let arr = data.[i]
        let arrLen = arr.Length
        let arr' =
            if arrLen = lng then arr
            else
                // pad short rows with nulls up to the expected width lng
                Array.append arr (Array.init (lng - arrLen) (fun _ -> null))
        time.[i - 1] <- arr'.[0] 
        press.[i - 1] <- Array.sub arr' 1 ncols1
        imped.[i - 1] <- Array.sub arr' (ncols1 + 1) ncols2

    time, press, imped
  3. metadataReshape looks like an unnecessary function. It is only used to reshape values coming out of separateData, so it seems you could return them in the correct shape from separateData directly. Part of the reworked function is below:
let empty   = Array.Empty<string>(), Array.Empty<string>()

let result  =
    match flagInd with
    | None     -> arr, empty
    | Some ind ->
        let count1 = ind
        let count2 = arrLen - count1 - 1
        let part1 = count1 |> Array.sub arr 0
        let info = Array.zeroCreate count2
        let vals = Array.zeroCreate count2
        for i in (ind + 1) .. (arrLen - 1) do
            info.[ i - ind - 1 ] <- arr.[i].[0]
            let val' =  
                arr.[i]
                |> Array.tail
                |> String.concat " "
            vals.[ i - ind - 1 ] <- val'
        part1, (info, vals)

So the point is to flatten it as much as possible.


If the purpose is only “read-transform-write” then I’d consider rewriting all of it in a lazy way - Seq instead of Array.
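
A rough sketch of what I mean, assuming the transform can be applied line by line (the cutoff and metadata logic in the real code would need a windowed variant):

// Lazy read-transform-write pipeline: rows flow from reader to writer one at
// a time, so memory use stays flat no matter how many files are processed.
// Reuses allFiles and processLine from the code above.
let processAllLazily folder (outFile : string) =
    use writer = new StreamWriter(outFile)
    writer.WriteLine "ID,SensorType,SensorNumber,Time,Tag,Measurement"
    folder
    |> allFiles
    |> Seq.iter (fun file ->
        File.ReadLines file
        |> Seq.map processLine
        |> Seq.iter (fun fields -> writer.WriteLine(String.concat "," fields)))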

Hey, plenty appreciated. metadataReshape was a leftover from a different set of requirements. I implemented two of your suggestions, albeit a tad differently, for around a 10% increase in speed.

let dataPartition ncols1 ncols2 (data : 'T[][]) =
    let siz  = data.Length - 1
    let len  = ncols1 + ncols2 + 1
    let time = Array.zeroCreate siz
    let pres = Array.zeroCreate siz
    let impd = Array.zeroCreate siz

    data.[1..]
    |> Array.Parallel.iteri ( fun ind arr ->
        let arrLen = arr.Length
        let arr' =
            if arrLen = len then
                arr
            else
                // pad short rows with nulls up to the expected width
                Array.append arr (Array.init (len - arrLen) (fun _ -> null))

        time.[ind] <- arr'.[0]
        pres.[ind] <- Array.sub arr' 1 ncols1
        impd.[ind] <- Array.sub arr' (ncols1 + 1) ncols2 )

    time, pres, impd

Turning the outer layer into a seq (seq<string[]> rather than string[][]) makes sense for the code logic, but it comes with a really nasty performance hit: upwards of 3-4 minutes to process just 2 files. I have tried Seq, PSeq, Stream, and ParStream on the outer layer without much luck!
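
Roughly, the seq version only changed the tail of generateDataFrame, something like this (a reconstruction, not the exact code):

// seq-based outer layer: lazy, but note that Seq.collect is sequential, so it
// also gives up the parallelism of Array.Parallel.collect
path
|> allFiles
|> Seq.collect (fun file -> file |> transform)
|> Seq.append (Seq.singleton [| "ID"; "SensorType"; "SensorNumber"; "Time"; "Tag"; "Measurement" |])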

Thank you again!