Apache NiFi ExecuteScript: Groovy script для замены значений Json через файл сопоставления

9 paranza [2016-06-01 22:42:00]

Я работаю с Apache NiFi 0.5.1 на Groovy script, чтобы заменить входящие значения Json теми, что содержатся в файле сопоставления. Файл сопоставления выглядит так (это простой .txt):

Header1;Header2;Header3
 A;some text;A2

Я начал со следующего:

import groovy.json.JsonBuilder 
import groovy.json.JsonSlurper 
import java.nio.charset.StandardCharsets 

def flowFile = session.get(); 
if (flowFile == null) { 
    return; 
} 

flowFile = session.write(flowFile, 
        { inputStream, outputStream -> 

            def content = """ 
{ 
  "field1": "A"
  "field2": "A", 
  "field3": "A" 

}""" 

            def slurped = new JsonSlurper().parseText(content) 
            def builder = new JsonBuilder(slurped) 
            builder.content.field1 = "A"
            builder.content.field2 = "some text" 
            builder.content.field3 = "A2" 
            outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8)) 
        } as StreamCallback) 
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

Этот первый шаг работает очень хорошо, хотя он жестко запрограммирован и далеко не идеален. Моя первоначальная мысль заключалась в том, чтобы использовать ReplaceTextWithMapping, чтобы иметь возможность выполнять подстановки, однако он не очень хорошо работает со сложными файлами сопоставления (например, с несколькими столбцами). Я хотел бы принять это дальше, но я не уверен, как это сделать. Прежде всего, вместо того, чтобы проходить во всем закодированном JSON, я хотел бы прочитать входящий поток. Как это возможно в NiFi? Прежде чем запускать script как часть ExecuteScript, у меня есть файл .Json с содержимым через UpdateAttribute, где filename = myResultingJSON.json. Кроме того, я знаю, как загрузить файл .txt с помощью Groovy (String mappingContent= new File('/path/to/file').getText('UTF-8'), однако как использовать загруженный файл для выполнения замещений, чтобы мой итоговый JSON выглядел следующим образом:

{ 
  "field1": "A"
  "field2": "some text", 
  "field3": "A2" 
}

Благодарим вас за помощь,

я.

EDIT:

Первая модификация script позволяет мне читать из InputStream:

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

import java.nio.charset.StandardCharsets

def flowFile = session.get();
if (flowFile == null) {
    return;
}

flowFile = session.write(flowFile,
        { inputStream, outputStream ->

            def content = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)

            def slurped = new JsonSlurper().parseText(content)
            def builder = new JsonBuilder(slurped)
            builder.content.field1 = "A"
            builder.content.field2 = "some text" 
            builder.content.field3 = "A2" 
            outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
        } as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

Затем я перешел к тестированию подхода с помощью ConfigSlurper и написал общий класс, прежде чем вводить логику в Groovy ExecuteScript:

class TestLoadingMappings {

    static void main(String[] args) {

        def content = '''
         {"field2":"A",
         "field3": "A"
         }
         '''

        println "This is the content of the JSON file" + content

        def slurped = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurped)

        println "This is the content of my builder " + builder

        def propertiesFile = new File("D:\\myFile.txt")
        Properties props = new Properties()
        props.load(new FileInputStream(propertiesFile))
        def config = new ConfigSlurper().parse(props).flatten()

        println "This is the content of my config " + config

        config.each { k, v ->
            if (builder[k]) {
                builder[k] = v
            }
        }
        println(builder.toPrettyString())
    }

}

Я возвращаюсь с groovy.lang.MissinPropertyException, и это потому, что сопоставление не так просто. Все поля/свойства (от поля1 до поля3) входят в InpuStream с тем же значением (например,), и это означает, что каждый раз, когда поле2, например, имеет это значение, вы можете быть уверены, что оно будет действительным для двух других свойств. Однако у меня не может быть поля отображения, которое отображает "field2": "someText", потому что фактическое сопоставление управляется первым значением в файле сопоставления. Вот пример:

