I have a project in which I have to process a number of text files that are outputs of sensor readings. The total number is around 5,000 files, each about 100 MB; my current sample is 50 files at around 5 GB. The pipeline is:
- Read in the content of each file.
- Apply some regex operations and other transformations.
- Collect the transformed data into tidy format, essentially into a string[][] with sample element [| "ID"; "SensorType"; "SensorNumber"; "SensorTag"; "Measurement" |].
- Turn the combined data into a master csv file.
The first issue is that this is pretty resource-consuming, essentially gobbling up all CPU and RAM on a decent system. A single file takes around 20 secs, 2 files around 45 secs, and 50 files around 45 mins.
I have already used async workflows and Array.Parallel to handle file I/O and various large-array computations where I felt it appropriate.
Without resorting to cloud computing and the like, are there ways and tools to improve local performance further?
The second issue is the final csv format and its huge size. I am thinking that:
- If I create a database with separate tables to remove redundancy and reduce size, I add database-connection and data-retrieval overhead.
- I cannot think of a format that balances compactness and fast access for analytics-ready data better than a csv - it is just excellent for turning into dataframes.
I would welcome any performance-improving suggestions. What am I missing here?!
It is a bit complicated to advise anything without seeing the current implementation.
Can you share your code, or an MCVE (minimal, complete, and verifiable example) that follows exactly the same approach as your real code, along with a link to a single data file? (It could be meaningless random data.) Then everyone will be able to test it and make suggestions.
Hi, thank you. I cannot share any data, but I got permission to share some code.
open System
open System.Collections.Generic
open System.Globalization
open System.IO
open System.Text.RegularExpressions
open FSharp.Collections
open FSharp.Control
// Find the index of the n-th element in a sequence satisfying the predicate pred.
let tryFindIndexNth n pred (sqn : #seq<'T>) =
    let ary = sqn |> Seq.toArray
    let lookasideTable = new Dictionary<_,_>(HashIdentity.Structural)
    let rec loop acc n pred ary =
        if lookasideTable.ContainsKey n then lookasideTable.[n]
        elif ary |> Array.isEmpty then
            let res = ValueNone
            lookasideTable.Add(n, res)
            res
        elif n = 0 then
            let res = ValueSome (acc - 1)
            lookasideTable.Add(n, res)
            res
        else
            let indOpt = ary |> Array.tryFindIndex pred
            let ind, ary =
                match indOpt with
                | None -> 0, Array.empty
                | Some ind -> ind, ary |> Array.skip (ind + 1)
            let res = ary |> loop (ind + 1 + acc) (n - 1) pred
            lookasideTable.Add(n, res)
            res
    ary |> loop 0 n pred
// define delimiter pattern and split the text accordingly
let delimPat = @"(?:[\s;,]+)"
let skipChar = @":"
// number of pressure and impedance sensors
let nPres = 36
let nImpd = 18
// trim the string and replace the skip-characters with ""
let clean (str : string) = Regex.Replace(str.Trim(), skipChar, "")
// split the string based on given delimiter pattern
let split str delimPat = Regex.Split(str |> clean, delimPat)
// split the string based on the delimPat delimiter pattern
let splitPat str = delimPat |> split str
// turn a string into title case
let toTitleCase str =
    let culInfo = new CultureInfo("en-US", false)
    let txtInfo = culInfo.TextInfo
    let titleCase = str |> txtInfo.ToTitleCase
    let delimPat = @"(?:[\s]+)"
    delimPat
    |> split titleCase
    |> String.concat ""
// process each line of data:
// split it on the delimiter pattern, then title-case each word
let processLine line =
    line
    |> splitPat
    |> Array.map toTitleCase
// asynchronously read in the contents of a data file
let readFileAsync file = async {
    use stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None)
    use reader = new StreamReader(stream)
    // ReadToEndAsync returns a Task<string>, so it must be awaited via Async.AwaitTask
    let! text = reader.ReadToEndAsync() |> Async.AwaitTask
    let content = @"\n" |> split text
    return content
}
// write data to a file
let writeFile file (data : string[]) =
    use stream = new FileStream(file, FileMode.Create, FileAccess.Write, FileShare.None)
    use writer = new StreamWriter(stream)
    // iterate for the side effect; Array.map would build a useless unit[]
    data |> Array.iter (fun line -> writer.WriteLine(line))
// generate a time series record ID from the data file name
// (Regex.Replace with the raw extension is unsafe: "." matches any character)
let recordId (file : string) =
    Path.GetFileNameWithoutExtension file
// asynchronously read a data file and apply a transformation to each line
let readFileMapAsync file (f : string -> 'T) = async {
    let! content = file |> readFileAsync
    return content |> Array.Parallel.map f
}
// recursively generate an array of data file names
let rec allFiles folder =
    [| for file in folder |> Directory.GetFiles do yield file
       for subFolder in folder |> Directory.GetDirectories do yield! subFolder |> allFiles |]
// read in the content of a data file
let readFile file =
    processLine
    |> readFileMapAsync file
    |> Async.RunSynchronously
// separate data and metadata
let separateData file =
    let flag = "Annotations"
    let rawd = file |> readFile
    // Separation occurs at the line with the "Annotations" flag:
    // lines above the flag line are data, lines below it are metadata.
    let arrayPartition flg (arr : string[][]) =
        let arrLen = arr.Length
        let flagInd = arr |> Array.tryFindIndexBack (fun arr -> arr.[0] = flg)
        let empty = [| Array.Empty<string>(); Array.Empty<string>() |]
        match flagInd with
        | None -> arr, empty
        | Some ind ->
            let count1 = ind
            let count2 = arrLen - count1 - 1
            let part1 = count1 |> Array.sub arr 0
            let part2 =
                count2
                |> Array.sub arr (ind + 1)
                |> Array.map (fun arr ->
                    let head = arr.[0]
                    let rest =
                        arr
                        |> Array.tail
                        |> Array.reduce (fun elem1 elem2 -> elem1 + " " + elem2)
                    [| head; rest |])
            part1, part2
    rawd |> arrayPartition flag
// partition data into time data, pressure data, and impedance data
let partition data =
    let ncolsPressure = nPres
    let ncolsImpedenc = nImpd
    // helper: partition rows into time, pressure, and impedance columns
    let dataPartition ncols1 ncols2 (data : 'T[][]) =
        let len = ncols1 + ncols2 + 1
        let data =
            data.[1..]
            |> Array.Parallel.map (fun arr ->
                let arrLen = arr.Length
                if arrLen = len then
                    arr
                else
                    // pad short rows with nulls up to the expected length
                    Array.init (len - arrLen) (fun _ -> null)
                    |> Array.append arr)
        let time = data |> Array.map (fun arr -> arr.[0])
        let press = data |> Array.map (fun arr -> Array.sub arr 1 ncols1)
        let imped = data |> Array.map (fun arr -> Array.sub arr (ncols1 + 1) ncols2)
        time, press, imped
    data |> dataPartition ncolsPressure ncolsImpedenc
// reshape metadata from string[][] to (string[], string[])
let metadataReshape (metadata : string[][]) =
    let emptyArr = Array.Empty<string>()
    let empty = [| emptyArr; emptyArr |]
    if metadata = empty then
        emptyArr, emptyArr
    else
        let vals = metadata |> Array.map (fun arr -> arr.[0])
        let info = metadata |> Array.map (fun arr -> arr.[1])
        info, vals
// transform the data for each file, collect into one array, and add column headers
let generateDataFrame path =
    // boundaries for each group of pressure and impedance sensors
    let presLims = [| 1; 4; 7; 27; 32; 37 |]
    let impdLims = [| 1; 3; 5; 14; 16; 19 |]
    // helper to check whether a sensor number falls in [lb, ub)
    let predNum num lb ub = (num >= lb) && (num < ub)
    // tag a pressure sensor time series by region
    let presTagger (num : string) =
        let num = int num
        if   predNum num presLims.[0] presLims.[1] then "Pharynx"
        elif predNum num presLims.[1] presLims.[2] then "UES"
        elif predNum num presLims.[2] presLims.[3] then "Esophagus"
        elif predNum num presLims.[3] presLims.[4] then "LES"
        elif predNum num presLims.[4] presLims.[5] then "Stomach"
        else "Faulty"
    // tag an impedance sensor time series by region
    let impdTagger (num : string) =
        let num = int num
        if   predNum num impdLims.[0] impdLims.[1] then "Pharynx"
        elif predNum num impdLims.[1] impdLims.[2] then "UES"
        elif predNum num impdLims.[2] impdLims.[3] then "Esophagus"
        elif predNum num impdLims.[3] impdLims.[4] then "LES"
        elif predNum num impdLims.[4] impdLims.[5] then "Stomach"
        else "Faulty"
    // transform the raw data of one file into tidy format ready for csv output
    let transform file =
        let data, meta = file |> separateData
        let time, pres, impd = data |> partition
        let info, vals = meta |> metadataReshape
        let cutoff =
            let cutoffConst = 3
            let res = info |> tryFindIndexNth 2 (fun info -> info = "Wet Swallow")
            match res with
            | ValueSome ind -> vals.[ind]
            | ValueNone -> time.[time.Length / cutoffConst - 1]
        let ind =
            let res = time |> Array.tryFindIndex (fun t -> float t >= float cutoff)
            match res with
            | Some ind -> ind - 1
            | None -> time.Length - 1
        let cutoffPres = pres.[0..ind]
        let cutoffImpd = impd.[0..ind]
        let recId = file |> recordId
        let presRecs =
            cutoffPres
            |> Array.Parallel.mapi (fun i row ->
                row |> Array.mapi (fun j x ->
                    let typ = "Pressure"
                    let num = string (j + 1)
                    let tag = presTagger num
                    [| recId; typ; num; time.[i]; tag; x |]))
            |> Array.concat
        let impdRecs =
            cutoffImpd
            |> Array.Parallel.mapi (fun i row ->
                row |> Array.mapi (fun j x ->
                    let typ = "Impedance"
                    let num = string (j + 1)
                    let tag = impdTagger num
                    [| recId; typ; num; time.[i]; tag; x |]))
            |> Array.concat
        impdRecs |> Array.append presRecs
    path
    |> allFiles
    |> Array.Parallel.collect transform
    // header order matches the record layout: ID, type, number, time, tag, measurement
    |> Array.append [| [| "ID"; "SensorType"; "SensorNumber"; "Time"; "Tag"; "Measurement" |] |]
If the data is big, then it is better to reduce the number of passes over it when building something new. For example, here:
let time = data |> Array.map ( fun arr -> arr.[0] )
let press = data |> Array.map ( fun arr -> Array.sub arr 1 ncols1 )
let imped = data |> Array.map ( fun arr -> Array.sub arr (ncols1 + 1) ncols2 )
Three passes! A simple for loop is a better option:
let dataPartition ncols1 ncols2 (data : 'T[][]) =
    let lng = ncols1 + ncols2 + 1
    let len = data.Length - 1
    let time = Array.zeroCreate len
    let press = Array.zeroCreate len
    let imped = Array.zeroCreate len
    for i = 1 to data.Length - 1 do
        let arr = data.[i]
        let arrLen = arr.Length
        let arr' =
            if arrLen = lng then arr
            else
                // pad short rows with nulls up to the expected row length lng
                Array.init (lng - arrLen) (fun _ -> null)
                |> Array.append arr
        time.[i - 1] <- arr'.[0]
        press.[i - 1] <- Array.sub arr' 1 ncols1
        imped.[i - 1] <- Array.sub arr' (ncols1 + 1) ncols2
    time, press, imped
metadataReshape looks like an unnecessary function: it is used only to reshape the values returned by separateData, so it seems you could return them in the right shape from separateData directly. Part of the function is below:
let empty = Array.Empty<string>(), Array.Empty<string>()
let result =
    match flagInd with
    | None -> arr, empty
    | Some ind ->
        let count1 = ind
        let count2 = arrLen - count1 - 1
        let part1 = count1 |> Array.sub arr 0
        let info = Array.zeroCreate count2
        let vals = Array.zeroCreate count2
        // fill all count2 metadata slots: i runs from ind + 1 to arrLen - 1
        for i in (ind + 1)..(arrLen - 1) do
            info.[i - ind - 1] <- arr.[i].[0]
            let val' =
                arr.[i]
                |> Array.tail
                |> String.concat " "
            vals.[i - ind - 1] <- val'
        part1, (info, vals)
So the point is to flatten it as much as possible.
If the purpose is only "read-transform-write", then I'd consider rewriting all of it in a lazy way: Seq instead of Array.
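For what it's worth, the lazy version could be sketched like this (a minimal sketch only: `transformLine` here is a made-up stand-in for the real regex pipeline, and the function names are hypothetical). With `File.ReadLines` and `Seq`, only one line at a time is materialised, end to end:

```fsharp
open System.IO

// Hypothetical line transform standing in for the real regex pipeline.
let transformLine (line : string) =
    line.Split([| ' '; ';'; ',' |], System.StringSplitOptions.RemoveEmptyEntries)
    |> String.concat ";"

// Stream every line of every file straight into the output csv:
// nothing beyond the current line is held in memory.
let streamToCsv (files : seq<string>) (output : string) =
    use writer = new StreamWriter(output)
    files
    |> Seq.collect File.ReadLines   // lazy: lines are pulled on demand
    |> Seq.map transformLine
    |> Seq.iter writer.WriteLine
```

The trade-off is that per-line laziness forfeits Array.Parallel, so whether it is a net win depends on how expensive the transform is relative to I/O.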
Hey, plenty appreciated. metadataReshape was a leftover from a different set of requirements. I implemented two of your suggestions, albeit a tad differently, for around a 10% increase in speed.
let dataPartition ncols1 ncols2 (data : 'T[][]) =
    // data.[1..] yields data.Length - 1 rows, so size the output arrays accordingly
    let siz = data.Length - 1
    let len = ncols1 + ncols2 + 1
    let time = Array.zeroCreate siz
    let pres = Array.zeroCreate siz
    let impd = Array.zeroCreate siz
    data.[1..]
    |> Array.Parallel.iteri (fun ind arr ->
        let arrLen = arr.Length
        let arr' =
            if arrLen = len then arr
            else
                // pad short rows with nulls up to the expected length
                Array.init (len - arrLen) (fun _ -> null)
                |> Array.append arr
        time.[ind] <- arr'.[0]
        pres.[ind] <- Array.sub arr' 1 ncols1
        impd.[ind] <- Array.sub arr' (ncols1 + 1) ncols2)
    time, pres, impd
Turning the outer layer into a seq (seq<string[]> rather than string[][]) makes sense in the code logic, but it has a really nasty performance hit: upwards of 3-4 mins to process just 2 files… I have tried Seq, PSeq, Stream, and ParStream on the outer layer without much luck!
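If per-row laziness is too slow, a possible middle ground (a sketch only: `transform` below is a dummy stand-in for the real per-file transform, and the names are hypothetical) is to stay eager and parallel within each file but stream at the file level, appending each file's rows to the master csv before reading the next file. Memory then stays bounded by one file's rows rather than all 5,000 files:

```fsharp
open System.IO

// Dummy per-file transform standing in for the real one:
// takes a file path and returns that file's tidy rows.
let transform (file : string) : string[][] =
    File.ReadAllLines file
    |> Array.Parallel.map (fun line -> line.Split ' ')

// Eager and parallel within a file, streaming across files:
// each file's rows are written out before the next file is read.
let writeMasterCsv (files : string[]) (output : string) =
    use writer = new StreamWriter(output)
    writer.WriteLine "ID;SensorType;SensorNumber;Time;Tag;Measurement"
    for file in files do
        for row in transform file do
            row |> String.concat ";" |> writer.WriteLine
```

This also sidesteps the final Array.append of all files' records, since rows go straight to disk as they are produced.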