A Brief Introduction to bdpar

Miguel Ferreiro Diaz

2020-11-25

Abstract

bdpar is a tool for easily building customized data flows to pre-process large volumes of information from different sources. To this end, bdpar allows users to (i) easily use and create new functionalities and (ii) develop new data source extractors according to their needs. Additionally, the package provides a predefined data flow to extract and preprocess the most relevant information (tokens, dates, …) from several textual sources (SMS, email, tweets, YouTube comments).

Introduction and basics

The package has been developed using R6 classes to implement, for example, the extraction of input data and the 18 Pipes that the application provides by default. The following sections explain the different tools that make up the application and how the features it offers can be extended.

For more specific questions, consult the package help through help(package = "bdpar").

Instance

To manage the information obtained from the input data, the package uses a structure, called Instance, which stores the properties extracted by the different Pipes. The fields comprising the Instance class are described below.

  • source: The input data without modifications.
  • date: The date on which the source was generated or sent.
  • data: The input data with modifications.
  • properties: Contains a list of properties extracted from the data that is being processed.
  • path: Identifier of the input data.
  • isValid: Indicates if the Instance is valid or not.
  • flowPipes: The list of Pipes that the Instance has passed through.
  • banPipes: The list of Pipes that cannot be executed from that moment on.
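The fields listed above can be pictured with a small base-R sketch. It does not use the bdpar Instance class: new_instance and add_property are hypothetical helpers, and a plain list stands in for the R6 object, only to show how the fields fit together.

```r
# Hypothetical stand-in for an Instance: a plain list with the same field names.
new_instance <- function(path) {
  list(
    source = NULL,            # input data without modifications
    date = NULL,              # date on which the source was generated or sent
    data = NULL,              # input data with modifications
    properties = list(),      # properties extracted during processing
    path = path,              # identifier of the input data
    isValid = TRUE,           # whether the instance is still valid
    flowPipes = character(0), # pipes already executed
    banPipes = character(0)   # pipes that may no longer be executed
  )
}

# Hypothetical helper: stores one extracted property under a given name.
add_property <- function(instance, name, value) {
  instance$properties[[name]] <- value
  instance
}

inst <- add_property(new_instance("/_ham_/example.eml"), "target", "ham")
inst$properties$target
```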

Types of input data available by default

bdpar can load data from multiple sources. By default, the package implements four types of input data and is fully compatible with Email (eml), Twitter (twtid), SMS (tsms) and YouTube (ytbid) data sources. However, new data loaders can be easily added by (i) creating a new class that implements the obtainSource and obtainDate abstract methods of the Instance class and (ii) registering this new class in the ExtractorFactory class through its registerExtractor method.

It is important to take into account that the type of Instance used is deduced by default from the file extension. However, this behaviour can be easily adapted to user needs.
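The extension-based selection described above can be pictured with a short base-R sketch. It does not call bdpar: the registry list and the register_extractor and create_instance helpers are hypothetical names, used only to illustrate the idea of mapping a file extension to a constructor.

```r
# Hypothetical registry: maps a file extension to a constructor function.
register_extractor <- function(registry, extension, constructor) {
  registry[[extension]] <- constructor
  registry
}

# Picks the constructor that matches the file extension of 'path'.
create_instance <- function(registry, path) {
  ext <- tools::file_ext(path)
  constructor <- registry[[ext]]
  if (is.null(constructor)) {
    stop("No extractor registered for extension: ", ext)
  }
  constructor(path)
}

registry <- list()
registry <- register_extractor(registry, "eml",
                               function(path) list(type = "email", path = path))
registry <- register_extractor(registry, "tsms",
                               function(path) list(type = "sms", path = path))

inst <- create_instance(registry, "corpus/_ham_/example.eml")
inst$type
```

A path whose extension has not been registered stops with an error in this sketch; how bdpar itself reacts in that situation is not covered here.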

How to create a customized input data extractor

This example shows how a new type of Instance (named ExtractorTytb) is created. In this particular case, ExtractorTytb is responsible for extracting textual comments from YouTube files (extension .tytb).

library(R6)
library(bdpar) # provides the Instance base class
ExtractorTytb <- R6Class(
  classname = "ExtractorTytb",
  inherit = Instance,
  public = list(
    initialize = function(path) {
      # The path must be a character string
      if (!"character" %in% class(path)) {
        stop("[ExtractorTytb][initialize][Error] ",
             "Checking the type of the variable: path ",
             class(path))
      }
      super$initialize(path)
    },
    # Uses the ctime reported by file.info() as the date of the Instance
    obtainDate = function() {
      super$setDate(file.info(super$getPath())[["ctime"]])
    },
    # Reads the file and stores its lines as both source and data
    obtainSource = function() {
      super$setSource(readLines(super$getPath(), 
                                warn = FALSE))
      super$setData(super$getSource())
    }
  )
)

Enabling a new Instance

In order to automatically use the new Instance class (ExtractorTytb), it must be registered through the registerExtractor method of the ExtractorFactory class. The example below shows how this extractor is registered.

library(bdpar)
extractors <- ExtractorFactory$new()
extractors$registerExtractor("tytb", ExtractorTytb)

Pipe

A pipe is a simple task responsible for generating a new output by applying some transformations to the input data. A set of sequentially interconnected pipes that achieves a required result is called a pipeline. Pipes in bdpar are represented by the GenericPipe class, while the pipelining process is defined through the GenericPipeline abstract class.

Dependencies

An important feature added to the Pipe concept is the control of the functionalities that go before and after a Pipe, that is, control of the preprocessing flow. On the one hand, this control ensures that if a Pipe needs another one to run before it, that Pipe has already been executed. On the other hand, it prevents a Pipe from being executed later when it would interfere with the functionality of a previous Pipe.

This functionality can be customized in each of the pipes that the user uses and/or develops, making it possible to decide what to do when the dependencies are not respected. Internally, the operator always checks whether the Instance satisfies the dependencies before it enters the following pipe.
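A minimal sketch of this kind of dependency check, written in base R, is shown next. It is not the bdpar implementation: the state list, the can_enter and record_pipe helpers, and the pipe names PipeA, PipeB and PipeC are hypothetical, and the sketch assumes that the "not after" dependencies of an executed pipe are added to the list of banned pipes.

```r
# Hypothetical check: may the instance enter 'pipe_name'?
# state$flow_pipes: names of pipes already executed.
# state$ban_pipes: names of pipes that may no longer be executed.
can_enter <- function(state, pipe_name, always_before = character(0)) {
  deps_met <- all(always_before %in% state$flow_pipes)
  not_banned <- !(pipe_name %in% state$ban_pipes)
  deps_met && not_banned
}

# Hypothetical bookkeeping after a pipe has run (assumption: its
# "not after" dependencies become banned from that moment on).
record_pipe <- function(state, pipe_name, not_after = character(0)) {
  state$flow_pipes <- c(state$flow_pipes, pipe_name)
  state$ban_pipes <- union(state$ban_pipes, not_after)
  state
}

state <- list(flow_pipes = character(0), ban_pipes = character(0))

# PipeB requires PipeA, which has not been executed yet.
before <- can_enter(state, "PipeB", always_before = "PipeA")

# PipeA runs and declares that PipeC must not run after it.
state <- record_pipe(state, "PipeA", not_after = "PipeC")

after <- can_enter(state, "PipeB", always_before = "PipeA")
banned <- can_enter(state, "PipeC")
```

In this sketch, before and banned are FALSE while after is TRUE.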

Pipes available by default

The framework provides 18 different pipes (inherited from GenericPipe). Each pipe falls into one of two categories: (i) basic-functionality pipes and (ii) external file access pipes.

(i) Pipes of basic functionality

File2Pipe

Obtains the source using the obtainSource method implemented by the corresponding subclass of Instance. By default, the implemented subclasses are ExtractorEml, ExtractorSms, ExtractorTwtid and ExtractorYtbid.

FindEmojiPipe

Creates a new emoji property that stores the emojis found in the data attribute. In addition, you can choose to modify the data property of the Instance by removing the emojis found.

FindEmoticonPipe

Creates a new emoticon property that stores the emoticons found in the data attribute. In addition, you can choose to modify the data property of the Instance by removing the emoticons found.

FindHashtagPipe

Creates a new hashtag property that stores the hashtags found in the data attribute. In addition, you can choose to modify the data property of the Instance by removing the hashtags found.

FindUrlPipe

Creates a new URLs property that stores the URLs found in the data attribute. In addition, you can choose to modify the data property of the Instance by removing the URLs found.

FindUserNamePipe

Creates a new userName property that stores the user names found in the data attribute. In addition, you can choose to modify the data property of the Instance by removing the user names found.

GuessDatePipe

Obtains the date using the obtainDate method implemented by the corresponding subclass of Instance. By default, the implemented subclasses are ExtractorEml, ExtractorSms, ExtractorTwtid and ExtractorYtbid.

GuessLanguagePipe

Guesses the language using the language detector of the cld2 library. Creates the language property, which indicates the language of the text. Optionally, the language provided by Twitter can be used instead.

MeasureLengthPipe

Creates the length property, which indicates the length of the text. The name of the property can be customized through the class constructor.

StoreFileExtPipe

Creates the extension property, which indicates the file's extension.

TargetAssigningPipe

Identifies the class of the Instance (target attribute) from the path of the file.

TeeCSVPipe

Generates a CSV from the properties of the Instance.

ToLowerCasePipe

Converts the data of an Instance to lower case.

(ii) Pipes that access external files

AbbreviationPipe

Creates a new abbreviation property that stores the abbreviations found in the data attribute. In addition, you can choose to modify the data property of the Instance by replacing the abbreviations found with their extended versions. The abbreviations and their substitutions have to be stored in JSON files, and the file associated with the language of the text is chosen. These files are loaded and handled through the ResourceHandler class.

ContractionPipe

Creates a new contraction property that stores the contractions found in the data attribute. In addition, you can choose to modify the data property of the Instance by replacing the contractions found with their extended versions. The contractions and their substitutions have to be stored in JSON files, and the file associated with the language of the text is chosen. These files are loaded and handled through the ResourceHandler class.

InterjectionPipe

Creates a new interjection property that stores the interjections found in the data attribute. In addition, you can choose to modify the data property of the Instance by removing the interjections. The interjections have to be stored in JSON files, and the file associated with the language of the text is chosen. These files are loaded and handled through the ResourceHandler class.

SlangPipe

Creates a new slang property that stores the slang words found in the data attribute. In addition, you can choose to modify the data property of the Instance by replacing the slang words found with their extended versions. The slang words and their substitutions have to be stored in JSON files, and the file associated with the language of the text is chosen. These files are loaded and handled through the ResourceHandler class.

StopWordPipe

Creates a new stopwords property that stores the stop words found in the data attribute. In addition, you can choose to modify the data property of the Instance by removing the stop words. The stop words have to be stored in JSON files, and the file associated with the language of the text is chosen. These files are loaded and handled through the ResourceHandler class.
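The dictionary-driven behaviour shared by the pipes in this category (find the terms, store them, optionally replace them) can be sketched in base R as follows. This is an illustration only, not the code of any bdpar pipe: expand_terms is a hypothetical helper, and the dictionary is written inline as a named character vector instead of being loaded from a JSON file through ResourceHandler.

```r
# Hypothetical dictionary; in bdpar these pairs would come from a JSON resource file.
abbreviations <- c("asap" = "as soon as possible", "btw" = "by the way")

# Finds whole-word occurrences of each term and replaces them with the
# extended version. Returns the modified text and the terms that were found.
expand_terms <- function(text, dictionary) {
  found <- character(0)
  for (term in names(dictionary)) {
    pattern <- paste0("\\b", term, "\\b")
    if (grepl(pattern, text, ignore.case = TRUE, perl = TRUE)) {
      found <- c(found, term)
      text <- gsub(pattern, dictionary[[term]], text,
                   ignore.case = TRUE, perl = TRUE)
    }
  }
  list(data = text, found = found)
}

result <- expand_terms("Reply asap, btw thanks", abbreviations)
result$data
result$found
```

Keeping the found terms separate from the modified text mirrors the split between the extracted property and the data field described above.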

How to create your customized Pipe

Additionally, in order to improve the customization capabilities, bdpar makes it easy to design and develop new personalized pipes by implementing the pipe method included in the GenericPipe class (inheritance relation). The code below exemplifies the creation of a simple pipe in charge of removing leading and trailing whitespace from the data.

library(R6)
library(bdpar) # provides the GenericPipe base class
RemovesWhiteSpaces <- R6Class(
  "RemovesWhiteSpaces",
  inherit = GenericPipe,
  public = list(
    initialize = function(propertyName = "",
                          alwaysBeforeDeps = list(),
                          notAfterDeps = list()) {
      if (!"character" %in% class(propertyName)) {
        stop("[RemovesWhiteSpaces][initialize][Error] ",
             "Checking the type of the 'propertyName' variable: ",
             class(propertyName))
      }
      if (!"list" %in% class(alwaysBeforeDeps)) {
        stop("[RemovesWhiteSpaces][initialize][Error] ",
             "Checking the type of the 'alwaysBeforeDeps' variable: ",
             class(alwaysBeforeDeps))
      }
      if (!"list" %in% class(notAfterDeps)) {
        stop("[RemovesWhiteSpaces][initialize][Error] ",
             "Checking the type of the 'notAfterDeps' variable: ",
             class(notAfterDeps))
      }
      super$initialize(propertyName, alwaysBeforeDeps, notAfterDeps)
    },
    pipe = function(instance) {
      if (!"Instance" %in% class(instance)) {
        stop("[RemovesWhiteSpaces][pipe][Error] ",
             "Checking the type of the 'instance' variable: ",
             class(instance))
      }
      instance$setData(trimws(x = instance$getData()))
      
      if (length(instance$getData()) == 0) {
        instance$invalidate()
      }
      
      return(instance)
    }
  )
)

Flow of Pipes (pipelining process)

A flow of pipes is the set of pipes comprising the whole preprocessing process. By default, bdpar provides a pipelining process (implemented in DefaultPipeline) comprising all 18 available pipes.

Flow of Pipes available by default

The code below shows a pipelining example that uses the 18 pipes (MeasureLengthPipe is applied twice, before and after cleaning the text):

  instance %>|%
    TargetAssigningPipe$new() %>|%
    StoreFileExtPipe$new() %>|%
    GuessDatePipe$new() %>|%
    File2Pipe$new() %>|%
    MeasureLengthPipe$new(propertyName = "length_before_cleaning_text") %>|%
    FindUserNamePipe$new() %>|%
    FindHashtagPipe$new() %>|%
    FindUrlPipe$new() %>|%
    FindEmoticonPipe$new() %>|%
    FindEmojiPipe$new() %>|%
    GuessLanguagePipe$new() %>|%
    ContractionPipe$new() %>|%
    AbbreviationPipe$new() %>|%
    SlangPipe$new() %>|%
    ToLowerCasePipe$new() %>|%
    InterjectionPipe$new() %>|%
    StopWordPipe$new() %>|%
    MeasureLengthPipe$new(propertyName = "length_after_cleaning_text") %>|%
    TeeCSVPipe$new()

Create your own flow of Pipes

Additionally, in order to provide a flexible framework, bdpar allows users to define their own customized flow of pipes. To accomplish this, it is necessary to create a new class that inherits from GenericPipeline and implements the execute() method. The example below shows how a new pipeline (called TestPipeline) is created:

library(R6)
library(bdpar)
TestPipeline <- R6Class(
  "TestPipeline",
  inherit = GenericPipeline,
  public = list(
    initialize = function() {
    },
    execute = function(instance) {
      if (!"Instance" %in% class(instance)) {
        stop("[TestPipeline][execute][Error] ",
             "Checking the type of the 'instance' variable: ",
             class(instance))
      }
      message("[TestPipeline][execute][Info] ", instance$getPath())
      tryCatch(
        instance %>|%
          TargetAssigningPipe$new() %>|%
          StoreFileExtPipe$new() %>|%
          File2Pipe$new() %>|%
          RemovesWhiteSpaces$new() %>|%
          TeeCSVPipe$new()
        ,
        error = function(e) {
          message("[TestPipeline][execute][Error] ", instance$getPath(), ": ", conditionMessage(e))
          instance$invalidate()
        }
      )
      return(instance)
    }
  )
)

Alternatively, the pre-processing flow can be constructed dynamically through the DynamicPipeline class. An example of its use is shown below. To see the rest of the options offered by this class, consult the package help through help(package = "bdpar").

library(bdpar)
pipeline <- DynamicPipeline$new()
pipeline$add(list(TargetAssigningPipe$new(),
                  StoreFileExtPipe$new(),
                  File2Pipe$new()),
             pos = NULL)
pipeline$add(list(TeeCSVPipe$new()), pos = NULL)

Operator

To manage the flow of pipes, bdpar provides a new operator called %>|%. This operator checks whether the Instance was invalidated in the latest Pipe. If the Instance is invalid, the flow of Pipes stops and preprocessing moves on to the next Instance. Otherwise, the Instance continues through the current flow of Pipes. The operator also simplifies the workflow between pipes because it automatically invokes the pipe function of the objects that pass through it. Finally, it should be noted that it also checks whether an Instance complies with the requirements to enter the following pipe.
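The short-circuit behaviour of such an operator can be sketched with a user-defined infix operator in base R. The sketch is not the %>|% implementation: the operator is deliberately given a different, hypothetical name (%then%), an instance is a plain list with data and is_valid fields, and a pipe is a plain function. The dependency checks are left out.

```r
# Hypothetical operator: applies 'pipe' only while the instance is valid.
`%then%` <- function(instance, pipe) {
  if (!isTRUE(instance$is_valid)) {
    return(instance)  # skip the remaining pipes once invalidated
  }
  pipe(instance)
}

# Hypothetical pipes: plain functions that take and return an instance.
to_lower <- function(instance) {
  instance$data <- tolower(instance$data)
  instance
}
reject_empty <- function(instance) {
  if (!nzchar(instance$data)) instance$is_valid <- FALSE
  instance
}
add_mark <- function(instance) {
  instance$data <- paste0(instance$data, "!")
  instance
}

ok <- list(data = "Hello", is_valid = TRUE) %then%
  to_lower %then% reject_empty %then% add_mark

bad <- list(data = "", is_valid = TRUE) %then%
  to_lower %then% reject_empty %then% add_mark
```

In this sketch ok$data ends up as "hello!", while bad is invalidated by reject_empty and never reaches add_mark.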

bdpar.Options

The bdpar.Options object stores the different configuration parameters of the pipes used in the preprocessing. Examples include the keys required by some APIs (such as YouTube or Twitter) and various parameters that customize the behavior of the application, such as the text format to use for multipart emails (plain text or HTML). It is important to keep in mind that parameters that are not needed can be omitted.

The options available initially, and the way their values are set, are shown below.

# [eml]
bdpar.Options$set("extractorEML.mpaPartSelected", <<PartSelectedOnMPAlternative>>)

# [resources]
bdpar.Options$set("resources.abbreviations.path", <<abbreviation.path>>)
bdpar.Options$set("resources.contractions.path", <<contractions.path>>)
bdpar.Options$set("resources.interjections.path", <<interjections.path>>)
bdpar.Options$set("resources.slangs.path", <<slangs.path>>)
bdpar.Options$set("resources.stopwords.path", <<stopwords.path>>)

# [twitter]
bdpar.Options$set("twitter.consumer.key", <<consumer_key>>)
bdpar.Options$set("twitter.consumer.secret", <<consumer_secret>>)
bdpar.Options$set("twitter.access.token", <<access_token>>)
bdpar.Options$set("twitter.access.token.secret", <<access_token_secret>>)
bdpar.Options$set("cache.twitter.path", <<cache.path>>)

# [teeCSVPipe]
bdpar.Options$set("teeCSVPipe.output.path", <<output.path>>)

# [youtube]
bdpar.Options$set("youtube.app.id", <<app_id>>)
bdpar.Options$set("youtube.app.password", <<app_password>>)
bdpar.Options$set("cache.youtube.path", <<cache.path>>)

# [cache]
bdpar.Options$set("cache", <<status_cache>>)
bdpar.Options$set("cache.folder", <<cache.path>>)

# [parallel]
bdpar.Options$set("numCores", <<num_cores>>)

# [verbose]
bdpar.Options$set("verbose", <<status_verbose>>)

Logging on bdpar

The bdpar log is configured through the configureLog function. This system manages both where the messages are displayed and the priority level of each message, showing only the messages with a higher level than the one indicated in the threshold variable. The log can be sent to the console, to a file, or to both.

bdpar.Options$configureLog(console = TRUE, threshold = "INFO", file = "fileLog.log")

The type of message changes according to the level indicated:

  • The DEBUG, INFO and ERROR levels return a text using the message function.
  • The WARN level returns a text using the warning function.
  • The FATAL level returns a text using the stop function.
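The threshold filtering and the level-dependent output described above can be illustrated with a short base-R sketch. It is not the bdpar logger: should_log and emit are hypothetical helpers, the ordering DEBUG < INFO < WARN < ERROR < FATAL is an assumption, and so is the choice to let messages at the threshold level itself through.

```r
# Assumed ordering of the levels, from lowest to highest priority.
log_levels <- c(DEBUG = 1, INFO = 2, WARN = 3, ERROR = 4, FATAL = 5)

# Hypothetical filter: TRUE when 'level' reaches the threshold.
should_log <- function(level, threshold) {
  log_levels[[level]] >= log_levels[[threshold]]
}

# Hypothetical emitter: WARN uses warning(), FATAL uses stop(),
# and every other level uses message().
emit <- function(level, text, threshold = "INFO") {
  if (!should_log(level, threshold)) {
    return(invisible(FALSE))
  }
  line <- paste0("[", level, "] ", text)
  switch(level,
         WARN = warning(line, call. = FALSE),
         FATAL = stop(line, call. = FALSE),
         message(line))
  invisible(TRUE)
}

shown <- emit("INFO", "pipeline started")     # printed through message()
hidden <- emit("DEBUG", "instance details")   # filtered out by the threshold
```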

To deactivate the bdpar log completely, use the disableLog method of the bdpar.Options object. It is important to note that messages with WARN or FATAL level will still be displayed on the console.

bdpar.Options$disableLog()

To see the status of the instances at the output of each pipe, the DEBUG logging mode must be activated. An example of the output could be:

[0000-00-00 00:00:00][pipeOperator][freduce][DEBUG] Instance_ID:1 (Last pipe: TargetAssigningPipe)
    Path: /_ham_/example.eml
    Date: 
    IsValid: TRUE
    Source: ""
    Data: ""
    FlowPipes: TargetAssigningPipe
    BanPipes: 
    Properties: 
        - target: ham

If the “verbose” option is enabled, for example in the runPipeline function, only messages with WARN or FATAL levels will be displayed by the console.

Cache functionality

If the bdpar cache is configured through the "cache" and "cache.folder" options, the status of the instances is stored after each pipe. This avoids re-executing previously completed tasks, provided that the order and configuration of the pipes and the pipeline are the same as those stored in the cache. If the cache parameter is used, for example in the runPipeline function, the option is set to the value passed.

bdpar.Options$set("cache", TRUE)
bdpar.Options$set("cache.folder", "path/cache/folder")

To remove the cache, use the cleanCache method.
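The idea behind this cache (reuse a stored result only when the same pipes have run in the same order on the same input) can be sketched in base R. This is not how bdpar stores its cache: run_cached is a hypothetical helper, an in-memory environment replaces the cache folder, and each key is built from the path plus the names of the pipes executed so far.

```r
# Hypothetical in-memory cache (bdpar uses a folder on disk instead).
cache <- new.env()

# Runs the named pipes in order, reusing cached intermediate results.
# The key grows with each pipe name, so a different order gives different keys.
run_cached <- function(path, data, pipes, cache) {
  key <- path
  executed <- 0
  for (name in names(pipes)) {
    key <- paste(key, name, sep = "|")
    if (exists(key, envir = cache, inherits = FALSE)) {
      data <- get(key, envir = cache)
    } else {
      data <- pipes[[name]](data)
      assign(key, data, envir = cache)
      executed <- executed + 1
    }
  }
  list(data = data, executed = executed)
}

pipes <- list(lower = tolower, trim = trimws)

first <- run_cached("a.txt", "  Hi ", pipes, cache)   # both pipes are executed
second <- run_cached("a.txt", "  Hi ", pipes, cache)  # both results come from the cache
```

Running the same pipes in a different order produces different keys in this sketch, so the stored results are not reused.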

Parallelization

The bdpar package allows the parallelization of the processing of Instances in order to save computational time. This functionality is configured through the “numCores” option (included in bdpar.Options) which will indicate the number of cores to be used in the execution.

bdpar.Options$set("numCores", 2)

It should be mentioned that in the case of parallelization, the output of the cores’ log will only be available in file mode.

Development

The development version of the bdpar package is also available on its GitHub page: github.com/miferreiro/bdpar