{ 
      "field1": "A"
      "field2": "A", 
      "field3": "A" 

 }

В моем файле сопоставления я:

A;some text;A2

Однако поле1 нуждается в сопоставлении с A (первое значение в файле) или останется неизменным, если хотите. Field2 нуждается в сопоставлении с значением в последнем столбце (A2), и, наконец, Field3 нуждается в сопоставлении с "некоторым текстом" в среднем столбце.

Вы можете помочь с этим? Это то, что я могу достичь с помощью Groovy и ExecuteScript. При необходимости я могу разделить конфигурационные файлы на два.

Кроме того, я быстро посмотрел на другой вариант (PutDistributedMapCache), и я не уверен, что понял, как загрузить пары ключ-значение в кэш распределенной карты. Похоже, вам нужно иметь DistributedMapCacheClient, и я не уверен, что это можно легко реализовать.

Спасибо!

ИЗМЕНИТЬ 2:

Некоторый другой прогресс, теперь я работаю с отображением, но не уверен, почему он не работает при чтении второй строки файла свойств:

"A" someText
"A2" anotherText

class TestLoadingMappings {

    static void main(String[] args) {

        def content = '''
         {"field2":"A",
         "field3":"A"
         }
         '''

        println "This is the content of the JSON file" + content

        def slurper = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurper)

        println "This is the content of my builder " + builder

        assert builder.content.field2 == "A"
        assert builder.content.field3 == "A"

        def propertiesFile = new File('D:\\myTest.txt')
        Properties props = new Properties()
        props.load(new FileInputStream(propertiesFile))
        println "This is the content of the properties " + props
        def config = new ConfigSlurper().parse(props).flatten()

        config.each { k, v ->
            if (builder.content.field2) {

                builder.content.field2 = config[k]
            }
            if (builder.content.field3) {

                builder.content.field3 = config[k]
            }

            println(builder.toPrettyString())
            println "This is my builder " + builder
        }
    }
}

Я возвращаюсь с помощью: This is my builder {"field2":"someText","field3":"someText"}

Любая идея, почему?

Большое вам спасибо

ИЗМЕНИТЬ 3 (перемещен снизу)

Я написал следующее:

    import groovy.json.JsonBuilder
    import groovy.json.JsonSlurper

    class TestLoadingMappings {

        static void main(String[] args) {

            def content =
            '''
            {"field2":"A",
             "field3":"A"
            }
            '''
            def slurper = new JsonSlurper().parseText(content)
            def builder = new JsonBuilder(slurper)

            println "This is the content of my builder " + builder

            def propertiesFile = new File('D:\\properties.txt')
            Properties props = new Properties()
            props.load(new FileInputStream(propertiesFile))
            def conf = new ConfigSlurper().parse(props).flatten()

            conf.each { k, v ->
            if (builder.content[k]) {
                builder.content[k] = v
            }
            println("This prints the resulting JSON :" + builder.toPrettyString())
        }
    }
}

Однако мне пришлось изменить структуру файла сопоставления следующим образом:

"field1"="substitutionText"
"field2"="substitutionText2"

Затем я включил ConfigSlurper в ExecuteScript script, как показано ниже:

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback

import java.nio.charset.StandardCharsets

def flowFile = session.get();
if (flowFile == null) {
    return;
}

flowFile = session.write(flowFile,
        { inputStream, outputStream ->

            def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

            def slurped = new JsonSlurper().parseText(content)
            def builder = new JsonBuilder(slurped)
            outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))

            def propertiesFile = new File(''D:\\properties.txt')
            Properties props = new Properties()
            props.load(new FileInputStream(propertiesFile))
            def conf = new ConfigSlurper().parse(props).flatten();

            conf.each { k, v ->
                if (builder.content[k]) {
                    builder.content[k] = v
                }
            }
            outputStream.write(content.toString().getBytes(StandardCharsets.UTF_8))
        } as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

