Skip to content

Exec source #125

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Current sources shipping with KSM include:
- GitLab (using Personal Auth Tokens)
- BitBucket
- Amazon S3
- Local Executable
- Build your own (and contribute back!)

# Building
Expand Down Expand Up @@ -199,6 +200,11 @@ The [default configurations](src/main/resources/application.conf) can be overwri
- `SOURCE_HTTP_AUTH_GOOGLEIAM_SERVICE_ACCOUNT` Google Service Account name.
- `SOURCE_HTTP_AUTH_GOOGLEIAM_SERVICE_ACCOUNT_KEY` Google Service Account Key in JSON string encoded. If not the key isn't configured, it'll try to get the token from environment.
- `SOURCE_HTTP_AUTH_GOOGLEIAM_TARGET_AUDIENCE` Google Target Audience for token authentication.
- `io.conduktor.ksm.source.ExecSourceAcl`: Get the ACL from the stdout of some executable. Allows the user to write their own executable (written in any language of their choosing) that generates the yaml or csv output defining the ACL.
- `SOURCE_EXEC_CMD`: Full path to the executable
- `SOURCE_EXEC_ARGS`: Arguments passed to the executable, they will be split by the below separator value. Defaults to ''
- `SOURCE_EXEC_ARGS_SEP`: String separator to split the argument value. Defaults to ','. For example, setting the args to 'a,b,c,d' and the separator to ',' will pass in the args [a, b, c, d] to the executable
- `SOURCE_EXEC_PARSER`: 'yaml' or 'csv', defaults to 'yaml'

- `NOTIFICATION_CLASS`: Class for notification in case of ACL changes in Kafka.
- `io.conduktor.ksm.notification.ConsoleNotification` (default): Print changes to the console. Useful for logging
Expand Down
11 changes: 11 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,17 @@ source {
password = ${?SOURCE_BITBUCKET_SERVER_AUTH_PASSWORD}
}
}
exec {
//command needs to be a full path
command = "/bin/false"
command = ${?SOURCE_EXEC_CMD}
args = ""
args = ${?SOURCE_EXEC_ARGS}
sep = ","
sep = ${?SOURCE_EXEC_ARGS_SEP}
parser = "yaml"
parser = ${?SOURCE_EXEC_PARSER}
}
bitbucket-cloud {
api {
url = "https://api.bitbucket.org/2.0"
Expand Down
100 changes: 100 additions & 0 deletions src/main/scala/io/conduktor/ksm/source/ExecSourceAcl.scala.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package io.conduktor.ksm.source

import com.typesafe.config.Config
import io.conduktor.ksm.parser.AclParserRegistry
import io.conduktor.ksm.source
import org.slf4j.LoggerFactory
import java.util.regex.Pattern

import sys.process._
import java.io._

class ExecSourceAcl(parserRegistry: AclParserRegistry) extends SourceAcl(parserRegistry) {

private val log = LoggerFactory.getLogger(classOf[ExecSourceAcl])

override val CONFIG_PREFIX: String = "exec"
final val COMMAND_CONFIG: String = "command"
final val COMMAND_ARGS_CONFIG: String = "args"
final val COMMAND_ARGS_SEP_CONFIG: String = "sep"
final val PARSER = "parser"

var command: List[String] = _
var parser: String = "csv"

/**
* internal config definition for the module
*/
override def configure(config: Config): Unit = {
//The command option should be required
val cmd = config.getString(COMMAND_CONFIG)

//For args we could use some defaults
val args: String = if (!config.hasPath(COMMAND_ARGS_CONFIG)) "" else config.getString(COMMAND_ARGS_CONFIG)
val sep: String = if (!config.hasPath(COMMAND_ARGS_SEP_CONFIG)) "," else config.getString(COMMAND_ARGS_SEP_CONFIG)

val parserType = if (!config.hasPath(PARSER)) "yaml" else config.getString(PARSER)
configure(cmd, args, sep, parserType)

}

def configure(command: String, args: String, sep: String, parser: String): Unit = {

this.command = List.concat(
List(command),
args.split(Pattern.quote(sep))
)
log.info("command: {}", this.command)

this.parser = parser
log.info("PARSER: {}", this.parser)
}

def configure(command: String, args: String, sep: String): Unit = {

configure(command, args, sep, this.parser)

}

override def refresh(): Option[ParsingContext] = {
val (return_code, stdout, stderr) = exec(command)

// If return_code is 0, the command was a success, parse out the stdout
return_code match {
case 0 =>
Some(
ParsingContext(
parserRegistry.getParser(this.parser),
new StringReader(stdout)
)
)
// Otherwise, assume something went wrong
case _ => {
log.error("Error executing the process, got return code {}", return_code)
log.debug("Stdout: {}", stdout)
log.error("Stderr: {}", stderr)
None
}
}
}

/**
* Close all the necessary underlying objects or connections belonging to this instance
*/
override def close(): Unit = {
// Do nothing
}

//Function here is taken from a StackOverflow answer I found
private def exec(cmd: Seq[String]): (Int, String, String) = {
val stdoutStream = new ByteArrayOutputStream
val stderrStream = new ByteArrayOutputStream
val stdoutWriter = new PrintWriter(stdoutStream)
val stderrWriter = new PrintWriter(stderrStream)
val exitValue = cmd.!(ProcessLogger(stdoutWriter.println, stderrWriter.println))
stdoutWriter.close()
stderrWriter.close()
(exitValue, stdoutStream.toString, stderrStream.toString)
}

}
72 changes: 72 additions & 0 deletions src/test/scala/io/conduktor/ksm/source/ExecSourceAclTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.conduktor.ksm.source

import io.conduktor.ksm.parser.AclParserRegistry
import io.conduktor.ksm.parser.yaml.YamlAclParser
import io.conduktor.ksm.parser.{AclParserRegistry}

import java.io.{File}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import kafka.security.auth._
import org.scalamock.scalatest.MockFactory
import org.scalatest.{FlatSpec, Matchers}

class ExecSourceAclTest extends FlatSpec with Matchers with MockFactory {

val yamlAclParser = new YamlAclParser()
val aclParserRegistryMock: AclParserRegistry = stub[AclParserRegistry]
(aclParserRegistryMock.getParserByFilename _).when(*).returns(yamlAclParser)

"Test 1" should "successfully parse exec output" in {

val yamlContent =
"""
|users:
| alice:
| topics:
| foo:
| - Read
| bar*:
| - Produce
| bob:
| groups:
| bar:
| - Write,Deny,12.34.56.78
| bob*:
| - All
| transactional_ids:
| bar-*:
| - All
| peter:
| clusters:
| kafka-cluster:
| - Create""".stripMargin

val file = File.createTempFile("ksm", "test")
val filepath = file.getAbsolutePath
println(filepath)
Files.write(
Paths.get(file.toURI),
yamlContent.getBytes(StandardCharsets.UTF_8)
)

val execSourceAcl = new ExecSourceAcl(aclParserRegistryMock)
execSourceAcl.configure("/bin/cat", filepath, "|", "yaml")

val parsingContext = execSourceAcl.refresh().get

yamlAclParser.aclsFromReader(parsingContext.reader).result.isRight shouldBe true
}

"Test 2" should "retun None on non-zero exit status" in {

val execSourceAcl = new ExecSourceAcl(aclParserRegistryMock)
execSourceAcl.configure("/bin/false", "", "|")

execSourceAcl.refresh() shouldBe None

}



}