Проблема заключается в том, что я не могу реально реплицировать логику в исходном файле сопоставления, используя что-то похожее на то, что было создано в моем TestLoadingMappings. Как упоминалось в моих предыдущих комментариях/изменениях, отображение должно работать следующим образом:

field2 = если A затем заменить на "некоторый текст"

field3 =, если A затем заменить на A2

...

field2 = B, затем замените на "другой текст"

field3 = B, затем замените B2

и сын.

В двух словах сопоставления управляются входящим значением в InputStream (который меняется), который условно отображает различные значения в зависимости от атрибута JSON. Можете ли вы рекомендовать лучший способ достичь этого сопоставления с помощью Groovy/ExecuteScript? У меня есть гибкость в изменении файла сопоставления, можете ли вы увидеть способ, которым я могу его изменить, чтобы достичь желаемых сопоставлений?

Спасибо

json groovy apache-nifi


1 ответ


9 mattyb [2016-06-02 05:31:00]

У меня есть несколько примеров того, как читать в файле потока, содержащем JSON:

http://funnifi.blogspot.com/2016/02/executescript-explained-split-fields.html http://funnifi.blogspot.com/2016/05/validating-json-in-nifi-with.html http://funnifi.blogspot.com/2016/02/executescript-processor-replacing-flow.html

У вас есть правильная структура выше; в основном вы можете использовать эту переменную "inputStream" в закрытии для чтения содержимого входящего потока. Если вы хотите прочитать все сразу (что вам, скорее всего, нужно будет сделать для JSON), вы можете использовать IOUtils.toString(), за которым следует JsonSlurper, как это сделано в примерах в ссылках выше.

Для вашего файла сопоставления, особенно если ваш JSON "плоский", у вас может быть файл Java Properties, сопоставляющий имя поля с новым значением:

field2 = некоторый текст

field3 = А2

Просмотрите ConfigSlurper для чтения в файлах свойств.

После того, как вы удалили входящий JSON файл и прочитали в файле сопоставления, вы можете получить в отдельных полях JSON, используя нотацию массива, вместо прямой записи элемента. Поэтому скажем, что я прочитал свойства в ConfigSlurper, и я хочу перезаписать любое существующее свойство в моем входе JSON (называемом "json" для примера) с одним из файла свойств. Это может выглядеть так:

config.parse(props).flatten().each { k,v ->
  if(json[k]) {
    json[k] = v
  }
}

Затем вы можете продолжить работу с вашим outputStream.write().

Вместо чтения ваших сопоставлений из файла вы также можете загрузить его в распределенный кеш с помощью процессора PutDistributedMapCache. Вы можете читать из DistributedCacheMapServer в ExecuteScript, у меня есть пример здесь:

http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html

Если ваше сопоставление сложное, вы можете использовать процессор TransformJSON, который будет доступен в следующей версии NiFi (0.7.0). Связанный случай с Джирой находится здесь:

https://issues.apache.org/jira/browse/NIFI-361

ИЗМЕНИТЬ

В ответ на ваши изменения я не понял, что у вас есть несколько правил для разных значений. В этом случае файл свойств, вероятно, не лучший способ представления отображений. Вместо этого вы можете использовать JSON:

{
  "field2": {
         "A": "some text",
         "B": "some other text"
       },
  "field3": {
         "A": "A2",
         "B": "B2"
       }
}

Затем вы можете использовать JSONSlurper для чтения в файле сопоставлений. Ниже приведен пример использования приведенного выше файла сопоставления:

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback

import java.nio.charset.StandardCharsets

def flowFile = session.get();
if (flowFile == null) {
    return;
}

def mappingJson = new File('/Users/mburgess/mappings.json').text

flowFile = session.write(flowFile, { inputStream, outputStream ->

    def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    def inJson = new JsonSlurper().parseText(content)
    def mappings = new JsonSlurper().parseText(mappingJson)

    inJson.each {k,v -> 
        inJson[k] = mappings[k][v]
    }
    outputStream.write(inJson.toString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)